Создание очереди мертвых писем

#java #apache-kafka #kafka-producer-api

Вопрос:

Как я могу создать очередь мертвых писем и протестировать ее? У меня есть производитель и потребитель. У меня также есть конфигурации производителя, такие как acks, повторные попытки.

Producer.java

 package org.timothy.producer;

import org.apache.kafka.clients.producer.*;
import org.timothy.producer.common.AppConfigs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.timothy.producer.common.PropConfigs;

import java.util.concurrent.ExecutionException;

public class Producer{

    private static final Logger logger = LogManager.getLogger(Producer.class);
    public static void main(String[] args) {
        logger.info("Creating Kafka Producer...");
        KafkaProducer<Integer, String> producer = PropConfigs.prodProps();

        logger.info("Start sending messages...");

        for (int i = 1; i <= AppConfigs.numEvents; i  ) {
            ProducerRecord<Integer, String> record = new ProducerRecord<>(AppConfigs.topicName, "This is Message: "   i);
            try {
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Record sent with key "   i   " to partition "   metadata.partition()
                          " with offset "   metadata.offset());
            }
            catch (ExecutionException e) {
                System.out.println("Error in sending record");
                //System.out.println(e);
                e.printStackTrace();
            }
            catch (InterruptedException e) {
                System.out.println("Error in sending record");
                //System.out.println(e);
            }
        }
        producer.flush();
        producer.close();
        logger.info("Finished - Closing Kafka Producer.");
    }
}
 

Consumer.java

 package org.timothy.producer;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.timothy.producer.common.AppConfigs;
import org.timothy.producer.common.PropConfigs;
import java.time.Duration;
import java.util.Collections;


public class Consumer{

    private static final Logger logger = LogManager.getLogger(Consumer.class);
    public static void main(String[] args) {
        KafkaConsumer<Integer, String> consumer = PropConfigs.consProps();

        consumer.subscribe(Collections.singleton(AppConfigs.topicName));
        int noMessageFound = 0;

        while(true){
            ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofMillis(1000));
            
            if (records.count() == 0) {
                noMessageFound  ;
                if (noMessageFound > AppConfigs.MAX_NO_MESSAGE_FOUND_COUNT)
                    break;
                else
                    continue;
            }

            records.forEach(record -> logger.info("Received new record: "  
                    " Key: "   record.key()  
                    ", Value: "   record.value()  
                    ", Topic: "   record.topic()  
                    ", Partition: "   record.partition()  
                    ", Offset: "   record.offset()   "n"
            ));
            consumer.commitAsync();

        }

    }
}
 

Как я могу реализовать DLQ в своем коде? Как проверить повторные попытки, когда я запускаю свою программу, я не сталкиваюсь с ошибками, поэтому я не знаю, повторяется программа или нет.

Ответ №1:

когда я запускаю свою программу, я не сталкиваюсь с ошибками

Затем представьте их друг другу. Создайте счетчик, когда он достигнет определенного значения, создайте исключение RuntimeException. В блоке catch используйте экземпляр производителя для отправки события в новую тему.

Если вы хотите протестировать повторные попытки, выключите брокера или введите какое-либо другое сетевое исключение