#java #multithreading #java-8 #concurrency #nullpointerexception
#java #многопоточность #java-8 #параллелизм #исключение nullpointerexception
Вопрос:
Обнаружена проблема в производственной среде, связанная CompletableFuture.supplyAsync()
с тем, что у нас есть метод пакетной обработки, как показано ниже:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class CompletableFutureProblem {
public void batchOperation(){
List<String> stringList = new ArrayList<>();
stringList.add("task1");
stringList.add("task2");
List<CompletableFuture<String>> futures = new ArrayList<>();
stringList.parallelStream().forEach(str -> {
CompletableFuture<String> response = restApiCall(str);
futures.add(response);
});
//futures.add(null);
CompletableFuture<Void> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
CompletableFuture<List<String>> convertedResult = result.thenApply(v ->
futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
);
try {
List<String> finishedTask = convertedResult.get();
System.out.println(finishedTask.toString());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
public CompletableFuture<String> restApiCall(String str){
return CompletableFuture.supplyAsync(() -> {
return "Complete-" str;
});
}
public static void main(String[] args) {
CompletableFutureProblem problem = new CompletableFutureProblem();
problem.batchOperation();
}
}
Когда все будет работать нормально, будет выведено:
[Завершить-task2, Завершить-task1]
Однако иногда он выдает исключение, как показано ниже, в рабочей среде:
Exception in thread "main" java.lang.NullPointerException
at java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1320)
at java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2238)
at third.concurrent.CompletableFutureProblem.batchOperation(CompletableFutureProblem.java:20)
at third.concurrent.CompletableFutureProblem.main(CompletableFutureProblem.java:40)
Я исследовал CompletableFuture.allOf()
исходный код и обнаружил, что если список futures содержит значение null, например, futures.add(null) , исключение будет выдано, но я действительно не знаю, при каких сценариях метод CompletableFuture.supplyAsync()
in restApiCall
вернет null
?
Спасибо вашему пациенту за чтение длинного поста.
Ответ №1:
futures
записывается несколькими потоками, поскольку вы используете stringList
параллельный поток. Однако futures
это ArrayList
, который не является потокобезопасным.
Поэтому вы не можете быть уверены, что каждый элемент, добавленный к нему из другого потока, будет виден без надлежащей синхронизации. Когда вы преобразуете его в массив, возникнут проблемы с видимостью памяти, которые являются неопределенными, поэтому иногда это работает так, как ожидалось.
Чтобы устранить эту проблему, обычно используется параллельная коллекция. Однако в этом случае не имеет смысла распараллеливать CompletableFuture.supplyAsync()
, поскольку это неблокирующий вызов. Поэтому лучшим решением является просто перебирать список:
stringList.forEach(str -> {
Кроме того, предварительно выделенный массив в toArray()
должен быть пустым:
futures.toArray(new CompletableFuture[0])
Комментарии:
1. Могу я спросить, как мы можем воспроизвести это исключение и убедиться, что это вызвано проблемой видимости потока.
2. @LiJing
while (true) problem.batchOperation();
сгенерирует исключение через некоторое время. После исправления он будет продолжать работать бесконечно.