#apache-kafka #kafka-producer-api
#apache-kafka #kafka-producer-api
Вопрос:
Я пытаюсь создать kafka producer в trasnsaction, т.Е. Я хочу написать группу сообщений, если кто-то потерпит неудачу, я хочу откатить все сообщения.
kafkaProducer.beginTransaction();
try
{
// code to produce to kafka topic
}
catch(Exception e)
{
kafkaProducer.abortTransaction();
}
kafkaProducer.commitTransaction();
Проблема в том, что для одного потока выше работает просто отлично, но когда несколько потоков записывают, это вызывает исключение
Попытка недопустимой транзакции из состояния IN_TRANSITION в IN_TRANSITION
во время отладки я обнаружил, что если выполняется транзакция thread1, а thread2 также сообщает, что beingTransaction выдает это исключение. Чего я не нахожу, если как решить эту проблему. Одна из возможных вещей, которые я мог бы найти, — это создание пула продуктов.
Существует ли какой-либо уже доступный API для пула производителей kafka, или мне придется создать свой собственный.
Ниже приведено улучшение, о котором jira уже сообщала для этого. https://issues.apache.org/jira/browse/KAFKA-6278
Любое другое предложение будет действительно полезным
Ответ №1:
Одновременно с экземпляром производителя может выполняться только одна транзакция.
Если у вас есть несколько потоков, выполняющих отдельную обработку, и все они нуждаются в семантике ровно один раз, у вас должен быть экземпляр производителя для каждого потока.
Комментарии:
1. Есть ли какой-либо API, доступный в kafka для ProducerPool, чтобы мы не создавали столько производителей, а затем закрывали их.
2. Это не является частью API Apache Kafka. Некоторые фреймворки, такие как spring-kafka, предоставляют эту функциональность
Ответ №2:
Не уверен, что это было решено. вы можете использовать apache common pool2 для создания пула экземпляров производителя. В методе create() фабричной реализации вы можете сгенерировать и назначить уникальный идентификатор транзакции, чтобы избежать конфликта (исключение ProducerFencedException)