Правильный подход к пулу потоков / фьючерсов?

#java #spring #spring-boot

#java #весна #весенняя загрузка

Вопрос:

У меня есть WebMVC rest api. Один из моих методов rest — это «длительная задача». Моя первоначальная реализация заняла 100 секунд для завершения моих тестовых данных.

Мой второй подход состоял в том, чтобы разбить тестовые данные на страницы и использовать пул потоков. На моем уровне обслуживания у меня есть private final ExecutorService executor = Executors.newFixedThreadPool(8); , и я подчиняюсь ему List<Future<Page>> results = this.executor.invokeAll(pages); , сократив его до 45 секунд.

Вопрос заключается в ожидании всех страниц. Мне действительно нужны страницы для окончательного ответа json, поэтому я делаю:

         List<Future<Page>> results = this.executor.invokeAll(pages);

        for (Future<Page> result : results)
            result.get();
  

Если я это сделаю, я думаю, меня действительно не волнует порядок, в котором они заканчиваются, потому что они будут в правильном порядке в исходном списке страниц.

Но get() — это блокирующий вызов. Это вообще имеет значение в webmvc rest api?

Комментарии:

1. Что вы подразумеваете под «мне нужно, чтобы страницы были в порядке», означает ли это, что исполнитель должен выполнять задачи последовательно (в том порядке, в котором были отправлены задачи)?

2. @sonus21 Нет, я просто имел в виду, что мне нужно получить конечный результат в исходном порядке страниц. Они могут завершаться в любом порядке. Я немного обновил вопрос, поскольку sonarqube, похоже, не жаловался на игнорирование результата .get() .

3. Я не совсем уверен, но, насколько я понимаю, даже если вы поставили в очередь все задачи сразу, get это не должно помочь вам получить страницы в порядке, потому get что метод просто используется для блокировки, чтобы увидеть, завершено ли данное будущее или нет. И это не окажет никакого побочного эффекта на ваш ответ API.

Ответ №1:

Я могу заверить вас, что invokeAll возвращает список фьючерсов в том же порядке, что и переданные вызываемые объекты, т. Е. 3-й Callable<Page> в переданном pages списке является источником 3-го Future<Page> в results списке фьючерсов.

Возвращает: список фьючерсов, представляющих задачи, в том же последовательном порядке, который создается итератором для данного списка задач, каждая из которых завершена

Пока все хорошо, пока с вами все в порядке, они будут доступны после того, как все они вернутся true Future::isDone , нет способа получить доступ к промежуточным результатам с помощью такого метода.

Вы можете использовать несколько вызовов CompletableFuture::supplyAsync(Supplier<T>, Executor) для получения List<CompletableFuture<T>> , а затем CompletableFuture::thenAccept(Consumer<T>) выполнить действие по завершению, ожидая других результатов.

Минимальный образец, использующий a String вместо Page для краткости:

 Random random = new Random();
// simulate irregular delay
List<Callable<String>> pages = List.of(
    () -> { Thread.sleep(random .nextInt(3000) 2000); return "one";},
    () -> { Thread.sleep(random .nextInt(3000) 2000); return "two";},
    () -> { Thread.sleep(random .nextInt(3000) 2000); return "three";},
    () -> { Thread.sleep(random .nextInt(3000) 2000); return "four";},
    () -> { Thread.sleep(random .nextInt(3000) 2000); return "five";}
);

ExecutorService executor = Executors.newFixedThreadPool(8);
List<CompletableFuture<String>> results = pages.stream()
    .map(callable -> CompletableFuture.supplyAsync(
        // this should be wrapped into a method instead
        () -> { try {return callable.call();} catch (Exception ignore) {} return null; },
        executor))
    .collect(Collectors.toList());

// clumsy index print to demonstrate the immediate results
for (int i=0; i<results.size(); i  ) {
    int index = i;
    results.get(i).thenAccept(page -> System.out.println("Finished ("   index   ") : "   page));
}
  

К сожалению, создатели, похоже, хотели избежать любого использования проверяемых исключений при разработке CompatibleFuture , поэтому он несовместим с. Callable<T>

Комментарии:

1. Итак, в конце дня вы все еще вызываете .get() для блокировки. Если я переключу эту службу на webflux, я думаю, в этот момент можно сделать что-то более крутое.

2. Да, Spring Webflux — это круто. Кстати. куда я звоню get() , чтобы заблокировать? Я понятия не имею, как OP хочет обрабатывать результаты, поэтому я предоставил простой фрагмент с использованием потребителя.

3. Ой, извините, я неправильно прочитал код, вы выполняли get() в списке. Я ОП :). Мне нужно вернуть результаты в моем вызове rest mvc через json. Итак, мне нужно собрать результаты вместе, когда все будет сделано, поэтому в какой-то момент это должен быть блокирующий вызов. Мой комментарий о Webflux заключался в том, что я мог бы сделать его mono / flux и вообще не блокировать, но для webmvc мне нужно где-то заблокировать.

4. Когда все будет сделано! = блокировка. Вы практически ждете , пока все они не будут выполнены, что гарантирует либо вызов Future::isDone , либо использование fluent API CompletableFuture . Пока вы уже разрабатываете Spring, я предлагаю вам использовать Spring Webflux, который, по моему МНЕНИЮ, намного лучше справляется с такими вещами (включая блокировку, отложенную как можно позже).