Маршрутизация потока Spring Cloud — выражение для выделенной привязки

#spring-cloud-stream #spring-cloud-function

#spring-cloud-stream #spring-cloud-функция

Вопрос:

Я хотел бы иметь привязку, которая подключается к определенной очереди / теме и перенаправляет к нужной функции на основе определенной записи заголовка.

Я не смог найти ни одного примера для этого случая. Я пробовал несколько подходов, но ни с одним из них у меня не было успеха.

Это, например, не сработало:

 spring:
  cloud:
    function:
      routing:
        enabled: true
    stream:
      function:
        routing:
          enabled: true
        definition: myConsumer;myOtherConsumer;
        bindings:
          myConsumer-in-0:
            destination: myTopic
            group:  myGroup
            binder: myBroker
            routing-expression: "headers['MyRoutingInfo'] == 'even' ? 'myEvenConsumer' : 'myOddConsumer'"
          myOtherConsumer-in-0: #without specific routing
  

Приветствуется каждый конкретный пример

Ответ №1:

Я наконец нашел способ достичь своей цели. Но я не уверен, что это способ сделать это:

     spring:
      cloud:
        function:
          routing:
            enabled: true
          routing-expression: "headers['MyRouting'] == 'odd' ? 'oddConsumer' : 'evenConsumer'"
        stream:
          function:
            definition: myConsumer;myOtherConsumer;
            bindings:
              myConsumer-in-0:
                destination: myTopic
                group:  myGroup
                binder: myBroker
              myOtherConsumer-in-0: #without specific routing
  

со следующими компонентами:

 @Bean
public Consumer<Message<byte[]>> myConsumer(final RoutingFunction routingFunction) {
        return message -> {
           LOG.info("Sending to routingFunction");
           routingFunction.apply(message);
        };
}

@Bean
public Consumer<byte[]> evenConsumer() {
      return (payload) -> LOG.info("even got: {}", new String(payload));
}

@Bean
public Consumer<byte[]> oddConsumer() {
    return (payload) -> LOG.info("odd got: {}", new String(payload));
}
  

Ответ №2:

Потребители не «маршрутизируют» сообщения, они потребляют из очередей. Производители маршрутизируют сообщения, используя s.c.s.rabbit.bindings.producer-out-0.producer.routing-key-expression .

Комментарии:

1. Спасибо, Гэри. Есть ли способ распространить сообщение на стороне потребителя, как это было с StreamListener.condition?

2. Об этом я не знаю.

Ответ №3:

Чтобы включить маршрутизацию, по умолчанию будет создана привязка с именем functionRouter .

Согласно документам:

Функция маршрутизации зарегистрирована в FunctionCatalog под именем functionRouter. Для простоты и согласованности вы также можете обратиться к RoutingFunction .Константа FUNCTION_NAME.

Приведенная ниже конфигурация должна работать нормально:

 spring:
  cloud:
    stream:
      function:
        definition: functionRouter;
        routing:
          enabled: true
      kafka:
        binder:
          brokers:
            - localhost:9092
      bindings:
        functionRouter-in-0:
          destination: my.topic
          group: my.topic.group
    function:
      routing-expression: "headers['type'] == 'even' ? 'evenConsumer' : 'oddConsumer'"
  

Вам не нужно создавать четное и нечетное определение функции потребителя.

Ответ №4:

На самом деле вам не нужно указывать spring.cloud.stream.function.routing.enabled=true параметр в application.properties файле, чтобы маршрутизация работала, потому что она автоматически работает, как только вы предоставляете routing-expression параметр — см.: Документация spring cloud stream