Apache flink получает доступ к состоянию с ключом из позднего окна

#apache-flink #flink-streaming

Вопрос:

Я пишу приложение Flink, которое использует данные временных рядов из темы Кафки. Данные временных рядов содержат такие компоненты, как имя показателя, пара значений ключа тега, метка времени и значение. Я создал окно обработки для агрегирования данных на основе ключа метрики (который представляет собой комбинацию имени метрики, пары значений ключа и метки времени). Вот основной поток выглядит так

 kafka source -> Flat Map which parses and emits Metric ->  Key by metric
key  -> Tumbling window of 60 seconds -> Aggregate the data -> write to the
sink.
 

Я также хочу проверить, есть ли какая-либо метрика, которая появилась с опозданием за пределами
вышеуказанного окна. Я хочу проверить, сколько показателей поступило с опозданием, и рассчитать
процент поздних показателей по сравнению с исходными показателями. Я подумываю
об использовании функции «Разрешенная доступность» flink, чтобы отправлять запоздавшие метрики в
другой поток. Я планирую добавить «Состояние карты» в основной
Оператор «Агрегировать данные», который будет иметь ключ в качестве ключа метрики и
значение в качестве количества показателей, поступивших в главное окно.

 kafka source -> Flat Map which parses and emits Metric -> Key by metric key
->  Tumbling window of 60 seconds -> Aggregate the data (Maintain a map
state of metric count) -> write to the sink.

                                                   

                                                    

                                                  Late data -> Key by
 metric key ->  Collect late metrics and find the percentage of late metrics
 -> Write the result in sink
 

Мой вопрос в том, может ли оператор «Собирать поздние показатели и находить процент поздних
показателей» получить доступ к «mapState», который был обновлен
мейнстримом. Несмотря на то, что они управляются одним и тем же метрическим ключом, я думаю, что
это две разные задачи. Я хочу рассчитать (количество поздних показателей /
(количество опоздавших показателей количество показателей, полученных вовремя)).

Ответ №1:

Есть несколько различных способов, которыми вы могли бы подойти к этому.

Вы можете сохранить состояние каждого окна в KeyedStateStore windowState() предоставленном контексте, переданном вам WindowProcessFunction . Используется в сочетании с allowedLateness тем , что вы можете вычислять статистику поздних событий по мере возникновения поздних срабатываний. (При таком подходе нет необходимости в mapState, так windowState как область действия уже ограничена определенным окном и определенным ключом. Значения будет достаточно.)

Другая идея состояла бы в том, чтобы захватить побочный выходной поток поздних событий из основного окна и отправить эти поздние события через другое окно, которое подсчитывает их в течение некоторого периода времени. Затем отправьте как этот поток аналитики поздних событий, так и выходные данные первого (главного) окна в функцию Keyedcoprocess (или RichCoFlatMap), которая может вычислять статистику поздних событий по времени. (Здесь вам понадобится mapState, так как вам может потребоваться открыть несколько окон одновременно для каждого ключа потока с ключом.)

Или вы можете использовать простую функцию процесса, чтобы разделить начальный поток на два (путем сравнения временных меток с текущим водяным знаком)-один для поздних, а другой для не поздних событий-а затем использовать Flink SQL для вычисления всей статистики.

Или просто реализовать все это в одном KeyedProcessFunction . Видишь https://ci.apache.org/projects/flink/flink-docs-stable/docs/learn-flink/event_driven/ для примера.