#java #apache-kafka #kafka-producer-api
Вопрос:
Как я могу создать очередь мертвых писем и протестировать ее? У меня есть производитель и потребитель. У меня также есть конфигурации производителя, такие как acks, повторные попытки.
Producer.java
package org.timothy.producer;
import org.apache.kafka.clients.producer.*;
import org.timothy.producer.common.AppConfigs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.timothy.producer.common.PropConfigs;
import java.util.concurrent.ExecutionException;
public class Producer{
private static final Logger logger = LogManager.getLogger(Producer.class);
public static void main(String[] args) {
logger.info("Creating Kafka Producer...");
KafkaProducer<Integer, String> producer = PropConfigs.prodProps();
logger.info("Start sending messages...");
for (int i = 1; i <= AppConfigs.numEvents; i ) {
ProducerRecord<Integer, String> record = new ProducerRecord<>(AppConfigs.topicName, "This is Message: " i);
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("Record sent with key " i " to partition " metadata.partition()
" with offset " metadata.offset());
}
catch (ExecutionException e) {
System.out.println("Error in sending record");
//System.out.println(e);
e.printStackTrace();
}
catch (InterruptedException e) {
System.out.println("Error in sending record");
//System.out.println(e);
}
}
producer.flush();
producer.close();
logger.info("Finished - Closing Kafka Producer.");
}
}
Consumer.java
package org.timothy.producer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.timothy.producer.common.AppConfigs;
import org.timothy.producer.common.PropConfigs;
import java.time.Duration;
import java.util.Collections;
public class Consumer{
private static final Logger logger = LogManager.getLogger(Consumer.class);
public static void main(String[] args) {
KafkaConsumer<Integer, String> consumer = PropConfigs.consProps();
consumer.subscribe(Collections.singleton(AppConfigs.topicName));
int noMessageFound = 0;
while(true){
ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofMillis(1000));
if (records.count() == 0) {
noMessageFound ;
if (noMessageFound > AppConfigs.MAX_NO_MESSAGE_FOUND_COUNT)
break;
else
continue;
}
records.forEach(record -> logger.info("Received new record: "
" Key: " record.key()
", Value: " record.value()
", Topic: " record.topic()
", Partition: " record.partition()
", Offset: " record.offset() "n"
));
consumer.commitAsync();
}
}
}
Как я могу реализовать DLQ в своем коде? Как проверить повторные попытки, когда я запускаю свою программу, я не сталкиваюсь с ошибками, поэтому я не знаю, повторяется программа или нет.
Ответ №1:
когда я запускаю свою программу, я не сталкиваюсь с ошибками
Затем представьте их друг другу. Создайте счетчик, когда он достигнет определенного значения, создайте исключение RuntimeException. В блоке catch используйте экземпляр производителя для отправки события в новую тему.
Если вы хотите протестировать повторные попытки, выключите брокера или введите какое-либо другое сетевое исключение