#error-handling #apache-kafka #deserialization #quarkus #smallrye
#обработка ошибок #апачи-кафка #десериализация #quarkus #маленький #apache-kafka
Вопрос:
Чтобы обеспечить надежность моей службы, мне нужно отправить все входящие сообщения, которые не удалось десериализовать, в раздел мертвых писем, используя kafka-smallrye
и quarkus
.
Все сообщения по этой теме должны быть в формате avro (но я не мог быть уверен) с определением схемы в реестре схем.
Я установил конфигурацию моего потребителя таким образом:
mp:
messaging:
incoming:
test-in:
connector: smallrye-kafka
group:
id: test-in-consumer-group
topic: events-topic
failure-strategy: dead-letter-queue
schema:
registry:
url: http://localhost:8081
value:
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
key:
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
specific:
avro:
reader: true
Мой потребительский код:
@ApplicationScoped
public class Consumer {
@Incoming("test-in")
public CompletionStage<Void> store(KafkaRecord<Key,SpecificRecord> data ){
String schemaFullName = data.getPayload().getSchema().getFullName();
System.out.println(schemaFullName);
// other consumer code
return data.ack();
}
}
Когда потребитель не может десериализовать сообщение, процесс потребления блокируется, вместо этого перемещайте сообщение в мертвую букву и продолжайте. Я полагаю, что ошибка десериализации не привела nack
к тому, что сообщение не могло быть перемещено в мертвую букву.
Есть способ переместить не десерилизуемое сообщение в тему мертвого письма?
Ответ №1:
я решил проблему с помощью DeserializationFailureHandler. Вы должны использовать раздел «dead-letter-queue» в качестве обычной темы и отправить сообщение о сбое.
@ApplicationScoped
@Identifier("failure-dead-letter") // Set the name of the failure handler
public class MyDeserializationFailureHandler
implements DeserializationFailureHandler<CustomBean> { // Specify the expected type
private static final Logger LOGGER = Logger.getLogger(MyDeserializationFailureHandler.class);
@Inject
@Channel("dead-letter")
Emitter<DeadLetterBean> deadLetterBeanEmitter;
@Override
public CustomBean handleDeserializationFailure(String topic, boolean isKey, String deserializer, byte[] data,
Exception exception, Headers headers) {
LOGGER.error("ERROR: " exception.getMessage());
deadLetterBeanEmitter.send(Message.of(new DeadLetterBean(topic, isKey, deserializer, data, exception))
.withAck(() -> {
// Called when the message is acked
LOGGER.error("SENT TO DEAD LETTER");
return CompletableFuture.completedFuture(null);
})
.withNack(throwable -> {
// Called when the message is nacked
LOGGER.error("ERROR, NOT SENT DEAD LETTER");
return CompletableFuture.completedFuture(null);
}));
return null;
}
}
также зарегистрируйте новую тему как publisher и обработчик deserialization-failure-handler
mp.messaging.outgoing.dead-letter.topic=dead-letter-topic-name
mp.messaging.outgoing.dead-letter.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
mp.messaging.incoming.message-in.value-deserialization-failure-handler=failure-dead-letter