Flink — как агрегировать в состоянии

#apache-flink #flink-streaming

#apache-flink #flink-потоковая передача

Вопрос:

У меня есть ключевой поток данных, который выглядит как:

     {
        summary:Integer
        uid:String
        key:String
        .....
    }
  

Мне нужно агрегировать итоговые значения за некоторый временной диапазон, и как только я получу определенное число, сбросить сводку и все идентификаторы UID, которые повлияли на сводку, в файл базы данных / журнала.

после первой очистки я хочу удалить все uid из памяти и просто немедленно удалить каждый новый элемент.

Итак, я попробовал эту агрегатную функцию.

 public class AggFunc implements AggregateFunction<Item, Acc, Tuple2<Integer,List<String>>>{

    private static final long serialVersionUID = 1L;

    @Override
    public Acc createAccumulator() {
        return new Acc());
    }

    @Override
    public Acc add(Item value, Acc accumulator) {
        accumulator.inc(value.getSummary());
        accumulator.addUid(value.getUid);
        return accumulator;
    }

    @Override
    public Tuple2<Integer,List<String>> getResult(Acc accumulator) {
        List<String> newL = Lists.newArrayList(accumulator.getUids());
        accumulator.setUids(Lists.newArrayList());
        return Tuple2.of(accumulator.getSum(), newL);
    }

    @Override
    public Acc merge(Acc a, Acc b) {
        .....
    }

}
  

и в функции агрегатного процесса я сбрасываю список в состояние, и если мне нужно сохранить в базе данных, я очищаю состояние и сохраняю флаг в состоянии, чтобы указать это.

Но мне это кажется кривым. И я не уверен, что это сработает хорошо для меня.

Есть ли лучшее решение для этой ситуации?

Комментарии:

1. Это сбивает с толку ваш вопрос. Мне кажется, что вам нужно 2 запроса. Один для суммирования за оконное время, а другой — без окон.

2. нет. им обоим нужно одно и то же окно. список «uids» дает мне необходимые указания по отладке в сводке

Ответ №1:

Работайте с состоянием внутри расширенной функции. Продолжайте добавлять uid в вашем состоянии и при срабатывании окна для удаления значений. На этой странице из официальной документации приведен пример.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-keyed-state

Для вашего случая ListState будет работать хорошо.

Редактировать:

Приведенное выше решение предназначено для неоконного случая. для случая окна просто используйте объединение с функцией apply, которая может иметь расширенную оконную функцию

Комментарии:

1. Спасибо, на самом деле мне это нужно в window, поэтому приведенный выше пример не является решением в моем случае, но вы указали мне направление и просто использовали низкоуровневую агрегатную функцию — apply

2. Я рад, что это могло бы вам помочь!

3. Ну, я вижу, что это не очень хорошо для меня, поскольку метод apply в window работает как процесс, который сохраняет все элементы в window, а не отбрасывает их, как агрегаторы, так что это съедает мою память

4. Вы также можете отбросить элементы вашего состояния при запуске окна.