Существует ли интеграция функции spring cloud webflux spring cloud stream с http-источником

#spring-webflux #spring-cloud-stream #spring-cloud-function

#spring-webflux #spring-cloud-stream #spring-cloud-функция

Вопрос:

Я пытаюсь интегрировать spring cloud stream с функцией spring cloud webflux

поскольку они не рекомендуют реактивные потоки spring cloud в будущих выпусках, я пытаюсь использовать функции spring cloud https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.2.RELEASE/single/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations

Веб-функция Spring Cloud может предоставлять конечную точку своей функции с путями, как в документе

https://cloud.spring.io/spring-cloud-static/spring-cloud-function/1.0.0.RELEASE/single/spring-cloud-function.html

из облачного потока я вижу, что источник должен быть определен как поставщик https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.2.RELEASE/single/spring-cloud-stream.html#_spring_cloud_function

но мой вариант использования — получать данные POST из реактивной конечной точки http и загружать в kafka, есть ли какой-либо способ добиться этого из веб-функции spring cloud и потока spring cloud?

из документа для функции spring cloud с потоком spring cloud

 @SpringBootApplication
@EnableBinding(Source.class)
public static class SourceFromSupplier {
    public static void main(String[] args) {
        SpringApplication.run(SourceFromSupplier.class, "--spring.cloud.stream.function.definition=date");
    }
    @Bean
    public Supplier<Date> date() {
        return () -> new Date(12345L);
    }
}
  

и если я запущу это, я увижу, что date вставляется в kafka каждые 1 секунду, и если я вызову конечную точку get для поставщика, например localhost:/ 8080/date, в результате будет получен ответ date, есть ли какой-либо способ ввести paylaod из post в kafka с помощью функции spring cloud?

Ответ №1:

Существует проблема, которую помог обнаружить ваш вопрос, и она связана с несоответствием жизненного цикла между автоматическими конфигурациями, предоставляемыми функцией и потоком. Проблема проявляется в том, что точка rest, созданная Spring Cloud Functions, не может видеть привязки, поскольку она создана намного раньше

Итак, мы вскоре рассмотрим проблему. Между тем, существует обходной путь, который потребует от вас доступа к output каналу из ApplicationContext (см. Ниже):

 @SpringBootApplication
@EnableBinding(Source.class)
public class SimpleFunctionRabbitDemoApplication {

  public static void main(String[] args) throws Exception {      
    SpringApplication.run(SimpleFunctionRabbitDemoApplication.class);
  }

  @Bean
  public Consumer<String> storeSync(ApplicationContext context) {
     return v -> {
        MessageChannel channel = context.getBean(Source.OUTPUT, MessageChannel.class);
        channel.send(MessageBuilder.withPayload(v).build());
     };
  }
}
  

Комментарии:

1. Спасибо за быстрый ответ, но я все еще не могу внедрить потоки kafka, я предоставил компонент типа Consumer для функции spring cloud, и когда я отправляю в него некоторые данные, я могу видеть данные в функции, но когда я вызываю sink.input().send(MessageBuilder.withPayload(paylaod).build()) я не смог отправить сообщение в kafka, но я получил true при входе в систему. я использую spring boot 2.1.3-release, spring cloud stream Fishtown. SR2, springCloudFunctionVersion 2.0.1.RELEASE

2. Рассмотрите возможность создания небольшого примера, который демонстрирует вашу проблему, и отправьте i на GitHub, таким образом, мы сможем взглянуть

3. привет, пожалуйста, найдите мой пример на github [ github.com/jayasai470/spring-sample-cloud-stream-function ]

4. попробуйте это, пожалуйста github.com/jayasai470/spring-sample-cloud-stream-function

5. Хорошо, я посмотрю завтра, а пока, пожалуйста, обновитесь до последней версии Stream, которая только что была выпущена как 2.2.0.M1 — spring.io/blog/2019/03/26 /…