Camel — создание параллельных запросов GET и агрегирование результатов с помощью динамических маршрутов с использованием Java DSL

#rest #apache-camel #jetty

#rest #apache-camel #причал

Вопрос:

Я получаю запрос на конечной точке Http Jetty. Тело запроса содержит некоторые URL-адреса в теле запроса. Я должен сделать GET запрос на эти URL-адреса. Затем агрегируйте результаты каждого GET запроса и возвращайте его вызывающему.

Тело запроса:-

 {
    "data" : [
        {"name" : "Hello", "url" : "http://server1"}
        {"name" : "Hello2", "url" : "http://server2"}

    ]
}
  

Один из способов, которым я могу думать об этом, выглядит следующим образом:-

 from("jetty:http://localhost:8888/hello").process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        // 1. Make the GET request in parallel using ThreadPoolExecutor
        // 2. Wait for all calls to finish. Collate the response
        // 3. Write it to exchange.getOut().setBody
    }
})
  

Может ли кто-нибудь сообщить мне, можно ли этого достичь с помощью Java DSL с использованием динамических маршрутов camel, разделителя и агрегатора, чтобы Processor они оставались относительно небольшими?

Я использую camel 2.16.3.

Комментарии:

1. Концептуально вы могли бы разделить исходное сообщение так, чтобы каждое новое сообщение имело один URL. Эти разделенные запросы могут проходить через какой-либо процессор. После этого вы можете разместить агрегатор. Я предполагаю, что первоначальному потребителю может поступать несколько сообщений, поэтому вам нужно выяснить, как определить, какие URL-адреса принадлежат какой URL-группе.

2. Вы предлагаете процессору выполнить HTTP-вызов или подключить процессор к некоторым to() ?

3. Не обязательно. Я сформулировал это таким образом, потому что в вашем примере показано, что процессор выполняет GET. Вы можете использовать один из компонентов Camel: http, http4 или даже cxfrs или spring-ws, … в зависимости от того, что соответствует вашим потребностям с наименьшей сложностью.

4. Поскольку URL-адрес находится в теле запроса, который поступает в конечную точку jetty. Можете ли вы сообщить мне, как я могу добавить URL-адрес в компонент http4 после разделителя? Поскольку URL-адрес здесь довольно динамичный.

Ответ №1:

Шаги будут:

  1. Разделите входящие данные на URL-адреса. Это может включать в себя подэтапы: например, вы можете разобрать входящую строку JSON в некоторый POJO, и, возможно, у этого POJO есть массив или список, где каждая запись является URL. Затем вы можете передать это как тело. (Конечно, вам может понадобиться другая информация из входящего запроса, поэтому вы можете изменить это.)
  2. Разделитель будет достаточно легко разделяться, если тело представляет собой массив или что-то еще, с чем он может легко справиться. В простейшем случае разделитель будет передавать один URI в теле каждого разделенного сообщения.
  3. Далее — в потоке разделителя — у вас может быть производитель, такой как http4, но вместо использования URI в конечной точке, вы можете попросить его использовать URI в сообщении.
  4. Наконец, у вас будет агрегатор.

Похоже, суть вашего вопроса касается динамического URI. Вот как может выглядеть фрагмент кода:

  from(...)... etc. 
    .setHeader(Exchange.HTTP_URI, simple("${body}"))
    .setHeader(Exchange.HTTP_METHOD,  
              constant(org.apache.camel.component.http4.HttpMethods.GET))
    .to("http4://google.com")
  

Для небольшой рабочей демонстрации см. Этот класс .

 public class HttpDynamicClient extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("direct:testMultiple")
        .split(body())
        .to("direct:httpClient");

        from("direct:httpClient")
        .log("starting httpClient route")
        .setHeader(Exchange.HTTP_URI, simple("${body}"))
        .setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http4.HttpMethods.GET))
        .to("http4://google.com")
        .process(new BodyToStringConverter())
        .log(LoggingLevel.INFO, "Output was ${body}");
    }

    private static class BodyToStringConverter implements Processor {
        @Override
        public void process(Exchange exchange) throws Exception {
            exchange.getOut().setBody(exchange.getIn().getBody(String.class));
        }
    }


    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        try {
            Logger logger = Logger.getLogger(HttpDynamicClient.class);

            context.addRoutes(new HttpDynamicClient());
            ProducerTemplate template = context.createProducerTemplate();
            context.start();
            Thread.sleep(1000);


            template.sendBody("direct:httpClient", "http://jsonplaceholder.typicode.com/posts/1");
            Thread.sleep(2000);

            template.sendBody("direct:testMultiple", new String [] {"http://jsonplaceholder.typicode.com/posts/1" , "http://jsonplaceholder.typicode.com/posts/1"});


        } finally {
            context.stop();
        }
    }

}
  

Комментарии:

1. Спасибо !!. Это именно то, что я искал.

Ответ №2:

Ответ @Darius X. — это в значительной степени то, что вам нужно. Чтобы заставить серверные запросы выполняться параллельно и объединять тела ответов в список строк, вам необходимо настроить стратегию агрегирования и установить флаг параллельной обработки в определении разделения.

 @Override
public void configure() throws Exception {

    from("direct:testMultiple")
            .split(body(), new FlexibleAggregationStrategy<String>()
                    .pick(Builder.body())
                    .castAs(String.class)
                    .accumulateInCollection(ArrayList.class))
            .parallelProcessing()
            // .executorService(<instance of java.util.concurrent.ExecutorService>) // optional: use custom thread pool for parallel processing
            .to("direct:httpClient");

    from("direct:httpClient")
            .log("starting httpClient route")
            .setHeader(Exchange.HTTP_URI, simple("${body}"))
            .setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http4.HttpMethods.GET))
            .to("http4://google.com")
            .convertBodyTo(String.class)
            .log(LoggingLevel.INFO, "Output was ${body}");
}
  

Сообщение out обмена, возвращаемое, direct:testMultiple будет содержать ваш результирующий массив в виде тела.