#spring #apache-kafka #spring-kafka #dead-letter
#весна #apache-kafka #spring-kafka #мертвая буква
Вопрос:
У меня есть конфигурация:
@Configuration
@EnableKafka
public class ConsumerConfig {
final DlqErrorHandler dlqErrorHandler;
public ConsumerConfig(DlqErrorHandler dlqErrorHandler) {
this.dlqErrorHandler = dlqErrorHandler;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "group_id_two");
config.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(false);
factory.getContainerProperties().setAckOnError(false);
factory.setConcurrency(2);
factory.setErrorHandler(dlqErrorHandler);
return factory;
}
}
Существует реализация обработчика ошибок:
@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
private final KafkaTemplate kafkaTemplate;
public DlqErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void handle(Exception e, List<ConsumerRecord<?, ?>> list, Consumer<?, ?> consumer, MessageListenerContainer messageListenerContainer) {
ConsumerRecord<?, ?> record = list.get(0);
try {
kafkaTemplate.send("dlqTopic", record.key(), record.value());
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() 1);
} catch (Exception exception) {
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
throw new KafkaException("Seek to current after exception", exception);
}
}
}
Есть два слушателя:
@Component
public class KafkaConsumer {
@KafkaListener(topics = "batchProcessingWithRetryPolicy", containerFactory = "concurrentKafkaListenerContainerFactory")
public void consume(String message) {
System.out.println(message " NORMAL");
if (message.equals("TEST ERROR")) {
throw new RuntimeException("EEEEEEEEEEEERRRRRRRRRRRRRRRRRRRRRRROOOOOOOOOOOOOOOOOOORRRRRR");
}
}
@KafkaListener(topics = "dlqTopic", containerFactory = "concurrentKafkaListenerContainerFactory")
public void consumeTwo(String message) {
System.out.println(message " DQL");
if (message.length() > 0) {
throw new RuntimeException("EEEEEEEEEEEERRRRRRRRRRRRRRRRRRRRRRROOOOOOOOOOOOOOOOOOORRRRRR ");
}
}
}
Мой вопрос:
1)
factory.getContainerProperties().setAckOnError(false);
Метод setAckOnError — устарел. Как я могу заменить эту строку кода, чтобы первый прослушиватель после ошибки при обработке сообщения не предпринимал повторных попыток, а отправлял это сообщение в DQL.
- Как мне установить ограничение для DQL (DlqErrorHandler) на повторения и временные интервалы между отправкой сообщений? То есть после первой ошибки сообщение появляется в DQL, затем я хочу сделать еще 3 попытки с интервалом в 30 секунд и, если это не сработает, то идти дальше.
Ответ №1:
ackOnError
заменяется на ErrorHandler.isAckAfterHandle()
.
Ваша реализация обработчика ошибок является неполной — вам также необходимо выполнить поиск других разделов в списке оставшихся записей.
Почему бы вам просто не использовать SeekToCurrentErrorHandler
и DeadLetterPublishingRecoverer
, предоставляемые фреймворком. Они поддерживают ваш вариант использования.