Завершаемое будущее Java 8 — параллельное выполнение

#java #multithreading #java-8 #completable-future

#java #многопоточность #java-8 #завершаемая-будущее

Вопрос:

Я тестирую, как CompletableFuture работает. Меня интересует, как выполнять задачи параллельно:

 try {
          CompletableFuture one = CompletableFuture.runAsync(() -> {
          throw new RuntimeException("error");
          });
          CompletableFuture two = CompletableFuture.runAsync(() -> System.out.println("2"));
          CompletableFuture three = CompletableFuture.runAsync(() -> System.out.println("3"));
          CompletableFuture all = CompletableFuture.allOf(one, two, three);
          all.get();
} catch (InterruptedException e) {
           System.out.println(e);
} catch (ExecutionException e) {
           System.out.println(e);
}
  

В этом случае они будут выполнены все.

1. Возможно ли прервать все запущенные потоки, когда в одном из них возникает исключение?

2. Когда этот код находится внутри метода класса, который может быть вызван из разных потоков, будет ли он потокобезопасным?

Ответ №1:

1.Is возможно ли прервать все запущенные потоки, когда в одном из них возникает исключение?

Да, это возможно. Все потоки должны иметь доступ к общему объекту, состояние которого может быть изменено и прочитано другими потоками. Это может быть, например AtomicInteger . Смотрите пример ниже:

 import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class Dates {

    public static void main(String[] args) throws Exception {
        try {
            AtomicInteger excCounter = new AtomicInteger(0);
            CompletableFuture one = CompletableFuture.runAsync(new ExcRunnable(excCounter));
            CompletableFuture two = CompletableFuture.runAsync(new PrintRunnable("2", excCounter));
            CompletableFuture three = CompletableFuture.runAsync(new PrintRunnable("3", excCounter));
            CompletableFuture all = CompletableFuture.allOf(one, two, three);
            all.get();
        } catch (InterruptedException | ExecutionException e) {
            System.out.println(e);
        }
    }
}

class ExcRunnable implements Runnable {
    private final AtomicInteger excCounter;

    public ExcRunnable(AtomicInteger excCounter) {
        this.excCounter = excCounter;
    }

    @Override
    public void run() {
        Random random = new Random();
        int millis = (int) (random.nextDouble() * 5000);
        System.out.println("Wait "   millis);
        Threads.sleep(450);

        // Inform another threads that exc occurred
        excCounter.incrementAndGet();

        throw new RuntimeException("error");
    }
}

class PrintRunnable implements Runnable {

    private final String name;
    private final AtomicInteger excCounter;

    public PrintRunnable(String name, AtomicInteger excCounter) {
        this.name = name;
        this.excCounter = excCounter;
    }

    @Override
    public void run() {
        int counter = 10;
        while (counter-- > 0 amp;amp; excCounter.get() == 0) {
            System.out.println(name);
            Threads.sleep(450);
        }
    }
}

class Threads {
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  

У нас есть 3 задачи: две, которые печатают свое имя, и одна, которая выдает исключение через некоторое время. Перед созданием исключения счетчик увеличивается, чтобы сообщить другим задачам, что одна из них завершилась неудачей, и они должны завершить выполнение. Задания печати проверяют этот счетчик и в случае, если условие не выполнено, они завершают свою работу. Когда вы комментируете excCounter.incrementAndGet(); строку, другие задачи завершают свою работу, не зная, что одна из них вызвала исключение.

  1. Когда этот код находится внутри метода класса, который может быть вызван из разных потоков, будет ли он потокобезопасным?

Взгляните на определение потокобезопасности. Например, предположим, что задачи печати увеличивают общий счетчик с каждой напечатанной строкой. Если счетчик является примитивным int , это не обеспечивает потокобезопасность, поскольку значение счетчика может быть заменено. Но если вы используете AtomicInteger это потокобезопасность, потому что AtomicInteger это потокобезопасность.

Комментарии:

1. Приятно, это будет работать и с ExecutorService, спасибо!

Ответ №2:

1

если какая-либо из ваших асинхронных задач вызовет исключение, all.get() выдаст исключение. Это означает, что вы можете отменить все CF в предложении catch. Но ваши асинхронные задачи должны быть удобны для прерываний, т. Е. периодически проверять наличие флага прерывания или обрабатывать исключение InterruptedException и возвращаться досрочно.

Отмена задачи всегда должна обрабатываться с использованием механизма прерывания

2

Все упомянутые вами ссылочные переменные являются локальными, поэтому нет необходимости беспокоиться о безопасности потоков. Локальные переменные всегда потокобезопасны.