Spring Kafka мертвая очередь сообщений и повторные попытки

#spring #apache-kafka #spring-kafka #dead-letter

#весна #apache-kafka #spring-kafka #мертвая буква

Вопрос:

У меня есть конфигурация:

 @Configuration
@EnableKafka
public class ConsumerConfig {

    final DlqErrorHandler dlqErrorHandler;

    public ConsumerConfig(DlqErrorHandler dlqErrorHandler) {
        this.dlqErrorHandler = dlqErrorHandler;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "group_id_two");
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(false);
        factory.getContainerProperties().setAckOnError(false);
        factory.setConcurrency(2);
        factory.setErrorHandler(dlqErrorHandler);
        return factory;
    }
}
  

Существует реализация обработчика ошибок:

 @Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
    private final KafkaTemplate kafkaTemplate;

    public DlqErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void handle(Exception e, List<ConsumerRecord<?, ?>> list, Consumer<?, ?> consumer, MessageListenerContainer messageListenerContainer) {
        ConsumerRecord<?, ?> record = list.get(0);

        try {
            kafkaTemplate.send("dlqTopic", record.key(), record.value());
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset()   1);
        } catch (Exception exception) {
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            throw new KafkaException("Seek to current after exception", exception);
        }
    }
}
  

Есть два слушателя:

 @Component
public class KafkaConsumer {
    @KafkaListener(topics = "batchProcessingWithRetryPolicy", containerFactory = "concurrentKafkaListenerContainerFactory")
    public void consume(String message) {
        System.out.println(message   " NORMAL");
        if (message.equals("TEST ERROR")) {
            throw new RuntimeException("EEEEEEEEEEEERRRRRRRRRRRRRRRRRRRRRRROOOOOOOOOOOOOOOOOOORRRRRR");
        }
    }

    @KafkaListener(topics = "dlqTopic", containerFactory = "concurrentKafkaListenerContainerFactory")
    public void consumeTwo(String message) {
        System.out.println(message   " DQL");
        if (message.length() > 0) {
            throw new RuntimeException("EEEEEEEEEEEERRRRRRRRRRRRRRRRRRRRRRROOOOOOOOOOOOOOOOOOORRRRRR ");
        }
    }
}
  

Мой вопрос:

1)

 factory.getContainerProperties().setAckOnError(false);
  

Метод setAckOnError — устарел. Как я могу заменить эту строку кода, чтобы первый прослушиватель после ошибки при обработке сообщения не предпринимал повторных попыток, а отправлял это сообщение в DQL.

  1. Как мне установить ограничение для DQL (DlqErrorHandler) на повторения и временные интервалы между отправкой сообщений? То есть после первой ошибки сообщение появляется в DQL, затем я хочу сделать еще 3 попытки с интервалом в 30 секунд и, если это не сработает, то идти дальше.

Ответ №1:

ackOnError заменяется на ErrorHandler.isAckAfterHandle() .

Ваша реализация обработчика ошибок является неполной — вам также необходимо выполнить поиск других разделов в списке оставшихся записей.

Почему бы вам просто не использовать SeekToCurrentErrorHandler и DeadLetterPublishingRecoverer , предоставляемые фреймворком. Они поддерживают ваш вариант использования.