Почему задача иногда выполняется в основном потоке, хотя она была отправлена в общий пул forkJoin?

#java #optimization #concurrency #jit #forkjoinpool

Вопрос:

Может ли кто-нибудь объяснить мне, почему некоторые задачи иногда выполняются в основном потоке, хотя раньше они были отправлены в общий пул forkJoin?

Вот пример в Java 16 (adopt-openjdk-16.0.1):

 import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class StrangeUseOfThreads {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int mainThreadCounter = 0;
        int forkJoinPoolThreadCounter = 0;
        for (int i = 0; i < 100; i  ) {
            Future<String> future = ForkJoinPool.commonPool().submit(() -> Thread.currentThread().getName());
            String threadName = future.get();
            if(threadName.equals("main")) {
                mainThreadCounter  ;
            } else {
                forkJoinPoolThreadCounter  ;
            }
        }
        System.out.println("mainThreadCounter="   mainThreadCounter);
        System.out.println("forkJoinPoolThreadCounter="   forkJoinPoolThreadCounter);

        System.out.println("------------------------------------");

        mainThreadCounter = 0;
        forkJoinPoolThreadCounter = 0;
        for (int i = 0; i < 100; i  ) {
            Future<String> future = ForkJoinPool.commonPool().submit(() -> Thread.currentThread().getName());
            ForkJoinPool.commonPool().submit(Thread::yield); // let's disturb here
            String threadName = future.get();
            if(threadName.equals("main")) {
                mainThreadCounter  ;
            } else {
                forkJoinPoolThreadCounter  ;
            }
        }
        System.out.println("mainThreadCounter="   mainThreadCounter);
        System.out.println("forkJoinPoolThreadCounter="   forkJoinPoolThreadCounter);
    }
}
 

Это выход для меня:

 mainThreadCounter=80
forkJoinPoolThreadCounter=20
------------------------------------
mainThreadCounter=0
forkJoinPoolThreadCounter=100
 

В то время как основной поток часто используется в первом цикле, он вообще не используется во втором цикле, потому что я «нарушил» последовательность

 Future<String> future = ForkJoinPool.commonPool().submit(() -> Thread.currentThread().getName());
String threadName = future.get();
 

путем добавления

 ForkJoinPool.commonPool().submit(Thread::yield);
 

между этими двумя строками, что, конечно, как-то бесполезно, но, очевидно, не может быть удалено оптимизацией устранения мертвого кода JIT-компилятора, я думаю.

Я подозреваю, что JIT-компилятор оптимизирует здесь что-то с помощью ForkJoinPool, потому что вызов get () в будущем следует непосредственно после отправки задачи. Тем не менее, я отчаянно искал в Google такую оптимизацию и не нашел ни одной.
Что происходит здесь «под капотом»?

EDIT1:
tgdavies inspired me to modify the example to get stack traces.
That’s my example now (it’s ugly but it works!):

 import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class StrangeUseOfThreads {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
             int mainThreadCounter = 0;
        int forkJoinPoolThreadCounter = 0;
        for (int i = 0; i < 100; i  ) {
            Future<Throwable> future = ForkJoinPool.commonPool().submit(
                    () -> new Throwable(Thread.currentThread().getName()));
            Throwable throwable = future.get();
            if(throwable.getMessage().equals("main")) {
                if(mainThreadCounter==0) {
                    throwable.printStackTrace();
                }
                mainThreadCounter  ;
            } else {
                if(forkJoinPoolThreadCounter==0) {
                    throwable.printStackTrace();
                }
                forkJoinPoolThreadCounter  ;
            }
        }
        System.out.println("mainThreadCounter="   mainThreadCounter);
        System.out.println("forkJoinPoolThreadCounter="   forkJoinPoolThreadCounter);
    }
}
 

Вот и результат сейчас:

 java.lang.Throwable: main
    at net.mirwaldt.completable.futures.examples.StrangeUseOfThreads.lambda$main$0(StrangeUseOfThreads.java:13)
    at java.base/java.util.concurrent.ForkJoinTask$AdaptedCallable.exec(ForkJoinTask.java:1458)
    at java.base/java.util.concurrent.ForkJoinTask.doExec$$capture(ForkJoinTask.java:295)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
    at java.base/java.util.concurrent.ForkJoinTask.tryExternalHelp(ForkJoinTask.java:386)
    at java.base/java.util.concurrent.ForkJoinTask.externalInterruptibleAwaitDone(ForkJoinTask.java:356)
    at java.base/java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1009)
    at net.mirwaldt.completable.futures.examples.StrangeUseOfThreads.main(StrangeUseOfThreads.java:14)
java.lang.Throwable: ForkJoinPool.commonPool-worker-19
    at net.mirwaldt.completable.futures.examples.StrangeUseOfThreads.lambda$main$0(StrangeUseOfThreads.java:13)
    at java.base/java.util.concurrent.ForkJoinTask$AdaptedCallable.exec(ForkJoinTask.java:1458)
    at java.base/java.util.concurrent.ForkJoinTask.doExec$$capture(ForkJoinTask.java:295)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
mainThreadCounter=91
forkJoinPoolThreadCounter=9
 

Это интересно:
В то время как задача в основном потоке была эффективно инициирована вызовом get () в будущем, задача в потоке ForkJoinPool была отправлена в пул forkJoin.
Однако я до сих пор не знаю, почему. Я не могу представить, что это ошибка.

Комментарии:

1. Поместите точку останова или запишите трассировку стека в тело вашего выполняемого объекта, что может дать вам ключ к пониманию того, как он попал в основной поток.

2. Трассировка стека — хорошая идея. Позвольте мне проверить.

3. Я отредактировал свой вопрос и добавил трассировки стека. Спасибо, tgdavies! Это была хорошая идея.

4. Я добавил режим сна(1) перед вызовом future.get, а затем основной поток не используется для выполнения задач. Похоже, что если вы сделаете блокирующий будущий вызов до того, как рабочий поток FJ увидит задачу, вызывающий поток выполнит ее. Имеет смысл, учитывая, что в противном случае основной поток был бы заблокирован, и это позволяет избежать дополнительного переключения контекста.

5. Да, в этом есть смысл. Однако я был удивлен, когда обнаружил это. Во-первых, я подумал, что сделал что-то не так в своей программе. Вот почему я спрашиваю здесь. Ожидали бы вы такого поведения, @boneill? Интересно, что этого не происходит с завершенными будущими: Если вы замените ForkJoinPool.commonPool().submit() на CompletableFuture.supplyAsync() и ForkJoinPool.getCommonPoolParallelism() > 1, вы никогда не будете наблюдать такое поведение.

Ответ №1:

TL;DR:
Это не JIT-компилятор, а код самого класса ForkJoinPoolTask, который «оптимизирует» особую ситуацию вызова get() в будущем, задача которого еще не запущена.

Я начал отладку кода и заметил следующее:
ForkJoinPool.commonPool().submit() создает задачу ForkJoinPool, которая реализует Future.
Когда вызывается get() для возвращенного будущего, он сначала проверяет статус в tryExternalHelp (), который вызывается externalInterruptibleAwaitDone().
Если tryExternalHelp() замечает, что задача еще не запущена, сумев «отключить» (т. Е. Удалить) задачу из правой рабочей очереди, она выполняет задачу в вызывающем потоке, в котором был вызван get (), который является основным потоком в моем примере. Затем tryExternalHelp() возвращает завершено (или не удалось) статус externalInterruptibleAwaitDone().
Если tryExternalHelp() извещения задача была затеяна еще не в состоянии убрать задачи из прав работника-очереди, он просто возвращает текущий статус externalInterruptibleAwaitDone (), который ждет, пока задача не завершена (или не удалось).
Тогда результат — если он существует — в любом случае возвращается get ().
CompletableFuture ведет себя не так.
он всегда выполняет задачу в рабочем потоке и никогда в вызывающем потоке.

Следовательно, это зависит от того, насколько мало времени между вызовом submit() в пуле forkJoin и get() в будущем, используется ли вызывающий поток или рабочий поток пула forkJoin для выполнения задачи. И это также зависит от того, насколько заполнены рабочие очереди.