forkJoin выполняется в 1 потоке

#java #multithreading #fork-join

#java #многопоточность #fork-соединение

Вопрос:

У меня есть следующий код, который является имитацией своего рода программы очистки сайта, которая очищает страницы / подстраницы и объединяет результат в строку с содержимым страниц.

Я использовал Runtime.getRuntime().availableProcessors() , поэтому я предположил, что он будет выполняться в нескольких потоках. Но, похоже, это не так.

 package Concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinPoolDemo {

    public static class MyTask extends RecursiveTask<String>
    {
        private String url;
        public MyTask(String url)
        {
            this.url = url;
        }

        @Override
        protected String compute() {

            System.out.println(Thread.currentThread().getName()   "/"   url);

            if(url.equals("http://google.com/b1")) {
                return "Content from /b1";
            } else if(url.equals("http://google.com/b2")) {
                return "Content from /b2";
            } else if(url.equals("http://google.com/b")) {
                List<MyTask> tasks = new ArrayList<>();
                tasks.add(new MyTask("http://google.com/b1"));
                tasks.add(new MyTask("http://google.com/b2"));
                String result = "Content from /bn";

                for(MyTask task : tasks) {
                    task.fork();
                    result  = task.join()   "n";
                }
                return resu<
            } else if(url.equals("http://google.com")) {

                List<MyTask> tasks = new ArrayList<>();
                tasks.add(new MyTask("http://google.com/a"));
                tasks.add(new MyTask("http://google.com/b"));
                tasks.add(new MyTask("http://google.com/c"));
                tasks.add(new MyTask("http://google.com/d"));
                tasks.add(new MyTask("http://google.com/e"));
                tasks.add(new MyTask("http://google.com/f"));
                tasks.add(new MyTask("http://google.com/g"));
                tasks.add(new MyTask("http://google.com/h"));
                tasks.add(new MyTask("http://google.com/i"));
                tasks.add(new MyTask("http://google.com/j"));
                String result = "Content from /n";

                for (MyTask task : tasks) {
                    task.fork();
                    result  = task.join()   "n";
                }
                return resu<
            } else {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Content from "   url;
            }
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        String result = pool.invoke(new MyTask("http://google.com"));
        System.out.println(result);
    }
}
  

Почему каждый форк выполняется в одном потоке?

 ForkJoinPool-1-worker-19/http://google.com
ForkJoinPool-1-worker-19/http://google.com/a
ForkJoinPool-1-worker-19/http://google.com/b
ForkJoinPool-1-worker-19/http://google.com/b1
ForkJoinPool-1-worker-19/http://google.com/b2
ForkJoinPool-1-worker-19/http://google.com/c
ForkJoinPool-1-worker-19/http://google.com/d
ForkJoinPool-1-worker-19/http://google.com/e
ForkJoinPool-1-worker-19/http://google.com/f
ForkJoinPool-1-worker-19/http://google.com/g
ForkJoinPool-1-worker-19/http://google.com/h
ForkJoinPool-1-worker-19/http://google.com/i
ForkJoinPool-1-worker-19/http://google.com/j
  

Ответ №1:

Вы блокируете соединение каждый раз, когда создаете новую задачу, ожидая ее завершения перед отправкой другой задачи. Вместо этого сначала запустите все задачи, а затем соберите их результаты:

 for(MyTask task : tasks) {
  task.fork();
}
        
for(MyTask task : tasks) {
  result  = task.join()   "n";
}