Как создать динамические прослушиватели потоков в Spring Cloud Stream Kafka?

#spring-cloud-stream #spring-cloud-stream-binder-kafka

#spring-cloud-stream #spring-cloud-stream-binder-kafka

Вопрос:

Я использую Spring Cloud Stream Kafka. У меня есть StreamListeners для предопределенных тем. Мое новое требование — создать и остановить среду выполнения StreamListeners для определенного пользователем имени темы. Таким образом, из пользовательского интерфейса пользователь определит и обновит, какие имена тем будут прослушиваться, а слушатели тем (StreamListeners) будут остановлены. Есть ли какой-либо способ иметь гибкую среду выполнения StreamListeners? Я пытался использовать BinderAwareChannelResolver, но при установке разных привязок для разных привязок я получаю ошибку конфигурации UnknownBinder. Я не смог найти подробный пример, охватывающий мои требования, используя BinderAwareChannelResolver.

 @Autowired
private SubscribableChannelBindingTargetFactory bindingTargetFactory;

@Autowired
private BindingService bindingService;

@Autowired
BinderFactory binderFactory;

BindingServiceProperties properties = bindingService.getBindingServiceProperties();
properties.getConsumerProperties(channelName ).setBatchMode(true);

String binderConfigurationName = properties.getBinder(channelName);
Binder<SubscribableChannel, ConsumerProperties, ?> binder = (Binder<SubscribableChannel, ConsumerProperties, ?>) binderFactory.getBinder(binderConfigurationName, channel.getClass());
    copyExtendedConsumerProperties(binder, channelName);

bindingService.bindConsumer(channel, channelName);
channel.subscribe(new DynamicMessageHandler());

private void copyExtendedConsumerProperties(Binder binder, String channelName) {
    KafkaConsumerProperties extension = (KafkaConsumerProperties) ((ExtendedPropertiesBinder) binder).getExtendedConsumerProperties(channelName);
    extension.getConfiguration().put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, PersonDeserializer.class.getName());
    extension.getConfiguration().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "3000");
    extension.getConfiguration().put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "900000");
    extension.getConfiguration().put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "5000");
    extension.getConfiguration().put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}

public class DynamicMessageHandler implements MessageHandler {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
    List<Person> personList = (List<Person>) message.getPayload();
    System.out.println(personList.size());
}
}
 

Ответ №1:

Вот пример двух StreamListener методов, в которых оба они останавливаются при запуске приложения.

 @Autowired
BindingsEndpoint endpoint;

@StreamListener("input-1")
public void listen1(String in) {
 System.out.println();
}

@StreamListener("input-2")
public void listen2(String in) {
 System.out.println();
}

@Bean
public ApplicationRunner runner() {
  return args -> {
     endpoint.changeState("input-1", State.STOPPED);
     endpoint.changeState("input-1", State.STOPPED);
}
 

Когда вы запускаете это приложение, оно запускается StreamListener с остановкой обоих файлов. Теперь в пользовательском интерфейсе, о котором вы упомянули, необходимо указать имена привязок. В интерфейсе пользователь выберет, скажем, input-1 для запуска. Тогда StreamListener listen1 будет запущен файл с именем as, а другой ( listen2 ) останется остановленным. Вы можете реализовать конечную точку REST для передачи имени привязки, чтобы привязку можно было запустить с помощью BindingsEndpoint .

BinderAwareChannelResolver предназначен для динамических адресатов и используется для исходящих целей. Не уверен, насколько это может быть полезно для вашего варианта использования. В любом случае, BinderAwareChannelResolver , устарел в последних версиях Spring Cloud Stream.

Spring Cloud Stream, начиная с версий 3.0.x, в основном предпочитает процессоры и потребителей с функциональным стилем. Хотя StreamListener он по-прежнему доступен в версии 3.0.x, начиная с версии 3.1.x, его использование устарело. Мы предлагаем вам обновить свои StreamListener методы до функционального стиля.

Два StreamListener описанных выше метода могут быть переписаны следующим образом:

 @Bean
public Consumer<String> listen1() {
  return s -> {};
}

@Bean
public Consumer<String> listen2() {
  return s -> {};
}

 

По умолчанию имена привязок будут listen1-in-0 следующими и listen2-in-0 которые вы можете изменить. Смотрите документы.

Комментарии:

1. Мои требования таковы; — Мои имена тем и количество тем будут определяться во время выполнения и будут определяться пользователем, эти имена тем и количество тем могут меняться в любое время в течение жизненного цикла моего приложения. Поэтому у меня изначально нет возможности иметь предопределенные StreamListeners. — Я хочу иметь возможность динамически устанавливать адреса брокера во время выполнения. — Я хочу динамически устанавливать разные связующие для разных привязок. — Я хочу иметь возможность настроить потребителя для использования данных в пакетном режиме. — Я хочу указать номер раздела со стороны производителя во время выполнения. @sobychacko

2. Я пытался внести такие изменения сам, но я не знаю, правильно ли я это сделал. Я отредактировал свой вопрос и поместил пример кода. Можете ли вы просмотреть? Я был бы рад, если бы вы могли помочь мне в этом отношении. @sobychacko

3. Вероятно, вам придется использовать то, что подробно описано здесь docs.spring.io/spring-cloud-stream/docs/3.0.10.RELEASE /…

4. В принципе, каждый раз, когда тема меняется, вы можете представить это через sendto.destination заголовок. В любом случае, эти требования не очень тривиальны и требуют дополнительной настройки.