Многопоточность Spring integration DSL

#multithreading #spring-boot #spring-integration #spring-integration-dsl #spring-dsl

#многопоточность #весенняя загрузка #spring-интеграция #spring-интеграция-dsl #spring-dsl

Вопрос:

Я хочу создать простой IntegrationFlow с помощью Spring integration, и у меня возникают трудности.

Я хочу создать поток интеграции, который принимает сообщения из нескольких очередей в Rabbit Mq и отправляет сообщения в разные конечные точки Rest.

я хочу знать, могу ли я распараллелить это.

у меня есть два сценария, которые я хочу проверить на осуществимость :

  • первым делом я хочу создать поток для каждой очереди RabbitMQ, который будет прослушивать и выполнять поток после получения сообщения :

Сценарий 1

  • второй сценарий : в этом сценарии я хочу создать динамическое количество потоков для каждой очереди, количество потоков увеличивается или уменьшается в зависимости от количества сообщений.

Сценарий 2

  HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        RestTemplate restTemplate = new RestTemplate();
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(BOUTIQUE_QUEUE_NAME);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
                .handle(msg ->
                {
                    String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
                    HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
                    restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
                    System.out.println(msgString);
                   
                })
                .get();
    }
  

Ответ №1:

Для первого сценария просто настройте входящий адаптер для каждого и установите выходной канал на общий канал для последующего потока.

Для второго сценария просто установите concurrentConsumers и maxConcurrentConsumers в контейнере прослушивателя, и он будет масштабировать потоки вверх / вниз по мере необходимости.

Смотрите документацию Spring AMQP.

Комментарии:

1. Спасибо тебе, Гэри! Я установил для concurrentConsumers значение 1, а для maxConcurrentConsumers значение 5. Чтобы протестировать многопоточность, я добавил режим ожидания в середине потока, чтобы заблокировать этот поток и заставить spring использовать несколько потоков. Я заметил, что программа по-прежнему использует только один поток (он ожидает, пока сообщение не перейдет в режим ожидания, прежде чем принимать следующее сообщение из очереди), он не создал другой поток. так ли это должно себя вести?

2.Смотрите документацию, на которую я вас ссылал >This works in conjunction with four additional properties: consecutiveActiveTrigger, startConsumerMinInterval, consecutiveIdleTrigger, and stopConsumerMinInterval. With the default settings, the algorithm to increase consumers works as follows: If the maxConcurrentConsumers has not been reached and an existing consumer is active for ten consecutive cycles AND at least 10 seconds has elapsed since the last consumer was started, a new consumer is started. A consumer is considered active if it received at least one message in batchSize * receiveTimeout