Каков наилучший способ сохранить хэш-карту в состоянии в Flink

#java #apache-flink #flink-streaming #flink-cep

#java #apache-flink #flink-streaming #flink-cep

Вопрос:

У меня есть задание Flink, и одному из операторов с полным состоянием необходимо сохранить в состоянии класс, содержащий HashMap в качестве атрибута, потому что этот hasMap сохраняет разные привязки к пользователю, например:

 public class Affinity {
public String id;
public String colorTriggered;
public Map<String,Integer> affinities;
/*this object keeps the affinity for a user to a different colors for example: 
affinities.put(green, 5);
affinities.put(blue, 9);
affinities.put(white, 2);

to calculate then what is the color's affinity of this user, in this case the answer will be blue
*/
}
  

Эта хэш-карта используется для отслеживания этих сродств и в определенный момент запрашивает сродство цвета пользователя и получает ключ с наивысшим значением сродства, которое будет синим, что значение 9.

Поскольку хэш-карты не являются частью сериализации Flink, мне нужно будет включить implement Serializable в свой класс.

Это плохая идея или есть лучший способ сделать это и сохранить объект в состояниях?

В полном примере более или менее то, что мне нужно сделать, но не уверен, что использование HashMap в операторе Flink и в состояниях является хорошей идеей:

 public class AffinityFlatMapFunction extends RichFlatMapFunction<Event, Affinity> implements MapOperations {

  @Override
  public void flatMap(Event event, Collector<Affinity> collector) throws Exception {
   Affinity previous = state.value();
    if(previous.hashMap.contains(event.color)){
        previous.hashMap.replace(event.color, value   1);
    }else previous.hashMap.put(event.color, 1);
   /*something like this*/
  String match = previous.hashMap.stream.filter(x -> 
              x.getKey().contains(event.color)).max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey).orElse("empty");
   if(!match.equals(previous.colorTriggered){
       previous.colorTriggered = match;
       state.update(previous);
       collector.collect(previous);
   }
 }
}
  

С уважением!

Комментарии:

1. Я считаю, что хранилище RocksDB предпочтительнее хэш-карты

2. В противном случае, если вы отправляете кортежи (name, 1) в поток и подсчитываете его в word, вы создаете такое же отображение

3. Привет @Onecricket, спасибо за ответ, я уже отредактировал вопрос, посмотрите сейчас, пожалуйста. Спасибо

4. Я не могу использовать RocksDB из-за того, что в этом приложении не разрешено увеличение задержки.

5. В решении этой ситуации я бы создал MapState<String, Integer> state , на котором основан каждый ключ String key = event.id event.color , а затем запросил state.contains(key) и выполнил операции, показанные выше, с этими базами, но я не уверен, что это хорошая идея для контрольных точек и использования процессора. Спасибо.

Ответ №1:

Согласно документации, существует конструкция состояния с именем MapState<UK, UV> , которая выполняет следующее:

mapState<UK, UV>: здесь хранится список сопоставлений. Вы можете поместить пары ключ-значение в состояние и получить итерацию по всем сохраненным в данный момент сопоставлениям. Сопоставления добавляются с помощью put(UK, UV) или putAll(Map<UK, UV>). Значение, связанное с ключом пользователя, может быть получено с помощью get(ВЕЛИКОБРИТАНИЯ). Повторяющиеся представления для сопоставлений, ключей и значений могут быть получены с помощью entries(), keys() и values() соответственно. Вы также можете использовать isEmpty(), чтобы проверить, содержит ли эта карта какие-либо сопоставления ключ-значение.

Несколько дней назад я однажды прочитал в потоке Flink, что предоставленные StateDescriptors оптимизированы и почти всегда являются предпочтительным выбором вместо реализации собственного механизма.

Если вы не разделяете свой поток нежелательным образом (использование keyBy(color) должно быть в порядке), у вас всегда должно быть самое текущее состояние вашей карты. Я не знаю, обоснована ли ваша озабоченность по поводу задержки RocksDB, поскольку состояние Flink хранится в куче и привязано только к RocksDB, поэтому все текущие значения доступны на лету; но, возможно, я это неправильно понял. Оглядываясь назад, я даже сомневаюсь, что вам нужна карта, но просто ValueState хранить ваше целое число, так как в этом случае keyBy() позаботится о «ключевой» части карты.

Комментарии:

1. Привет @kopaka, большое спасибо за ваш ответ, но я боюсь, что это не подходит для меня, потому что у меня уже есть KeyedStream критерии идентификатора пользователя, и мне нужно отслеживать сходство с этим же пользователем примерно по 5 различным атрибутам, а не только по цветам, что было и, например, говоря это, я не могу создать новый KeyedStream для каждого из этих атрибутов, что является причиной hasMap или MapState , но не нового keyby . В настоящее время я использую `mapState в решении, но пытаюсь найти лучшее решение для этого. С уважением.

2. Вопрос по этому поводу MapStates : если у меня есть MapState <String, Integer> state и HashMap<String, Integer> affinity и сделать что-то подобное, affinity.put(red, 5); affinity.put(blue, 10); affinity.put(black, 25); тогда я делаю state.putAll(affinty); . Когда я это сделаю, state.get(key); получу ли я только Integer значение или полное HashMap ? насколько я понимаю, это будет то же самое, что делать, state.put(black,25); ... а затем state.get(black) вернет 25. Спасибо