#java #multithreading #concurrency #guava #completable-future
#java #многопоточность #параллелизм #гуава #завершаемый-будущее
Вопрос:
Мои обратные вызовы в моих прослушиваемых фьючерсах Guava задерживаются. Я пишу приложение, которое, по сути, имеет пул потоков, и как только «длительная задача» будет завершена, у меня будет успешный обратный вызов следующим образом:
ListenableFuture<Boolean> listenableFuture = service.submit(() -> publish(eventName, record, producer));
Futures.addCallback(listenableFuture, new FutureCallback<>() {
@Override
public void onSuccess(Boolean result) {
System.out
.println(LocalTime.now() " Task completed successfully with result: " result);
}
@Override
public void onFailure(Throwable t) {
System.out.println(LocalTime.now() " Task failed with result: " t.getMessage());
}
}, service);
Моя инициализация сервиса выглядит следующим образом:
executorService = new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(targetAmount),
handler);
service = MoreExecutors.listeningDecorator(executorService);
В рамках моей длительной задачи, которую я выполняю, я печатаю, когда сетевой запрос выполняется успешно.
private Boolean publish(String eventName, ProducerRecord rec, Producer producer) {
AtomicBoolean failed = new AtomicBoolean(false);
producer.send(rec, (metadata, exception) -> {
if (exception != null) {
failed.set(true);
} else {
System.out.println("The network request is successful");
}
});
if (failed.get() == true) {
return false;
}
return true;
}
Проблема
Если я настрою пул потоков на обработку данных размером 1 ММ, я увижу 1 ММ The network request is successful
, прежде чем увижу запуск одного оператора обратного вызова println. Почему это так?
Комментарии:
1. Разве обратные вызовы не выполняются на одном и том же исполнителе, следовательно, они будут поставлены в очередь за фактическими задачами сетевого запроса? Боюсь, это просто предположение.
2. что такое 1 ММ? 1 триллион?
3. @AlexeiKaigorodov Другой способ сказать 1 миллион (1 тысяча тысяч)
4. тогда почему не 1M или 1KK?
5. вопросы типа «как исправить» должны быть сформированы в виде готовых компилируемых и исполняемых фрагментов кода. В противном случае проблема r может быть скрыта в отсутствующей части кода, и мы не сможем это исправить.
Ответ №1:
Как указано в документации для Futures.addCallback, обратный вызов выполняется в исполнителе. Ваш обратный вызов будет выполнен только после завершения всех других элементов в очереди. Чтобы исправить это, вы могли бы создать отдельного исполнителя для простого выполнения обратных вызовов, или, поскольку кажется, что вы управляете конструктором пула, вы могли бы создать очередь приоритетов вместо LinkedBlockingQueue в качестве очереди задач и расставить приоритеты ваших обратных вызовов.