Потоки Kafka: как получить ограничения смещения до подачи заявки на повторную обработку и как это остановить

#apache-kafka #apache-kafka-streams

#apache-kafka #apache-kafka-streams

Вопрос:

Я ищу способ создать инструмент повторной обработки в KafkaStreams, который позволит повторно обрабатывать данные с самого начала в теме (применяя некоторые фильтры и записывая обновленные версии этих событий в ту же тему). В то же время существует длительное приложение, обрабатывающее данные из этой темы.

Для повторной обработки только до того момента времени, в который приложение запускается и останавливается после него, необходимо знать, когда следует остановиться, что является последним произведенным смещением на данный момент. Например. карта может быть построена до запуска топологии, которая будет иметь (раздел -> смещение), чтобы знать эти ограничения, поэтому приложение сможет остановиться при достижении этого смещения, сравнивая текущий раздел и смещение (через Processor API) с пределом смещения для этого исходная карта.

Возможно ли / имеет ли смысл получать доступ к последней информации о смещениях из потоков Kafka? Есть ли другой способ обойти это? (Я думаю, вы можете создать это с помощью обычных потребителей Kafka, ища end и получая position, но я спрашиваю, есть ли интегрированное решение в KafkaStreams).

Кроме того, как аккуратно остановить приложение, только когда все разделы достигли своего смещения, зная, что эта информация распределена, поэтому вам нужно знать состояние из всех экземпляров?

Kafka / KafkaStreams 2.1, Scala 2.12

Ответ №1:

Использование потребителя для получения конечного смещения кажется разумным. Для остановки приложения вам потребуется создать ручное решение, которое отслеживает прогресс. Например, с помощью transformValues() вы могли бы проверить название темы, раздела и смещение входной записи (используя context объект, предоставляемый через init() метод). Это должно позволить вам вызывать KafkaStreams#close() , когда все данные обработаны.

Возможно, вас заинтересует этот KIP (в активном atm), в котором обсуждались похожие идеи: https://cwiki.apache.org/confluence/display/KAFKA/KIP-95: Incremental Batch Processing for Kafka Streams

Комментарии:

1. Я думаю, мне придется реализовать что-то вроде того, что предлагает KIP. Некоторые сомнения: — stop processing this partition (i.e., pause the partition) : как это сделать в KS? Я знаю, что могу игнорировать новые данные, но как «приостановить»? — last running member of the group, i.e., the application will also be the group leader and will know if it is the only running instance) , это то место, где KafkaStreams.close() следует вызывать, верно? Однако в контейнерном сценарии, когда задачи будут перезапускаться при сбое, они должны прочитать маркер, чтобы увидеть, должны ли они просто умереть при запуске, потому что он завершен.

2. В потоках Kafka вы не можете pause() создать раздел (KIP изменил бы потоки Kafka, чтобы потоки Kafka могли делать это внутренне, но это не отображается). Определить, когда нужно close() , сложно, особенно если вы реализуете это вручную. Я упомянул KIP, чтобы указать на варианты дизайна — я сомневаюсь, что вы сможете реализовать его 1: 1 без изменения самих потоков Kafka, но вам нужно будет пойти на некоторые другие компромиссы.

3. Возможно ли вместо этого узнать, находитесь ли вы в конце смещения в KS? Т.е. нулевая задержка. Это помогло бы. поскольку я был бы не против остановиться, когда мы узнаем, что мы использовали весь раздел, чтобы убедиться, что все содержимое до сейчас было переработано.

4. docs.confluent.io/current/streams/…

5. Потоки Kafka не могут вам сказать — вам нужно будет заранее получить окончание вручную.