#java #apache-flink #flink-streaming #flink-cep
#java #apache-flink #flink-streaming #flink-cep
Вопрос:
У меня есть задание Flink, и одному из операторов с полным состоянием необходимо сохранить в состоянии класс, содержащий HashMap в качестве атрибута, потому что этот hasMap сохраняет разные привязки к пользователю, например:
public class Affinity {
public String id;
public String colorTriggered;
public Map<String,Integer> affinities;
/*this object keeps the affinity for a user to a different colors for example:
affinities.put(green, 5);
affinities.put(blue, 9);
affinities.put(white, 2);
to calculate then what is the color's affinity of this user, in this case the answer will be blue
*/
}
Эта хэш-карта используется для отслеживания этих сродств и в определенный момент запрашивает сродство цвета пользователя и получает ключ с наивысшим значением сродства, которое будет синим, что значение 9.
Поскольку хэш-карты не являются частью сериализации Flink, мне нужно будет включить implement Serializable
в свой класс.
Это плохая идея или есть лучший способ сделать это и сохранить объект в состояниях?
В полном примере более или менее то, что мне нужно сделать, но не уверен, что использование HashMap в операторе Flink и в состояниях является хорошей идеей:
public class AffinityFlatMapFunction extends RichFlatMapFunction<Event, Affinity> implements MapOperations {
@Override
public void flatMap(Event event, Collector<Affinity> collector) throws Exception {
Affinity previous = state.value();
if(previous.hashMap.contains(event.color)){
previous.hashMap.replace(event.color, value 1);
}else previous.hashMap.put(event.color, 1);
/*something like this*/
String match = previous.hashMap.stream.filter(x ->
x.getKey().contains(event.color)).max(Map.Entry.comparingByValue())
.map(Map.Entry::getKey).orElse("empty");
if(!match.equals(previous.colorTriggered){
previous.colorTriggered = match;
state.update(previous);
collector.collect(previous);
}
}
}
С уважением!
Комментарии:
1. Я считаю, что хранилище RocksDB предпочтительнее хэш-карты
2. В противном случае, если вы отправляете кортежи
(name, 1)
в поток и подсчитываете его в word, вы создаете такое же отображение3. Привет @Onecricket, спасибо за ответ, я уже отредактировал вопрос, посмотрите сейчас, пожалуйста. Спасибо
4. Я не могу использовать RocksDB из-за того, что в этом приложении не разрешено увеличение задержки.
5. В решении этой ситуации я бы создал
MapState<String, Integer> state
, на котором основан каждый ключString key = event.id event.color
, а затем запросилstate.contains(key)
и выполнил операции, показанные выше, с этими базами, но я не уверен, что это хорошая идея для контрольных точек и использования процессора. Спасибо.
Ответ №1:
Согласно документации, существует конструкция состояния с именем MapState<UK, UV>
, которая выполняет следующее:
mapState<UK, UV>: здесь хранится список сопоставлений. Вы можете поместить пары ключ-значение в состояние и получить итерацию по всем сохраненным в данный момент сопоставлениям. Сопоставления добавляются с помощью put(UK, UV) или putAll(Map<UK, UV>). Значение, связанное с ключом пользователя, может быть получено с помощью get(ВЕЛИКОБРИТАНИЯ). Повторяющиеся представления для сопоставлений, ключей и значений могут быть получены с помощью entries(), keys() и values() соответственно. Вы также можете использовать isEmpty(), чтобы проверить, содержит ли эта карта какие-либо сопоставления ключ-значение.
Несколько дней назад я однажды прочитал в потоке Flink, что предоставленные StateDescriptors
оптимизированы и почти всегда являются предпочтительным выбором вместо реализации собственного механизма.
Если вы не разделяете свой поток нежелательным образом (использование keyBy(color)
должно быть в порядке), у вас всегда должно быть самое текущее состояние вашей карты. Я не знаю, обоснована ли ваша озабоченность по поводу задержки RocksDB, поскольку состояние Flink хранится в куче и привязано только к RocksDB, поэтому все текущие значения доступны на лету; но, возможно, я это неправильно понял. Оглядываясь назад, я даже сомневаюсь, что вам нужна карта, но просто ValueState
хранить ваше целое число, так как в этом случае keyBy()
позаботится о «ключевой» части карты.
Комментарии:
1. Привет @kopaka, большое спасибо за ваш ответ, но я боюсь, что это не подходит для меня, потому что у меня уже есть
KeyedStream
критерии идентификатора пользователя, и мне нужно отслеживать сходство с этим же пользователем примерно по 5 различным атрибутам, а не только по цветам, что было и, например, говоря это, я не могу создать новыйKeyedStream
для каждого из этих атрибутов, что является причинойhasMap
илиMapState
, но не новогоkeyby
. В настоящее время я использую `mapState в решении, но пытаюсь найти лучшее решение для этого. С уважением.2. Вопрос по этому поводу
MapStates
: если у меня естьMapState <String, Integer> state
иHashMap<String, Integer> affinity
и сделать что-то подобное,affinity.put(red, 5); affinity.put(blue, 10); affinity.put(black, 25);
тогда я делаюstate.putAll(affinty);
. Когда я это сделаю,state.get(key);
получу ли я толькоInteger
значение или полноеHashMap
? насколько я понимаю, это будет то же самое, что делать,state.put(black,25); ...
а затемstate.get(black)
вернет 25. Спасибо