Транзакция Kafka в случае многопоточности

#apache-kafka #kafka-producer-api

#apache-kafka #kafka-producer-api

Вопрос:

Я пытаюсь создать kafka producer в trasnsaction, т.Е. Я хочу написать группу сообщений, если кто-то потерпит неудачу, я хочу откатить все сообщения.

 kafkaProducer.beginTransaction();
try
{
    // code to produce to kafka topic
}
catch(Exception e)
{
    kafkaProducer.abortTransaction();
}
kafkaProducer.commitTransaction();
  

Проблема в том, что для одного потока выше работает просто отлично, но когда несколько потоков записывают, это вызывает исключение

Попытка недопустимой транзакции из состояния IN_TRANSITION в IN_TRANSITION

во время отладки я обнаружил, что если выполняется транзакция thread1, а thread2 также сообщает, что beingTransaction выдает это исключение. Чего я не нахожу, если как решить эту проблему. Одна из возможных вещей, которые я мог бы найти, — это создание пула продуктов.

Существует ли какой-либо уже доступный API для пула производителей kafka, или мне придется создать свой собственный.

Ниже приведено улучшение, о котором jira уже сообщала для этого. https://issues.apache.org/jira/browse/KAFKA-6278

Любое другое предложение будет действительно полезным

Ответ №1:

Одновременно с экземпляром производителя может выполняться только одна транзакция.

Если у вас есть несколько потоков, выполняющих отдельную обработку, и все они нуждаются в семантике ровно один раз, у вас должен быть экземпляр производителя для каждого потока.

Комментарии:

1. Есть ли какой-либо API, доступный в kafka для ProducerPool, чтобы мы не создавали столько производителей, а затем закрывали их.

2. Это не является частью API Apache Kafka. Некоторые фреймворки, такие как spring-kafka, предоставляют эту функциональность

Ответ №2:

Не уверен, что это было решено. вы можете использовать apache common pool2 для создания пула экземпляров производителя. В методе create() фабричной реализации вы можете сгенерировать и назначить уникальный идентификатор транзакции, чтобы избежать конфликта (исключение ProducerFencedException)