Почему это CompletableFuture работает, даже когда я не вызываю get() или join() ?

#java #multithreading #completable-future

#java #многопоточность #завершаемый-будущее

Вопрос:

У меня возник вопрос во время изучения CompletableFuture . Методы get() / join() блокируют вызовы. Что, если я не вызываю ни один из них?

Этот код вызывает get() :

 // Case 1 - Use get()
CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(1_000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Hello");
}).get();
System.out.println("World!");

Thread.sleep(5_000L); // Don't finish the main thread
 

Вывод:

 Hello
World!
 

Этот код get() не вызывает ни join() :

 // Case 2 - Don't use get()
CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(1_000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Hello");
});
System.out.println("World!");

Thread.sleep(5_000L); // For don't finish main thread
 

Вывод:

 World!
Hello
 

Я не знаю, почему работает выполняемый блок case 2 .

Комментарии:

1. Почему вы не ожидаете, что он будет запущен?

2. @LouisWasserman В материале, который я изучил, не было ничего, что я не записал. Поэтому я ожидал, что это не сработает. Как и терминальные операции Stream Api.

3. @LouisWasserman Поскольку такие проекты, как реактивные потоки, являются обычным явлением, новичкам не всегда очевидно различие между подходами «push» и «pull».

4. Отличный вопрос. Другие примеры поведения «вытягивания»: в Python генератор фактически ничего не делает, пока он не будет выполнен. Если вы создаете генератор, но не запускаете его, ничего не происходит. То же самое касается фьючерсов в Rust. Они запускаются только тогда, когда вы .await их используете.

5. @chrylis-осторожно оптимистично — Это верно. Я не понял метод «pull» и «push». Давайте узнаем об этих двух концепциях. Спасибо.

Ответ №1:

Вся идея CompletableFuture заключается в том, что они должны быть немедленно запущены (хотя вы не можете достоверно сказать, в каком потоке они будут выполняться), и к тому времени, когда вы достигнете get или join , результат может быть уже готов, т. Е.: CompletableFuture возможно, уже завершен. Внутренне, как только определенный этап в конвейере будет готов, для этого конкретного CompletableFuture будет установлено значение completed . Например:

 String result = 
   CompletableFuture.supplyAsync(() -> "ab")
                    .thenApply(String::toUpperCase)
                    .thenApply(x -> x.substring(1))
                    .join();
 

это то же самое, что:

 CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "ab");
CompletableFuture<String> cf2 = cf1.thenApply(String::toUpperCase);
CompletableFuture<String> cf3 = cf2.thenApply(x -> x.substring(1));
String result = cf3.join();
 

К тому времени, когда вы дойдете до фактического вызова join , cf3 возможно, он уже завершится. get и join просто блокируйте, пока не будут выполнены все этапы, это не запускает вычисление; вычисление запланировано немедленно.


Незначительным дополнением является то, что вы можете завершить a CompletableFuture , не дожидаясь завершения выполнения конвейеров: например complete , completeExceptionally , obtrudeValue (этот устанавливает его, даже если он уже был завершен), obtrudeException или cancel . Вот интересный пример:

  public static void main(String[] args) {
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        System.out.println("started work");
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        System.out.println("done work");
        return "a";
    });

    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    cf.complete("b");
    System.out.println(cf.join());
}
 

Это приведет к:

 started work
b
 

Таким образом, даже если работа началась, конечное значение равно b , not a .

Комментарии:

1. Чтобы быть более точным, эти фабричные методы будут планировать действие или поставщика немедленно, тогда CompletionStage как методы будут планировать немедленно, когда будут выполнены предварительные условия. Напротив, сама идея CompletableFuture отличается; как следует из названия, это просто будущее, которое может быть завершено, т. Е. Когда вы создаете экземпляр с помощью конструктора по умолчанию, ничего не произойдет, если кто-то явно не завершит его. Не следует забывать вторую часть вашего ответа cancel , которая является ничем иным, как исключительным завершением.

Ответ №2:

Я не знаю, почему Runnable работает блок case2.

Нет причин, по которым это НЕ сработало бы.

runAsync(...) Метод говорит о выполнении задачи асинхронно. Предполагая, что приложение не завершается преждевременно, задача в конечном итоге будет выполнена, независимо от того, ждете вы ее выполнения или нет.

CompletableFuture Предоставляет различные способы ожидания завершения задачи. Но в вашем примере вы не используете его для этой цели. Вместо Thread.sleep(...) этого вызов вашего основного метода имеет тот же эффект; т. Е. Он ожидает достаточно долго, чтобы задача (вероятно) завершилась. Так "Hello" выводится раньше "World" .

Просто чтобы повторить, get() вызов не вызывает выполнение задачи. Скорее он ждет, пока это произойдет.


Использование sleep для ожидания события (например, завершения задачи) — плохая идея:

  1. Sleep не сообщает, произошло ли событие!
  2. Обычно вы не знаете точно, сколько времени потребуется для события, вы не знаете, как долго спать.
  3. Если вы спите слишком долго, у вас «мертвое время» (см. Ниже).
  4. Если вы спите недостаточно долго, событие, возможно, еще не произошло. Так что вам нужно протестировать и поспать еще раз, и еще, и еще…

Даже в этом примере теоретически возможно 1 для завершения sleep in main перед sleep in задачей.

По сути, цель CompletableFuture состоит в том, чтобы обеспечить эффективный способ ожидания завершения задачи и выдачи результата. Вы должны использовать его…

Для иллюстрации. Ваше приложение ожидает (и тратит впустую) ~ 4 секунды между выводом "Hello" и "World!" . Если бы вы использовали CompletableFuture так, как оно предназначено для использования, у вас не было бы этих 4 секунд «мертвого времени».


1 — Например, какой-то внешний агент может выборочно «приостановить» поток, выполняющий задачу. Это может быть сделано путем установки точки останова…

Комментарии:

1. Вам не нужен внешний агент для выборочной приостановки потока, высокая загрузка системы уже будет иметь тот же эффект; когда поток не выполняется, время будет продолжать тикать, позволяя sleep вернуться, как только приложение снова получит процессорное время.

2. ДА. Есть и другие способы, которыми это может произойти.

Ответ №3:

Второй случай «работает«, потому что вы спите в основном потоке достаточно долго (5 секунд). Работа заключается в кавычках, потому что на самом деле она не работает, просто завершается. Я предполагаю, что здесь код должен выводиться Hello World! , чтобы считаться «работающим должным образом«.


Попробуйте тот же код с этим временем ожидания в конце основного потока в обоих случаях:

Thread.sleep(100);

1. Первый будет вести себя точно так же, поскольку операция get блокирует основной поток. Фактически, в первом случае вам даже не нужно время последнего ожидания.

Вывод: Hello World!


2. Второй случай не будет выведен Hello , так как никто не сказал основному потоку: «Эй, подождите, пока это закончится«. Это то, что get() делает: блокирует вызывающего, чтобы дождаться завершения задачи. Без этого и установки низкого времени ожидания в конце вызывается runnable, но не может завершить свою работу до остановки основного потока.

Вывод: World!


Это также причина, по которой в первом случае Hello World! (сначала вывод runnable, а затем основной — это означает, что основной поток был заблокирован до get() возвращения) записывается, в то время как во втором видны тонкие признаки дислексии: World Hello!

Но это не дислексия, он просто выполняет то, что ему говорят. Во втором случае это происходит:

1. Вызывается runnable.

2. Основной поток продолжает свой процесс, печатая («Мир!)

3. Sleep время установлено: 1 секунда на исполняемом / 5 секунд на основном. (режим сна runnable также может быть выполнен во время 2-го шага, но я поместил его здесь, чтобы прояснить поведение)

4. Выполняемая задача печатается («Привет») через 1 секунду, и завершаемое будущее завершено.

Прошло 5,5 секунд, основной поток останавливается.

Таким образом, ваш runnable мог печатать Hello , потому что он смог выполнить команду в промежутке между этими 5 секундами ожидания.

 World! . . . . . .(1)Hello. . . . . . . . . . .(5)[END]
 

Если вы уменьшите тайм-аут последних 5 секунд, например, до 0,5 секунды, вы получите

 World!. . (0.5)[END]
 

Комментарии:

1. Хорошо, спасибо вам. Тогда работает ли выполняемый код, который использовал get() или нет?

2. @SangHoonLee Во втором примере это не сработает, так как у него не было достаточно времени для завершения. get() просто гарантирует, что вызывающий поток будет заблокирован до завершения выполнения. Без него это просто асинхронный процесс, поэтому основной поток не будет его ждать: он просто вызовет его и продолжит свой процесс.

3. Даже сон беглеца не смог бы закончить сон . это не совсем правильно. Просто измените его на CompletableFuture.runAsync(() -> { try { Thread.sleep(1_000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Hello"); }, Executors.newSingleThreadExecutor()); и посмотрите, что произойдет.

4. Совершенно верно, написал, что, поскольку в этом случае асинхронность выполняется без определенного исполнителя, поэтому он использует ForkJoinPool. Но вы правы, поэтому я отредактирую эту часть, чтобы избежать путаницы

5. кажется, вы путаете здесь много понятий, когда реальность такова, что это просто. Я бы посоветовал вам прочитать, что такое поток демона (это не поток с низким приоритетом ). Приоритет для потоков — это совершенно другое дело.