Исключение InvalidPidMappingException приводит к закрытию приложения потока кафки

#apache-kafka #apache-kafka-streams

Вопрос:

У меня есть это приложение, написанное на потоках кафки. Время от времени он выдает исключение InvalidPidMappingException.

 Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
 

И у меня есть этот фрагмент кода, который устанавливает обработчик необнаруженных исключений для приложения

 streams.setUncaughtExceptionHandler(
            (Thread thread, Throwable exp) -> {
              log.error("Unhandled exception in thread with name ", exp);
                SpringApplication.exit(applicationContext, () -> 1);
            }
    );
 

Я понимаю, что это исключение возникает, когда у координатора истекает срок действия идентификатора транзакции производителя после того, как он не получил от него никаких обновлений статуса транзакции.
У меня есть несколько вопросов относительно этого исключения:

  1. Я думал, что после этого исключения производитель повторит попытку синхронизировать идентификатор транзакции с координатором и возобновит работу, не убивая поток потока кафки. Даже если я изменю приведенный выше фрагмент кода, чтобы не выходить из приложения Spring при исключении InvalidPidMappingException, он все равно убьет поток потока. Есть ли способ избежать смерти потока потоков при исключении InvalidPidMappingException? Я видел желаемое поведение, когда есть исключение UnknownProducerIdException. Или я что-то здесь упускаю?
  2. Кроме истечения срока действия идентификатора транзакции, может ли быть какая-либо другая причина возникновения этого исключения?
  3. Почему исключение InvalidPidMappingException обрабатывается иначе, чем исключение UnknownProducerIdException? Первый убивает Протектора потока, а второй восстанавливается просто отлично.

Я использую следующие версии библиотек:

 spring-kafka-version = '2.5.5.RELEASE'
apache-kafka-clientVersion = '2.5.1'
confluent-version = '5.4.2'
 

Ответ №1:

С опозданием на несколько месяцев, но обсуждение этой Апачи Кафки Джиры полезно. Подводя итог, с v2.8.0 и выше библиотеки потоков Apache Kafka вы можете использовать новый метод setUncaughtExceptionHandler() в org.apache.кафка.потоки.KafkaStreams обрабатывает любые неперехваченные исключения и поддерживает поток, завершая текущий поток и создавая новый для будущей обработки, возвращая org.apache.кафка.потоки.ошибки.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD. например

 kafkaStreams.setStreamsUncaughtExceptionHandler(e -> {
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
 

Javadocs для KafkaStreams::Метод setUncaughtExceptionHandler добавлен в 2.8.0