#spring-cloud-stream #spring-cloud-stream-binder-kafka
#spring-cloud-stream #spring-cloud-stream-binder-kafka
Вопрос:
Я использую Spring Cloud Stream Kafka. У меня есть StreamListeners для предопределенных тем. Мое новое требование — создать и остановить среду выполнения StreamListeners для определенного пользователем имени темы. Таким образом, из пользовательского интерфейса пользователь определит и обновит, какие имена тем будут прослушиваться, а слушатели тем (StreamListeners) будут остановлены. Есть ли какой-либо способ иметь гибкую среду выполнения StreamListeners? Я пытался использовать BinderAwareChannelResolver, но при установке разных привязок для разных привязок я получаю ошибку конфигурации UnknownBinder. Я не смог найти подробный пример, охватывающий мои требования, используя BinderAwareChannelResolver.
@Autowired
private SubscribableChannelBindingTargetFactory bindingTargetFactory;
@Autowired
private BindingService bindingService;
@Autowired
BinderFactory binderFactory;
BindingServiceProperties properties = bindingService.getBindingServiceProperties();
properties.getConsumerProperties(channelName ).setBatchMode(true);
String binderConfigurationName = properties.getBinder(channelName);
Binder<SubscribableChannel, ConsumerProperties, ?> binder = (Binder<SubscribableChannel, ConsumerProperties, ?>) binderFactory.getBinder(binderConfigurationName, channel.getClass());
copyExtendedConsumerProperties(binder, channelName);
bindingService.bindConsumer(channel, channelName);
channel.subscribe(new DynamicMessageHandler());
private void copyExtendedConsumerProperties(Binder binder, String channelName) {
KafkaConsumerProperties extension = (KafkaConsumerProperties) ((ExtendedPropertiesBinder) binder).getExtendedConsumerProperties(channelName);
extension.getConfiguration().put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, PersonDeserializer.class.getName());
extension.getConfiguration().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "3000");
extension.getConfiguration().put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "900000");
extension.getConfiguration().put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "5000");
extension.getConfiguration().put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
public class DynamicMessageHandler implements MessageHandler {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
List<Person> personList = (List<Person>) message.getPayload();
System.out.println(personList.size());
}
}
Ответ №1:
Вот пример двух StreamListener
методов, в которых оба они останавливаются при запуске приложения.
@Autowired
BindingsEndpoint endpoint;
@StreamListener("input-1")
public void listen1(String in) {
System.out.println();
}
@StreamListener("input-2")
public void listen2(String in) {
System.out.println();
}
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("input-1", State.STOPPED);
endpoint.changeState("input-1", State.STOPPED);
}
Когда вы запускаете это приложение, оно запускается StreamListener
с остановкой обоих файлов. Теперь в пользовательском интерфейсе, о котором вы упомянули, необходимо указать имена привязок. В интерфейсе пользователь выберет, скажем, input-1
для запуска. Тогда StreamListener
listen1
будет запущен файл с именем as, а другой ( listen2
) останется остановленным. Вы можете реализовать конечную точку REST для передачи имени привязки, чтобы привязку можно было запустить с помощью BindingsEndpoint
.
BinderAwareChannelResolver
предназначен для динамических адресатов и используется для исходящих целей. Не уверен, насколько это может быть полезно для вашего варианта использования. В любом случае, BinderAwareChannelResolver
, устарел в последних версиях Spring Cloud Stream.
Spring Cloud Stream, начиная с версий 3.0.x, в основном предпочитает процессоры и потребителей с функциональным стилем. Хотя StreamListener
он по-прежнему доступен в версии 3.0.x, начиная с версии 3.1.x, его использование устарело. Мы предлагаем вам обновить свои StreamListener
методы до функционального стиля.
Два StreamListener
описанных выше метода могут быть переписаны следующим образом:
@Bean
public Consumer<String> listen1() {
return s -> {};
}
@Bean
public Consumer<String> listen2() {
return s -> {};
}
По умолчанию имена привязок будут listen1-in-0
следующими и listen2-in-0
которые вы можете изменить. Смотрите документы.
Комментарии:
1. Мои требования таковы; — Мои имена тем и количество тем будут определяться во время выполнения и будут определяться пользователем, эти имена тем и количество тем могут меняться в любое время в течение жизненного цикла моего приложения. Поэтому у меня изначально нет возможности иметь предопределенные StreamListeners. — Я хочу иметь возможность динамически устанавливать адреса брокера во время выполнения. — Я хочу динамически устанавливать разные связующие для разных привязок. — Я хочу иметь возможность настроить потребителя для использования данных в пакетном режиме. — Я хочу указать номер раздела со стороны производителя во время выполнения. @sobychacko
2. Я пытался внести такие изменения сам, но я не знаю, правильно ли я это сделал. Я отредактировал свой вопрос и поместил пример кода. Можете ли вы просмотреть? Я был бы рад, если бы вы могли помочь мне в этом отношении. @sobychacko
3. Вероятно, вам придется использовать то, что подробно описано здесь docs.spring.io/spring-cloud-stream/docs/3.0.10.RELEASE /…
4. В принципе, каждый раз, когда тема меняется, вы можете представить это через
sendto.destination
заголовок. В любом случае, эти требования не очень тривиальны и требуют дополнительной настройки.