#java #spring-cloud #spring-cloud-stream #spring-cloud-dataflow #spring-cloud-stream-binder-kafka
#java #spring-cloud #spring-cloud-stream #spring-cloud-поток данных #spring-cloud-stream-binder-kafka
Вопрос:
Я следовал приведенной ниже документации, и у меня есть производитель и потребитель, которые отлично работают с потоком Kinesis. Я хотел бы понять, как обработать ОШИБКУ в производителе (источнике) в случае возникновения какого-либо исключения.
Я попробовал приведенные ниже подходы в соответствии с документацией по обработке ошибок Spring Stream:
@StreamListener("errorChannel")
@ServiceActivator(inputChannel = "errorChannel")
Оба не работают. Я явно вызываю исключение RuntimeException в методе producer и ожидаю, что это будет в «errorChannel», но не могу получить то же самое.
Пожалуйста, помогите мне разобраться в этом или поделитесь со мной этим подходом, если кто-то успешно это сделал.
Ответ №1:
Примечание: Я столкнулся с этой точной проблемой несколько месяцев назад. Мы использовали Kafka вместо Kinesis. Поскольку вы пометили spring-cloud-stream-binder-kafka, я предоставляю входные данные на том же основании.
Вы можете написать пользовательский обратный вызов для вашего производителя, и этот обратный вызов может сообщить вам, произошел сбой сообщения или оно было успешно опубликовано. При сбое запишите метаданные для сообщения.
Ниже приведен небольшой фрагмент кода, который может лучше объяснить обратный вызов, о котором я говорил.
Пожалуйста, обратите внимание, что это для Kafka. Измените код соответствующим образом для Kinesis.
public class ProduceToKafka {
private ProducerRecord<String, String> message = null;
// TracerBulletProducer class has producer properties
private KafkaProducer<String, String> myProducer = TracerBulletProducer
.createProducer();
public void publishMessage(String string) {
ProducerRecord<String, String> message = new ProducerRecord<>(
"topicName", string);
myProducer.send(message, new MyCallback(message.key(), message.value()));
}
class MyCallback implements Callback {
private final String key;
private final String value;
public MyCallback(String key, String value) {
this.key = key;
this.value = value;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
log.info("--------> All good !!");
} else {
log.info("--------> not so good !!");
log.info(metadata.toString());
log.info("" metadata.serializedValueSize());
log.info(exception.getMessage());
}
}
}
}
Я написал статью medium об обработке сбоев производителя на разных уровнях абстракций Spring. Это может вам помочь. Проверьте это :
https://medium.com/@akhil.ghatiki/kafka-producer-failure-handling-at-multiple-levels-of-spring-abstractions-e530edb02a6c
Комментарии:
1. Спасибо за предоставление этого Akhil. Однако я использую привязку источника потока данных Spring, и у нее есть только 2 типа параметризованного метода send (), который не будет принимать обратный вызов. this.source.output().send(сообщение);
2. Добро пожаловать!! в этом случае попробуйте автоматически подключить прослушиватель producer и реализовать свою пользовательскую логику в новой реализации. Взгляните на раздел «Реализация в потоке данных Spring cloud» в ссылке, которой я поделился в приведенном выше ответе, это может дать вам представление о синусе.
3. Спасибо, Akhil. К сожалению, у Kinesis Binder нет ProducerListener для автоматического подключения.