Quarkus Kafka Десериализует исключение из очереди мертвых писем

#error-handling #apache-kafka #deserialization #quarkus #smallrye

#обработка ошибок #апачи-кафка #десериализация #quarkus #маленький #apache-kafka

Вопрос:

Чтобы обеспечить надежность моей службы, мне нужно отправить все входящие сообщения, которые не удалось десериализовать, в раздел мертвых писем, используя kafka-smallrye и quarkus .

Все сообщения по этой теме должны быть в формате avro (но я не мог быть уверен) с определением схемы в реестре схем.

Я установил конфигурацию моего потребителя таким образом:

 mp:
  messaging:
    incoming:
      test-in:
        connector: smallrye-kafka
        group:
          id: test-in-consumer-group
        topic: events-topic
        failure-strategy: dead-letter-queue
        schema:
          registry:
            url: http://localhost:8081
        value:
          deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          subject:
            name:
              strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        key:
          deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          subject:
            name:
              strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        specific:
          avro:
            reader: true
  

Мой потребительский код:

 @ApplicationScoped
public class Consumer {

    @Incoming("test-in")
    public CompletionStage<Void> store(KafkaRecord<Key,SpecificRecord> data ){
            String schemaFullName = data.getPayload().getSchema().getFullName();
            System.out.println(schemaFullName);

            // other consumer code
            return data.ack();
    }
}
  

Когда потребитель не может десериализовать сообщение, процесс потребления блокируется, вместо этого перемещайте сообщение в мертвую букву и продолжайте. Я полагаю, что ошибка десериализации не привела nack к тому, что сообщение не могло быть перемещено в мертвую букву.

Есть способ переместить не десерилизуемое сообщение в тему мертвого письма?

Ответ №1:

я решил проблему с помощью DeserializationFailureHandler. Вы должны использовать раздел «dead-letter-queue» в качестве обычной темы и отправить сообщение о сбое.

 @ApplicationScoped
@Identifier("failure-dead-letter") // Set the name of the failure handler
public class MyDeserializationFailureHandler
    implements DeserializationFailureHandler<CustomBean> { // Specify the expected type

    private static final Logger LOGGER = Logger.getLogger(MyDeserializationFailureHandler.class);

    @Inject
    @Channel("dead-letter")
    Emitter<DeadLetterBean> deadLetterBeanEmitter;

    @Override
    public CustomBean handleDeserializationFailure(String topic, boolean isKey, String deserializer, byte[] data,
                                           Exception exception, Headers headers) {
        LOGGER.error("ERROR: "   exception.getMessage());
        deadLetterBeanEmitter.send(Message.of(new DeadLetterBean(topic, isKey, deserializer, data, exception))
                                          .withAck(() -> {
                                              // Called when the message is acked
                                              LOGGER.error("SENT TO DEAD LETTER");
                                              return CompletableFuture.completedFuture(null);
                                          })

                                          .withNack(throwable -> {
                                              // Called when the message is nacked
                                              LOGGER.error("ERROR, NOT SENT DEAD LETTER");
                                              return CompletableFuture.completedFuture(null);
                                          }));
        return null;
    }
}
  

также зарегистрируйте новую тему как publisher и обработчик deserialization-failure-handler

     mp.messaging.outgoing.dead-letter.topic=dead-letter-topic-name
    mp.messaging.outgoing.dead-letter.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
    mp.messaging.incoming.message-in.value-deserialization-failure-handler=failure-dead-letter