кафка использует управление на основе идентификатора смещения в разных кластерах

#apache-kafka #kafka-consumer-api

#apache-kafka #кафка-потребитель-api

Вопрос:

У меня есть некоторый код потребителя Apache Kafka, который запущен и получает сведения о пользователе, обрабатывает и соответствующим образом обновляет.

В настоящее время я использую kafka offset для отслеживания того, какую запись я обрабатываю должным образом.

всякий раз, когда мои потребители перезапускаются по какой-либо причине (один узел выходит из строя, а другие узлы получают данные, или потребитель перезапускается и т.д.), Сначала он устанавливает Kafka offset, считываемый на основе обработанного смещения

 consumer.seek(//get the offset from db);
  

и начинает опрос

 consumer.poll()
  

теперь проблема связана с каким-то другим региональным тестом на сбой, то же приложение будет запущено в другом месте и начнет обрабатывать новые данные.

т. Е. база данных глобально синхронизирована, но между кластером kafka другого региона нет синхронизации, поэтому я получаю смещение, которое не упорядочено для одной и той же темы в другом регионе.

Следовательно, я в конечном итоге ищу другое смещение в другом регионе.

всякий раз, когда происходит сбой, данные в первом кластере не будут передаваться во второй кластер, это нормально в соответствии с бизнес-требованиями.

текущая проблема заключается в том, что когда новая запись поступает во второй кластер, я не должен начинать со смещения, установленного первым кластерным приложением, этим можно управлять, сохраняя смещение вместе с Kafka ClusterId (name), поэтому всякий раз, когда я ищу смещение, я могу запросить вместе с кластером, чтобы получить смещение на основе региона.

есть ли лучший способ справиться с этой ситуацией?

Комментарии:

1. «нет синхронизации между кластером kafka из разных регионов»… Не могли бы вы использовать MirrorMaker2, чтобы исправить это?

2. Будет ли mirrormake2 также гарантировать синхронизацию смещения? что я получил из некоторой документации, так это то, что MM2 может гарантировать синхронизацию записей, но не точную синхронизацию смещения

3. Это mirrormaker 1, а не 2. См. пункт 3 github.com/apache/kafka/tree/trunk/connect /…