#apache-kafka #apache-kafka-streams
Вопрос:
У меня есть это приложение, написанное на потоках кафки. Время от времени он выдает исключение InvalidPidMappingException.
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
И у меня есть этот фрагмент кода, который устанавливает обработчик необнаруженных исключений для приложения
streams.setUncaughtExceptionHandler(
(Thread thread, Throwable exp) -> {
log.error("Unhandled exception in thread with name ", exp);
SpringApplication.exit(applicationContext, () -> 1);
}
);
Я понимаю, что это исключение возникает, когда у координатора истекает срок действия идентификатора транзакции производителя после того, как он не получил от него никаких обновлений статуса транзакции.
У меня есть несколько вопросов относительно этого исключения:
- Я думал, что после этого исключения производитель повторит попытку синхронизировать идентификатор транзакции с координатором и возобновит работу, не убивая поток потока кафки. Даже если я изменю приведенный выше фрагмент кода, чтобы не выходить из приложения Spring при исключении InvalidPidMappingException, он все равно убьет поток потока. Есть ли способ избежать смерти потока потоков при исключении InvalidPidMappingException? Я видел желаемое поведение, когда есть исключение UnknownProducerIdException. Или я что-то здесь упускаю?
- Кроме истечения срока действия идентификатора транзакции, может ли быть какая-либо другая причина возникновения этого исключения?
- Почему исключение InvalidPidMappingException обрабатывается иначе, чем исключение UnknownProducerIdException? Первый убивает Протектора потока, а второй восстанавливается просто отлично.
Я использую следующие версии библиотек:
spring-kafka-version = '2.5.5.RELEASE'
apache-kafka-clientVersion = '2.5.1'
confluent-version = '5.4.2'
Ответ №1:
С опозданием на несколько месяцев, но обсуждение этой Апачи Кафки Джиры полезно. Подводя итог, с v2.8.0 и выше библиотеки потоков Apache Kafka вы можете использовать новый метод setUncaughtExceptionHandler() в org.apache.кафка.потоки.KafkaStreams обрабатывает любые неперехваченные исключения и поддерживает поток, завершая текущий поток и создавая новый для будущей обработки, возвращая org.apache.кафка.потоки.ошибки.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD. например
kafkaStreams.setStreamsUncaughtExceptionHandler(e -> {
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
Javadocs для KafkaStreams::Метод setUncaughtExceptionHandler добавлен в 2.8.0