#apache-flink #flink-streaming
#apache-flink #flink-потоковая передача
Вопрос:
У меня есть ключевой поток данных, который выглядит как:
{
summary:Integer
uid:String
key:String
.....
}
Мне нужно агрегировать итоговые значения за некоторый временной диапазон, и как только я получу определенное число, сбросить сводку и все идентификаторы UID, которые повлияли на сводку, в файл базы данных / журнала.
после первой очистки я хочу удалить все uid из памяти и просто немедленно удалить каждый новый элемент.
Итак, я попробовал эту агрегатную функцию.
public class AggFunc implements AggregateFunction<Item, Acc, Tuple2<Integer,List<String>>>{
private static final long serialVersionUID = 1L;
@Override
public Acc createAccumulator() {
return new Acc());
}
@Override
public Acc add(Item value, Acc accumulator) {
accumulator.inc(value.getSummary());
accumulator.addUid(value.getUid);
return accumulator;
}
@Override
public Tuple2<Integer,List<String>> getResult(Acc accumulator) {
List<String> newL = Lists.newArrayList(accumulator.getUids());
accumulator.setUids(Lists.newArrayList());
return Tuple2.of(accumulator.getSum(), newL);
}
@Override
public Acc merge(Acc a, Acc b) {
.....
}
}
и в функции агрегатного процесса я сбрасываю список в состояние, и если мне нужно сохранить в базе данных, я очищаю состояние и сохраняю флаг в состоянии, чтобы указать это.
Но мне это кажется кривым. И я не уверен, что это сработает хорошо для меня.
Есть ли лучшее решение для этой ситуации?
Комментарии:
1. Это сбивает с толку ваш вопрос. Мне кажется, что вам нужно 2 запроса. Один для суммирования за оконное время, а другой — без окон.
2. нет. им обоим нужно одно и то же окно. список «uids» дает мне необходимые указания по отладке в сводке
Ответ №1:
Работайте с состоянием внутри расширенной функции. Продолжайте добавлять uid
в вашем состоянии и при срабатывании окна для удаления значений. На этой странице из официальной документации приведен пример.
Для вашего случая ListState
будет работать хорошо.
Редактировать:
Приведенное выше решение предназначено для неоконного случая. для случая окна просто используйте объединение с функцией apply, которая может иметь расширенную оконную функцию
Комментарии:
1. Спасибо, на самом деле мне это нужно в window, поэтому приведенный выше пример не является решением в моем случае, но вы указали мне направление и просто использовали низкоуровневую агрегатную функцию — apply
2. Я рад, что это могло бы вам помочь!
3. Ну, я вижу, что это не очень хорошо для меня, поскольку метод apply в window работает как процесс, который сохраняет все элементы в window, а не отбрасывает их, как агрегаторы, так что это съедает мою память
4. Вы также можете отбросить элементы вашего состояния при запуске окна.