Мы используем Spring Cloud Stream для Kafka и ищем семантику ровно один раз с помощью пользовательского API

#kafka-consumer-api #kafka-producer-api #spring-cloud-stream

#kafka-consumer-api #kafka-producer-api #spring-cloud-stream

Вопрос:

Мы используем Spring Cloud Stream для Kafka и ищем семантику ровно один раз. У нас есть одно решение, которое работает нормально, как и ожидалось 1) Включение идемпотента и транзакции от производителя 2) Использование MetadataStore для проверки дубликата сообщения со стороны потребителя с помощью ключа (offsetId partitionId topicName) С помощью вышеупомянутого решения у нас нет потери сообщений и нет обработки дубликатов

Но теперь мы обнаружили, что есть одно свойство ( producer.sendOffsetsToTransaction ) Kafka API, которое помогает нам исправить дублирующуюся обработку со стороны потребителя без какой-либо логики хранилища метаданных. Теперь я не уверен, как мы можем сделать это с spring cloud stream с этим свойством .sendOffsetsToTransaction

Ответ №1:

Фреймворк обрабатывает ее автоматически, если вы добавляете KafkaTransactionManager в контекст приложения.

Вы должны добавить префикс идентификатора транзакции в конфигурацию.

spring.kafka.producer.transaction-id-prefix

и Boot автоматически добавит диспетчер транзакций.

Смотрите свойства производителя.

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

Разрешает транзакции в binder. Смотрите transaction.id в документации Kafka и транзакциях в документации spring-kafka. Когда транзакции включены, отдельные свойства производителя игнорируются, и все производители используют свойства spring.cloud.stream.kafka.binder.transaction.producer.*.

Контейнер прослушивателя отправляет смещение транзакции перед фиксацией транзакции, когда прослушиватель завершает работу в обычном режиме.

Комментарии:

1. Спасибо за ответ, Гэри. Итак, если у меня включено это свойство (kafka: binder: transaction: transaction-id-prefix:) в приложении. yml Создаст transactionmanager, стажер которого позаботится о вызове этого метода и передаче значения (смещения и groupId) в ‘sendOffsetsToTransaction’, верно?.

2. Нет; вам либо нужно вручную настроить диспетчер транзакций, либо вам нужно также установить свойство Boot spring.kafka.producer.transaction-id-prefix , чтобы он объявлял диспетчер транзакций для вас. Затем контейнер автоматически отправит вам смещения.

3. Гэри, спасибо за твой реплей, я могу внести это изменение, и оно работает нормально. Но у меня другой сценарий, и я вижу другое поведение. Всего 3 разных сервиса 1) Первый сервис прослушает из очереди Solace и передаст ее в kafka topic-1 (где транзакции включены) 2) Второй сервис прослушает сверху kafka topic-1 и запишет ее в другой kafka topic-2 (где у нас нет ручных коммитов, транзакции разрешены для передачи в другую тему, автоматическое смещение фиксации как false и изоляция. уровень установлен на read_commited)

4. 3) Третья служба прослушает kafka topic-2 и запишет ее обратно в Solace queue (где у нас нет ручных коммитов, автоматическое смещение фиксации как false и изоляция. уровень установлен в read_commited). Теперь проблема после того, как я включил транзакцию и уровень изоляции во втором сервисе, я не могу читать какие-либо сообщения, если я отключил транзакцию во втором сервисе, я могу читать все сообщения. 1) Можем ли мы включить транзакции и уровень изоляции в одном отдельном сервисе 2) Как это работает, если мой сервис является просто производителем или потребителем (как EoS гарантирует для этих сервисов)

5. >and it's working fine... . Комментарии не предназначены для того, чтобы задавать дополнительные вопросы; вы должны закрыть этот и начать новый.