#spring-cloud-stream #spring-cloud-function
#spring-cloud-stream #spring-cloud-функция
Вопрос:
Я хотел бы иметь привязку, которая подключается к определенной очереди / теме и перенаправляет к нужной функции на основе определенной записи заголовка.
Я не смог найти ни одного примера для этого случая. Я пробовал несколько подходов, но ни с одним из них у меня не было успеха.
Это, например, не сработало:
spring:
cloud:
function:
routing:
enabled: true
stream:
function:
routing:
enabled: true
definition: myConsumer;myOtherConsumer;
bindings:
myConsumer-in-0:
destination: myTopic
group: myGroup
binder: myBroker
routing-expression: "headers['MyRoutingInfo'] == 'even' ? 'myEvenConsumer' : 'myOddConsumer'"
myOtherConsumer-in-0: #without specific routing
Приветствуется каждый конкретный пример
Ответ №1:
Я наконец нашел способ достичь своей цели. Но я не уверен, что это способ сделать это:
spring:
cloud:
function:
routing:
enabled: true
routing-expression: "headers['MyRouting'] == 'odd' ? 'oddConsumer' : 'evenConsumer'"
stream:
function:
definition: myConsumer;myOtherConsumer;
bindings:
myConsumer-in-0:
destination: myTopic
group: myGroup
binder: myBroker
myOtherConsumer-in-0: #without specific routing
со следующими компонентами:
@Bean
public Consumer<Message<byte[]>> myConsumer(final RoutingFunction routingFunction) {
return message -> {
LOG.info("Sending to routingFunction");
routingFunction.apply(message);
};
}
@Bean
public Consumer<byte[]> evenConsumer() {
return (payload) -> LOG.info("even got: {}", new String(payload));
}
@Bean
public Consumer<byte[]> oddConsumer() {
return (payload) -> LOG.info("odd got: {}", new String(payload));
}
Ответ №2:
Потребители не «маршрутизируют» сообщения, они потребляют из очередей. Производители маршрутизируют сообщения, используя s.c.s.rabbit.bindings.producer-out-0.producer.routing-key-expression
.
Комментарии:
1. Спасибо, Гэри. Есть ли способ распространить сообщение на стороне потребителя, как это было с StreamListener.condition?
2. Об этом я не знаю.
Ответ №3:
Чтобы включить маршрутизацию, по умолчанию будет создана привязка с именем functionRouter .
Согласно документам:
Функция маршрутизации зарегистрирована в FunctionCatalog под именем functionRouter. Для простоты и согласованности вы также можете обратиться к RoutingFunction .Константа FUNCTION_NAME.
Приведенная ниже конфигурация должна работать нормально:
spring:
cloud:
stream:
function:
definition: functionRouter;
routing:
enabled: true
kafka:
binder:
brokers:
- localhost:9092
bindings:
functionRouter-in-0:
destination: my.topic
group: my.topic.group
function:
routing-expression: "headers['type'] == 'even' ? 'evenConsumer' : 'oddConsumer'"
Вам не нужно создавать четное и нечетное определение функции потребителя.
Ответ №4:
На самом деле вам не нужно указывать spring.cloud.stream.function.routing.enabled=true
параметр в application.properties
файле, чтобы маршрутизация работала, потому что она автоматически работает, как только вы предоставляете routing-expression
параметр — см.: Документация spring cloud stream