Java ThreadPoolExecutor

#java #multithreading #runnable

#java #многопоточность #работоспособный

Вопрос:

У меня большие проблемы с пониманием Java ThreadPoolExecutor. Например, я хочу вычислить квадраты чисел 1-1000:

 public static void main(String[] args) throws InterruptedException, ExecutionException {

    Callable<ArrayList<Integer>> c = new squareCalculator(1000);
    ExecutorService executor = Executors.newFixedThreadPool(5);
    Future<ArrayList<Integer>> result = executor.submit(c);


    for(Integer i: result.get()){
        System.out.println(i);
    }

}
  

И

 public class squareCalculator implements Callable<ArrayList<Integer>>{
    private int i;
    private int max;

    private int threadID;
    private static int id;
    private ArrayList<Integer> squares;

    public squareCalculator(int max){
        this.max = max;
        this.i = 1;
        this.threadID = id;
        id  ;
        squares = new ArrayList<Integer>();

    }

    public ArrayList<Integer> call() throws Exception {
        while(i <= max){            
            squares.add(i*i);
            System.out.println("Proccessed number "  i   " in thread " this.threadID);
            Thread.sleep(1);
            i  ;
        }
        return squares;

    }
}
  

Теперь моя проблема в том, что я получаю только один поток, выполняющий вычисления. Я ожидал получить 5 потоков.

Ответ №1:

Если вы хотите, чтобы Callable программа выполнялась 5 раз одновременно, вам нужно submit выполнить это 5 раз. Вы отправляли его только один раз, а затем запрашивали результат 5 раз.

Javadoc из submit() :

Отправляет задачу, возвращающую значение, на выполнение и возвращает будущее, представляющее ожидающие результаты задачи. get Метод Future вернет результат задачи после успешного завершения.

Вы видите, что Javadoc для submit() использует единственное число для обозначения «задачи», а не «задачи».

Исправить это просто: отправьте его несколько раз:

 Future<ArrayList<Integer>> result1 = executor.submit(c);
Future<ArrayList<Integer>> result2 = executor.submit(c);
Future<ArrayList<Integer>> result3 = executor.submit(c);
/// etc..

result1.get();
result2.get();
result3.get();
// etc..
  

Комментарии:

1. Спасибо тебе, Эрвин. Возможно, я неправильно понимаю некоторые очень простые вещи. Если я сделаю это таким образом, все вычисление квадратных чисел 1-1000 будет выполнено 3 раза (одновременно), и в итоге у меня будет 3 списка массивов с номерами 1, 4, 9, 16 и т.д. Что я на самом деле хочу, так это разделить всю эту вычислительную задачу на n потоков, чтобы получить один список массивов, но быстрее. Но, вероятно, это не может быть сделано так просто?

Ответ №2:

ExecutorService Будет использоваться один поток для выполнения каждой Callable отправляемой вами задачи. Поэтому, если вы хотите, чтобы несколько потоков вычисляли квадраты, вам нужно отправить несколько задач, например, по одной задаче для каждого числа. Затем вы получите Future<Integer> от каждой задачи, которую вы можете сохранить в списке и вызывать get() для каждой из них, чтобы получить результаты.

 public class SquareCalculator implements Callable<Integer> {
    private final int i;

    public SquareCalculator(int i) {
        this.i = i;
    }

    @Override
    public Integer call() throws Exception {
        System.out.println("Processing number "   i   " in thread "   Thread.currentThread().getName());
        return i * i;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<Future<Integer>> futures = new ArrayList<>();

        // Create a Callable for each number, submit it to the ExecutorService and store the Future
        for (int i = 1; i <= 1000; i  ) {
            Callable<Integer> c = new SquareCalculator(i);
            Future<Integer> future = executor.submit(c);
            futures.add(future);
        }

        // Wait for the result of each Future
        for (Future<Integer> future : futures) {
            System.out.println(future.get());
        }

        executor.shutdown();
    }
}
  

Затем результат выглядит примерно так:

 Processing number 2 in thread pool-1-thread-2
Processing number 1 in thread pool-1-thread-1
Processing number 6 in thread pool-1-thread-1
Processing number 7 in thread pool-1-thread-2
Processing number 8 in thread pool-1-thread-2
Processing number 9 in thread pool-1-thread-2
...
1
4
9
...
  

Ответ №3:

Это забавная проблема, которую приходится решать параллельно, потому что создание результирующего массива (или списка) выполняется за O (n) раз, поскольку при создании он инициализируется нулями.

 public static void main(String[] args) throws InterruptedException {
    final int chunks = Runtime.getRuntime().availableProcessors();
    final int max = 1001;

    ExecutorService executor = Executors.newFixedThreadPool(chunks);

    final List<ArrayList<Long>> results = new ArrayList<>(chunks);

    for (int i = 0; i < chunks; i  ) {
        final int start = i * max / chunks;
        final int end = (i   1) * max / chunks;
        final ArrayList<Long> localResults = new ArrayList<>(0);
        results.add(localResults);

        executor.submit(new Runnable() {
            @Override
            public void run() {
                // Reallocate enough space locally so it's done in parallel.
                localResults.ensureCapacity(end - start);
                for (int j = start; j < end; j  ) {
                    localResults.add((long)j * (long)j);
                }
            }
        });
    }

    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MICROSECONDS);

    int i = 0;
    for (List<Long> list : results) {
        for (Long l : list) {
            System.out.printf("%d: %dn", i, l);
              i;
        }
    }

}
  

Накладные расходы, связанные с классами-оболочками, приведут к снижению производительности, поэтому вам следует использовать что-то вроде Fastutil. Затем вы могли бы объединить их с помощью чего-то вроде Iterables от Guava.concat, только в версии списка, совместимой с длинным списком Fastutil.

Из этого также может получиться хорошая ForkJoinTask, но опять же, вам понадобятся эффективные логические (сопоставление, а не копирование; обратное List.sublist) функции конкатенации списков для реализации ускорения.