Оптимизация окон Flink с небольшим размером слайда и одинаковым содержимым?

#redis #apache-flink

#redis #apache-flink

Вопрос:

Мы разработали приложение Flink, которое получает сообщения с полями, как id, value1, value2, ... в приложении Flink, они вводятся id и назначаются скользящим окнам разного размера и slide продолжительностью 10 секунд. Мы используем AggregateFunction для вычисления некоторой статистики для каждого идентификатора, и результирующий поток данных передается в Redis. Код выглядит так:

 DataStream<Tuple2<String, String>> statistics = messages.keyBy(0)
    .timeWindow(Time.seconds(300), Time.seconds(10))
    .aggregate(new Min5Aggregate())
    .setParallelism(20);

statistics.addSink(new CustomRedisSink()).setParallelism(20);
 

Благодаря этому другие системы могут использовать и отображать статистику, считывая Redis, и результат обновляется каждые 10 секунд. Но теперь у нас возникают проблемы с производительностью при этой реализации. Я считаю, что одна из причин заключается в том, что для некоторых идентификаторов созданы избыточные окна, которые не имеют очень активных обновлений значений.

Допустим, размер окна составляет 300 секунд, и есть один идентификатор, который имеет новые значения только каждые несколько часов. Но каждый раз, когда поступают новые значения этого идентификатора, будет создано 30 (300s / 10s) окон, и окна будут иметь одинаковые результаты агрегации, поскольку новых значений больше нет до истечения срока их действия. Что мы делаем сейчас, так это сравниваем выходные данные со значениями в Redis при просмотре потока результатов и пропускаем обновление, если они совпадают.

Чтобы оптимизировать производительность, мне интересно, есть ли в Flink какой-либо способ остановить запуск окна, когда оно имеет идентичное содержимое с предыдущим окном. Так что сравнения с Redis не понадобятся. Или, было бы также очень полезно, если у вас есть какие-либо другие предложения по оптимизации для этой системы.

* Поскольку существуют другие идентификаторы, которые часто обновляют значения, и нам нужна достаточно актуальная статистика, увеличение размера слайда не было бы выбором.

Ответ №1:

Я не верю, что триггер может сохранять состояние из предыдущего окна. A ProcessWindowFunction может сохранять состояние из более ранних окон, так что это вариант.

Одним из довольно простых решений было бы вставить a RichFlatMapFunction между окном и приемником, который запоминает предыдущий результат и выдает вывод только в том случае, если новый результат отличается.

Для более сложной оптимизации раздвижных окон вы можете реализовать окно как KeyedProcessFunction . Таким образом, вы можете сохранять около тридцати 10-секундных фрагментов, а также полностью агрегированный результат в течение 300 секунд, а затем каждые 10 секунд все, что вам нужно делать, это вычитать самые старые 10 секунд и добавлять самые новые 10 секунд. Для занятых клавиш это должно быть более эффективно, чем добавление каждого события в 30 окон, но самостоятельное ведение всей бухгалтерии определенно требует больше работы. В документах Flink приведен пример того, как это делается для окон времени события tumbling; расширение для скользящих окон оставлено для читателя.