#java #multithreading #java-8 #completable-future
#java #многопоточность #java-8 #завершаемая-будущее
Вопрос:
Я тестирую, как CompletableFuture
работает. Меня интересует, как выполнять задачи параллельно:
try {
CompletableFuture one = CompletableFuture.runAsync(() -> {
throw new RuntimeException("error");
});
CompletableFuture two = CompletableFuture.runAsync(() -> System.out.println("2"));
CompletableFuture three = CompletableFuture.runAsync(() -> System.out.println("3"));
CompletableFuture all = CompletableFuture.allOf(one, two, three);
all.get();
} catch (InterruptedException e) {
System.out.println(e);
} catch (ExecutionException e) {
System.out.println(e);
}
В этом случае они будут выполнены все.
1. Возможно ли прервать все запущенные потоки, когда в одном из них возникает исключение?
2. Когда этот код находится внутри метода класса, который может быть вызван из разных потоков, будет ли он потокобезопасным?
Ответ №1:
1.Is возможно ли прервать все запущенные потоки, когда в одном из них возникает исключение?
Да, это возможно. Все потоки должны иметь доступ к общему объекту, состояние которого может быть изменено и прочитано другими потоками. Это может быть, например AtomicInteger
. Смотрите пример ниже:
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class Dates {
public static void main(String[] args) throws Exception {
try {
AtomicInteger excCounter = new AtomicInteger(0);
CompletableFuture one = CompletableFuture.runAsync(new ExcRunnable(excCounter));
CompletableFuture two = CompletableFuture.runAsync(new PrintRunnable("2", excCounter));
CompletableFuture three = CompletableFuture.runAsync(new PrintRunnable("3", excCounter));
CompletableFuture all = CompletableFuture.allOf(one, two, three);
all.get();
} catch (InterruptedException | ExecutionException e) {
System.out.println(e);
}
}
}
class ExcRunnable implements Runnable {
private final AtomicInteger excCounter;
public ExcRunnable(AtomicInteger excCounter) {
this.excCounter = excCounter;
}
@Override
public void run() {
Random random = new Random();
int millis = (int) (random.nextDouble() * 5000);
System.out.println("Wait " millis);
Threads.sleep(450);
// Inform another threads that exc occurred
excCounter.incrementAndGet();
throw new RuntimeException("error");
}
}
class PrintRunnable implements Runnable {
private final String name;
private final AtomicInteger excCounter;
public PrintRunnable(String name, AtomicInteger excCounter) {
this.name = name;
this.excCounter = excCounter;
}
@Override
public void run() {
int counter = 10;
while (counter-- > 0 amp;amp; excCounter.get() == 0) {
System.out.println(name);
Threads.sleep(450);
}
}
}
class Threads {
static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
У нас есть 3 задачи: две, которые печатают свое имя, и одна, которая выдает исключение через некоторое время. Перед созданием исключения счетчик увеличивается, чтобы сообщить другим задачам, что одна из них завершилась неудачей, и они должны завершить выполнение. Задания печати проверяют этот счетчик и в случае, если условие не выполнено, они завершают свою работу. Когда вы комментируете excCounter.incrementAndGet();
строку, другие задачи завершают свою работу, не зная, что одна из них вызвала исключение.
- Когда этот код находится внутри метода класса, который может быть вызван из разных потоков, будет ли он потокобезопасным?
Взгляните на определение потокобезопасности. Например, предположим, что задачи печати увеличивают общий счетчик с каждой напечатанной строкой. Если счетчик является примитивным int
, это не обеспечивает потокобезопасность, поскольку значение счетчика может быть заменено. Но если вы используете AtomicInteger
это потокобезопасность, потому что AtomicInteger
это потокобезопасность.
Комментарии:
1. Приятно, это будет работать и с ExecutorService, спасибо!
Ответ №2:
1
если какая-либо из ваших асинхронных задач вызовет исключение, all.get() выдаст исключение. Это означает, что вы можете отменить все CF в предложении catch. Но ваши асинхронные задачи должны быть удобны для прерываний, т. Е. периодически проверять наличие флага прерывания или обрабатывать исключение InterruptedException и возвращаться досрочно.
Отмена задачи всегда должна обрабатываться с использованием механизма прерывания
2
Все упомянутые вами ссылочные переменные являются локальными, поэтому нет необходимости беспокоиться о безопасности потоков. Локальные переменные всегда потокобезопасны.