Как периодически публиковать сообщение в activemq spring integration DSL

#spring-boot #spring-integration #dsl #spring-integration-dsl

Вопрос:

Я установил активный MQ и периодически говорю, что каждые 10 секунд хочу отправлять сообщение в «мою очередь».

Я изо всех сил пытаюсь понять язык DSL Spring Integration.

Мне нужно что-то вроде

 IntegrationFlows.from(every 5 seconds)
 .send(message to "my.queue")
 

Ответ №1:

Да, вы можете сделать это с помощью Spring Integration Java DSL и ее IntegrationFlow абстракции. Чтобы выполнить периодическую задачу, вам необходимо использовать эту фабрику в IntegrationFlows для запуска потока:

 /**
 * Provides {@link Supplier} as source of messages to the integration flow.
 * which will be triggered by a <b>provided</b>
 * {@link org.springframework.integration.endpoint.SourcePollingChannelAdapter}.
 * @param messageSource the {@link Supplier} to populate.
 * @param endpointConfigurer the {@link Consumer} to provide more options for the
 * {@link org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean}.
 * @param <T> the supplier type.
 * @return new {@link IntegrationFlowBuilder}.
 * @see Supplier
 */
public static <T> IntegrationFlowBuilder fromSupplier(Supplier<T> messageSource,
        Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer) {
 

Supplier Может возвращать объект, который вы хотели бы отправить в качестве полезной нагрузки вниз по потоку. Второй потребительский аргумент может быть сконфигурирован с помощью:

 .poller(p -> p.fixedDelay(1000))
 

Таким образом, каждую секунду сообщение будет создаваться из предоставленной полезной нагрузки и отправляться вниз по потоку.

Чтобы отправить сообщение ti Active MQ, вам необходимо использовать a org.springframework.integration.jms.dsl.Jms и его метод для соответствующего адаптера канала:

 /**
 * The factory to produce a {@link JmsOutboundChannelAdapterSpec}.
 * @param connectionFactory the JMS ConnectionFactory to build on
 * @return the {@link JmsOutboundChannelAdapterSpec} instance
 */
public static JmsOutboundChannelAdapterSpec.JmsOutboundChannelSpecTemplateAware outboundAdapter(
        ConnectionFactory connectionFactory) {
 

Результат этой фабрики должен использоваться в обратном вызове DSL, например:

 /**
 * Populate a {@link ServiceActivatingHandler} for the provided
 * {@link MessageHandler} implementation.
 * Can be used as Java 8 Lambda expression:
 * <pre class="code">
 * {@code
 *  .handle(m -> logger.info(m.getPayload())
 * }
 * </pre>
 * @param messageHandler the {@link MessageHandler} to use.
 * @return the current {@link BaseIntegrationFlowDefinition}.
 */
public B handle(MessageHandler messageHandler) {
 

Вся информация содержится в документах: https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl

Что-то вроде этого:

 @Bean
public IntegrationFlow jmsPeriodicFlow() {
    return IntegrationFlows.fromSupplier(() -> "hello", 
                               e -> e.poller(p -> p.fixedDelay(5000)))
            .handle(Jms.outboundAdapter(jmsConnectionFactory())
                    .destination("my.queue"))
            .get();
}
 

Комментарии:

1. Смотрите образец в конце моего ответа.