#redis #apache-flink
#redis #apache-flink
Вопрос:
Мы разработали приложение Flink, которое получает сообщения с полями, как id, value1, value2, ...
в приложении Flink, они вводятся id
и назначаются скользящим окнам разного размера и slide
продолжительностью 10 секунд. Мы используем AggregateFunction для вычисления некоторой статистики для каждого идентификатора, и результирующий поток данных передается в Redis. Код выглядит так:
DataStream<Tuple2<String, String>> statistics = messages.keyBy(0)
.timeWindow(Time.seconds(300), Time.seconds(10))
.aggregate(new Min5Aggregate())
.setParallelism(20);
statistics.addSink(new CustomRedisSink()).setParallelism(20);
Благодаря этому другие системы могут использовать и отображать статистику, считывая Redis, и результат обновляется каждые 10 секунд. Но теперь у нас возникают проблемы с производительностью при этой реализации. Я считаю, что одна из причин заключается в том, что для некоторых идентификаторов созданы избыточные окна, которые не имеют очень активных обновлений значений.
Допустим, размер окна составляет 300 секунд, и есть один идентификатор, который имеет новые значения только каждые несколько часов. Но каждый раз, когда поступают новые значения этого идентификатора, будет создано 30 (300s / 10s) окон, и окна будут иметь одинаковые результаты агрегации, поскольку новых значений больше нет до истечения срока их действия. Что мы делаем сейчас, так это сравниваем выходные данные со значениями в Redis при просмотре потока результатов и пропускаем обновление, если они совпадают.
Чтобы оптимизировать производительность, мне интересно, есть ли в Flink какой-либо способ остановить запуск окна, когда оно имеет идентичное содержимое с предыдущим окном. Так что сравнения с Redis не понадобятся. Или, было бы также очень полезно, если у вас есть какие-либо другие предложения по оптимизации для этой системы.
* Поскольку существуют другие идентификаторы, которые часто обновляют значения, и нам нужна достаточно актуальная статистика, увеличение размера слайда не было бы выбором.
Ответ №1:
Я не верю, что триггер может сохранять состояние из предыдущего окна. A ProcessWindowFunction
может сохранять состояние из более ранних окон, так что это вариант.
Одним из довольно простых решений было бы вставить a RichFlatMapFunction
между окном и приемником, который запоминает предыдущий результат и выдает вывод только в том случае, если новый результат отличается.
Для более сложной оптимизации раздвижных окон вы можете реализовать окно как KeyedProcessFunction
. Таким образом, вы можете сохранять около тридцати 10-секундных фрагментов, а также полностью агрегированный результат в течение 300 секунд, а затем каждые 10 секунд все, что вам нужно делать, это вычитать самые старые 10 секунд и добавлять самые новые 10 секунд. Для занятых клавиш это должно быть более эффективно, чем добавление каждого события в 30 окон, но самостоятельное ведение всей бухгалтерии определенно требует больше работы. В документах Flink приведен пример того, как это делается для окон времени события tumbling; расширение для скользящих окон оставлено для читателя.