#java #multithreading #java-8 #multiprocessing #completable-future
#java #многопоточность #java-8 #многопроцессорная обработка #завершаемый-будущее
Вопрос:
У меня есть список объектов типа T
. У меня также есть функциональный интерфейс, который действует как Supplier
имеющий метод для performTask
объекта on и отправляет обратно результат R
, который выглядит следующим образом:
R performTask(T entity) throws Exception
.
Я хочу отфильтровать как успешные результаты, так и ошибки и исключения, вытекающие из него, на отдельные карты. Код, который я написал здесь, требует времени, пожалуйста, предложите, что можно сделать.
Я перебираю список объектов, затем обрабатываю их завершаемое будущее один за другим, что, я думаю, не является правильным способом. Можете ли вы все предложить, что здесь можно сделать?
private void updateResultAndExceptionMaps(List < T > entities, final TaskProcessor < T, R > taskProcessor) {
ExecutorService executor = createExecutorService();
Map < T, R > outputMap = Collections.synchronizedMap(new HashMap < T, R > ());
Map < T, Exception > errorMap = new ConcurrentHashMap < T, Exception > ();
try {
entities.stream()
.forEach(entity -> CompletableFuture.supplyAsync(() -> {
try {
return taskProcessor.performTask(entity);
} catch (Exception e) {
errorMap.put(entity, (Exception) e.getCause());
LOG.error("Error processing entity Exception: " entity, e);
}
return null;
}, executor)
.exceptionally(throwable -> {
errorMap.put(entity, (Exception) throwable);
LOG.error("Error processing entity Throwable: " entity, throwable);
return null;
})
.thenAcceptAsync(R -> outputMap.put(entity, R))
.join()
); // end of for-each
LOG.info("outputMap Map -> " outputMap);
LOG.info("errorMap Map -> " errorMap);
} catch (Exception ex) {
LOG.warn("Error: " ex, ex);
} finally {
executor.shutdown();
}
}
outputmap
должен содержать объект и результат, R
.
errorMap
должен содержать объект и Exception
.
Ответ №1:
Это потому, что вы перебираете List
объекты один за другим, создаете CompletableFuture
объект и немедленно блокируете итерацию из-за join
метода, который ожидает, пока данный процессор не завершит его работу или не выдаст исключение. Вы можете сделать это с полной поддержкой многопоточности, преобразовав каждую сущность в CompletableFuture
, собрав все CompletableFuture
экземпляры и после этого дождавшись всех вызовов join
для каждого.
Приведенный ниже код должен сработать в вашем случае:
entities.stream()
.map(entity -> CompletableFuture.supplyAsync(() -> {
try {
return taskProcessor.performTask(entity);
} catch (Exception e) {
errorMap.put(entity, (Exception) e.getCause());
}
return null;
}, executor)
.exceptionally(throwable -> {
errorMap.put(entity, (Exception) throwable);
return null;
})
.thenAcceptAsync(R -> outputMap.put(entity, R))
).collect(Collectors.toList())
.forEach(CompletableFuture::join);
Комментарии:
1. По какой-то причине для выполнения задачи 🙄 по-прежнему требуется столько же времени, я подозреваю, что виновником каждого join является, есть ли способ, которым мы можем каким-то образом использовать все get . Извините, я новичок в этом, простите мое невежество.
2. @bitsboy, ты удалил
join
вызов послеthenAcceptAsync
метода? Я пробовал это, и у меня это сработало. Не могли бы вы отредактировать свой вопрос с помощью текущей реализации метода? Кроме того, чтоcreateExecutorService
возвращает? Какой класс используется и как он настроен?3. Да, я удалил вызов join() после принятия Taysnc, обычно это та же реализация, что и у вас, без каких-либо изменений, вот реализация для createExcutor: private createExecutorService(){ return Executors.newCachedThreadPool(); }
4. Пожалуйста, обратите внимание, что для обработки примерно 1500 объектов потребовалось 29 минут.
5. @bitsboy, вы можете попробовать с разными исполнителями. Например:
Executors.newFixedThreadPool(10)
и попробуйте изменить количество потоков.