#multithreading #spring-boot #spring-integration #spring-integration-dsl #spring-dsl
#многопоточность #весенняя загрузка #spring-интеграция #spring-интеграция-dsl #spring-dsl
Вопрос:
Я хочу создать простой IntegrationFlow с помощью Spring integration, и у меня возникают трудности.
Я хочу создать поток интеграции, который принимает сообщения из нескольких очередей в Rabbit Mq и отправляет сообщения в разные конечные точки Rest.
я хочу знать, могу ли я распараллелить это.
у меня есть два сценария, которые я хочу проверить на осуществимость :
- первым делом я хочу создать поток для каждой очереди RabbitMQ, который будет прослушивать и выполнять поток после получения сообщения :
- второй сценарий : в этом сценарии я хочу создать динамическое количество потоков для каждой очереди, количество потоков увеличивается или уменьшается в зависимости от количества сообщений.
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
.handle(msg ->
{
String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
System.out.println(msgString);
})
.get();
}
Ответ №1:
Для первого сценария просто настройте входящий адаптер для каждого и установите выходной канал на общий канал для последующего потока.
Для второго сценария просто установите concurrentConsumers
и maxConcurrentConsumers
в контейнере прослушивателя, и он будет масштабировать потоки вверх / вниз по мере необходимости.
Смотрите документацию Spring AMQP.
Комментарии:
1. Спасибо тебе, Гэри! Я установил для concurrentConsumers значение 1, а для maxConcurrentConsumers значение 5. Чтобы протестировать многопоточность, я добавил режим ожидания в середине потока, чтобы заблокировать этот поток и заставить spring использовать несколько потоков. Я заметил, что программа по-прежнему использует только один поток (он ожидает, пока сообщение не перейдет в режим ожидания, прежде чем принимать следующее сообщение из очереди), он не создал другой поток. так ли это должно себя вести?
2.Смотрите документацию, на которую я вас ссылал
>This works in conjunction with four additional properties: consecutiveActiveTrigger, startConsumerMinInterval, consecutiveIdleTrigger, and stopConsumerMinInterval. With the default settings, the algorithm to increase consumers works as follows:
If the maxConcurrentConsumers has not been reached and an existing consumer is active for ten consecutive cycles AND at least 10 seconds has elapsed since the last consumer was started, a new consumer is started. A consumer is considered active if it received at least one message in batchSize * receiveTimeout