Мигание и неключевое состояние окна

#apache-kafka #stream #apache-flink #apache-kafka-streams #flink-streaming

#apache-kafka #поток #apache-щелчок #apache-kafka-потоки #flink-потоковая передача

Вопрос:

Я создаю приложение Flink, которое просто перенаправляет оконные входящие события Kafka в другую тему Kafka с добавлением маркеров начала и конца для каждого окна — так, например, для окна, содержащего 1 час 1, 2, 3, 4, 5 , я перейду start_timestamp, 1, 2, 3, 4, 5, end_timestamp к другой теме Kafka. Возможно, позже будут некоторые другие преобразования, но в целом, для N поступающих событий я всегда буду выдавать не менее N 2 событий.

Как я понимаю, использование windowAll() with a ProcessAllWindowFunction , которое вводит начальные и конечные маркеры, должно сделать это. Мой вопрос касается управления состоянием. Я буду использовать серверную часть состояния RocksDB — будет ли она также сохранять внутреннее состояние окна даже для этого неключевого потока? Моя главная задача — сохранить состояние в окне, чтобы я не перерабатывал его снова, особенно для больших окон.

Комментарии:

1. Не могли бы вы уточнить, я не совсем уверен, что вы подразумеваете под сохранением состояния в окне.

2. Итак, допустим, мой конвейер просто stream.map(object -> object.incrField1()).windowAll(1.hour).process(MyProcessor()) . Тогда, как я понимаю, состояние, которое MyProcessor продолжает накапливать как итерируемый <MyObject>, будет просто исходным списком объектов, которые вошли с увеличением field1 .

Ответ №1:

Для чего-то такого простого я бы использовал a FlatMap (с параллелизмом, равным 1), который сохраняет в состоянии время текущего окна и время последнего события. Всякий раз, когда поступает запись, если она находится в новом часовом окне, я бы отправил end_timestamp (время последнего события), start_timestamp (из новой записи) и обновил текущий час сохраненного состояния. Во всех случаях время последнего события в состоянии также обновляется. Это предполагает, что ваши входящие события строго упорядочены, поэтому вам не нужно беспокоиться о поздних данных.

Ответ №2:

Я предпочитаю подход @kkrugler, поскольку он позволит избежать затрат на поддержание всего этого состояния. Но, чтобы ответить на ваш вопрос, да, a windowAll может использовать серверную часть состояния RocksDB для сохранения своего содержимого. Под капотом a windowAll на самом деле находится окно с ключом со специальным постоянным ключом. Таким образом, хотя RocksDB можно использовать только для управления ключевым состоянием, оно работает.