Обрабатывать список объектов, использующих завершаемые фьючерсы

#java #multithreading #java-8 #multiprocessing #completable-future

#java #многопоточность #java-8 #многопроцессорная обработка #завершаемый-будущее

Вопрос:

У меня есть список объектов типа T . У меня также есть функциональный интерфейс, который действует как Supplier имеющий метод для performTask объекта on и отправляет обратно результат R , который выглядит следующим образом:
R performTask(T entity) throws Exception .

Я хочу отфильтровать как успешные результаты, так и ошибки и исключения, вытекающие из него, на отдельные карты. Код, который я написал здесь, требует времени, пожалуйста, предложите, что можно сделать.

Я перебираю список объектов, затем обрабатываю их завершаемое будущее один за другим, что, я думаю, не является правильным способом. Можете ли вы все предложить, что здесь можно сделать?

 private void updateResultAndExceptionMaps(List < T > entities, final TaskProcessor < T, R > taskProcessor) {

 ExecutorService executor = createExecutorService();
 Map < T, R > outputMap = Collections.synchronizedMap(new HashMap < T, R > ());
 Map < T, Exception > errorMap = new ConcurrentHashMap < T, Exception > ();
 try {

  entities.stream()
   .forEach(entity -> CompletableFuture.supplyAsync(() -> {
     try {
      return taskProcessor.performTask(entity);
     } catch (Exception e) {
      errorMap.put(entity, (Exception) e.getCause());
      LOG.error("Error processing entity Exception: "   entity, e);
     }
     return null;
    }, executor)
    .exceptionally(throwable -> {
     errorMap.put(entity, (Exception) throwable);
     LOG.error("Error processing entity Throwable: "   entity, throwable);
     return null;
    })
    .thenAcceptAsync(R -> outputMap.put(entity, R))
    .join()
   ); // end of for-each 

  LOG.info("outputMap Map -> "   outputMap);
  LOG.info("errorMap Map -> "   errorMap);
 } catch (Exception ex) {
  LOG.warn("Error: "   ex, ex);
 } finally {
  executor.shutdown();
 }
}
  

outputmap должен содержать объект и результат, R .
errorMap должен содержать объект и Exception .

Ответ №1:

Это потому, что вы перебираете List объекты один за другим, создаете CompletableFuture объект и немедленно блокируете итерацию из-за join метода, который ожидает, пока данный процессор не завершит его работу или не выдаст исключение. Вы можете сделать это с полной поддержкой многопоточности, преобразовав каждую сущность в CompletableFuture , собрав все CompletableFuture экземпляры и после этого дождавшись всех вызовов join для каждого.

Приведенный ниже код должен сработать в вашем случае:

 entities.stream()
    .map(entity -> CompletableFuture.supplyAsync(() -> {
            try {
                return taskProcessor.performTask(entity);
            } catch (Exception e) {
                errorMap.put(entity, (Exception) e.getCause());
            }
            return null;
        }, executor)
                .exceptionally(throwable -> {
                    errorMap.put(entity, (Exception) throwable);
                    return null;
                })
                .thenAcceptAsync(R -> outputMap.put(entity, R))
    ).collect(Collectors.toList())
    .forEach(CompletableFuture::join);
  

Комментарии:

1. По какой-то причине для выполнения задачи 🙄 по-прежнему требуется столько же времени, я подозреваю, что виновником каждого join является, есть ли способ, которым мы можем каким-то образом использовать все get . Извините, я новичок в этом, простите мое невежество.

2. @bitsboy, ты удалил join вызов после thenAcceptAsync метода? Я пробовал это, и у меня это сработало. Не могли бы вы отредактировать свой вопрос с помощью текущей реализации метода? Кроме того, что createExecutorService возвращает? Какой класс используется и как он настроен?

3. Да, я удалил вызов join() после принятия Taysnc, обычно это та же реализация, что и у вас, без каких-либо изменений, вот реализация для createExcutor: private createExecutorService(){ return Executors.newCachedThreadPool(); }

4. Пожалуйста, обратите внимание, что для обработки примерно 1500 объектов потребовалось 29 минут.

5. @bitsboy, вы можете попробовать с разными исполнителями. Например: Executors.newFixedThreadPool(10) и попробуйте изменить количество потоков.