Прослушиваемый будущий обратный вызов сильно задерживается

#java #multithreading #concurrency #guava #completable-future

#java #многопоточность #параллелизм #гуава #завершаемый-будущее

Вопрос:

Мои обратные вызовы в моих прослушиваемых фьючерсах Guava задерживаются. Я пишу приложение, которое, по сути, имеет пул потоков, и как только «длительная задача» будет завершена, у меня будет успешный обратный вызов следующим образом:

           ListenableFuture<Boolean> listenableFuture = service.submit(() -> publish(eventName, record, producer));
          Futures.addCallback(listenableFuture, new FutureCallback<>() {
            @Override
            public void onSuccess(Boolean result) {
              System.out
                  .println(LocalTime.now()   " Task completed successfully with result: "   result);
            }

            @Override
            public void onFailure(Throwable t) {
              System.out.println(LocalTime.now()   " Task failed with result: "   t.getMessage());
            }
          }, service);
  

Моя инициализация сервиса выглядит следующим образом:

     executorService = new ThreadPoolExecutor(poolSize, poolSize,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(targetAmount),
        handler);
    service = MoreExecutors.listeningDecorator(executorService);
  

В рамках моей длительной задачи, которую я выполняю, я печатаю, когда сетевой запрос выполняется успешно.

   private Boolean publish(String eventName, ProducerRecord rec, Producer producer) {
      AtomicBoolean failed = new AtomicBoolean(false);
      producer.send(rec, (metadata, exception) -> {
        if (exception != null) {
          failed.set(true);
        } else {
           System.out.println("The network request is successful");
        }
      });

      if (failed.get() == true) {
        return false;
      }

      return true;
  }
  

Проблема

Если я настрою пул потоков на обработку данных размером 1 ММ, я увижу 1 ММ The network request is successful , прежде чем увижу запуск одного оператора обратного вызова println. Почему это так?

Комментарии:

1. Разве обратные вызовы не выполняются на одном и том же исполнителе, следовательно, они будут поставлены в очередь за фактическими задачами сетевого запроса? Боюсь, это просто предположение.

2. что такое 1 ММ? 1 триллион?

3. @AlexeiKaigorodov Другой способ сказать 1 миллион (1 тысяча тысяч)

4. тогда почему не 1M или 1KK?

5. вопросы типа «как исправить» должны быть сформированы в виде готовых компилируемых и исполняемых фрагментов кода. В противном случае проблема r может быть скрыта в отсутствующей части кода, и мы не сможем это исправить.

Ответ №1:

Как указано в документации для Futures.addCallback, обратный вызов выполняется в исполнителе. Ваш обратный вызов будет выполнен только после завершения всех других элементов в очереди. Чтобы исправить это, вы могли бы создать отдельного исполнителя для простого выполнения обратных вызовов, или, поскольку кажется, что вы управляете конструктором пула, вы могли бы создать очередь приоритетов вместо LinkedBlockingQueue в качестве очереди задач и расставить приоритеты ваших обратных вызовов.