#apache-kafka #apache-kafka-streams
#apache-kafka #apache-kafka-streams
Вопрос:
Я ищу способ создать инструмент повторной обработки в KafkaStreams, который позволит повторно обрабатывать данные с самого начала в теме (применяя некоторые фильтры и записывая обновленные версии этих событий в ту же тему). В то же время существует длительное приложение, обрабатывающее данные из этой темы.
Для повторной обработки только до того момента времени, в который приложение запускается и останавливается после него, необходимо знать, когда следует остановиться, что является последним произведенным смещением на данный момент. Например. карта может быть построена до запуска топологии, которая будет иметь (раздел -> смещение), чтобы знать эти ограничения, поэтому приложение сможет остановиться при достижении этого смещения, сравнивая текущий раздел и смещение (через Processor API) с пределом смещения для этого исходная карта.
Возможно ли / имеет ли смысл получать доступ к последней информации о смещениях из потоков Kafka? Есть ли другой способ обойти это? (Я думаю, вы можете создать это с помощью обычных потребителей Kafka, ища end и получая position, но я спрашиваю, есть ли интегрированное решение в KafkaStreams).
Кроме того, как аккуратно остановить приложение, только когда все разделы достигли своего смещения, зная, что эта информация распределена, поэтому вам нужно знать состояние из всех экземпляров?
Kafka / KafkaStreams 2.1, Scala 2.12
Ответ №1:
Использование потребителя для получения конечного смещения кажется разумным. Для остановки приложения вам потребуется создать ручное решение, которое отслеживает прогресс. Например, с помощью transformValues()
вы могли бы проверить название темы, раздела и смещение входной записи (используя context
объект, предоставляемый через init()
метод). Это должно позволить вам вызывать KafkaStreams#close()
, когда все данные обработаны.
Возможно, вас заинтересует этот KIP (в активном atm), в котором обсуждались похожие идеи: https://cwiki.apache.org/confluence/display/KAFKA/KIP-95: Incremental Batch Processing for Kafka Streams
Комментарии:
1. Я думаю, мне придется реализовать что-то вроде того, что предлагает KIP. Некоторые сомнения: —
stop processing this partition (i.e., pause the partition)
: как это сделать в KS? Я знаю, что могу игнорировать новые данные, но как «приостановить»? —last running member of the group, i.e., the application will also be the group leader and will know if it is the only running instance)
, это то место, гдеKafkaStreams.close()
следует вызывать, верно? Однако в контейнерном сценарии, когда задачи будут перезапускаться при сбое, они должны прочитать маркер, чтобы увидеть, должны ли они просто умереть при запуске, потому что он завершен.2. В потоках Kafka вы не можете
pause()
создать раздел (KIP изменил бы потоки Kafka, чтобы потоки Kafka могли делать это внутренне, но это не отображается). Определить, когда нужноclose()
, сложно, особенно если вы реализуете это вручную. Я упомянул KIP, чтобы указать на варианты дизайна — я сомневаюсь, что вы сможете реализовать его 1: 1 без изменения самих потоков Kafka, но вам нужно будет пойти на некоторые другие компромиссы.3. Возможно ли вместо этого узнать, находитесь ли вы в конце смещения в KS? Т.е. нулевая задержка. Это помогло бы. поскольку я был бы не против остановиться, когда мы узнаем, что мы использовали весь раздел, чтобы убедиться, что все содержимое до сейчас было переработано.
4. docs.confluent.io/current/streams/…
5. Потоки Kafka не могут вам сказать — вам нужно будет заранее получить окончание вручную.