Параллелизм Java, при каком условии CompletableFuture.supplyAsync() вернет значение null

#java #multithreading #java-8 #concurrency #nullpointerexception

#java #многопоточность #java-8 #параллелизм #исключение nullpointerexception

Вопрос:

Обнаружена проблема в производственной среде, связанная CompletableFuture.supplyAsync() с тем, что у нас есть метод пакетной обработки, как показано ниже:

 import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class CompletableFutureProblem {
    public void batchOperation(){
        List<String> stringList = new ArrayList<>();
        stringList.add("task1");
        stringList.add("task2");
        List<CompletableFuture<String>> futures = new ArrayList<>();
        stringList.parallelStream().forEach(str -> {
            CompletableFuture<String> response = restApiCall(str);
            futures.add(response);
        });
        //futures.add(null);
        CompletableFuture<Void> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        CompletableFuture<List<String>> convertedResult = result.thenApply(v ->
            futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
        );
        try {
            List<String> finishedTask = convertedResult.get();
            System.out.println(finishedTask.toString());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    public CompletableFuture<String> restApiCall(String str){
        return CompletableFuture.supplyAsync(() -> {
            return "Complete-"   str;
        });
    }

    public static void main(String[] args) {
        CompletableFutureProblem problem = new CompletableFutureProblem();
        problem.batchOperation();
    }
}
 

Когда все будет работать нормально, будет выведено:
[Завершить-task2, Завершить-task1]

Однако иногда он выдает исключение, как показано ниже, в рабочей среде:

 Exception in thread "main" java.lang.NullPointerException
    at java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1320)
    at java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2238)
    at third.concurrent.CompletableFutureProblem.batchOperation(CompletableFutureProblem.java:20)
    at third.concurrent.CompletableFutureProblem.main(CompletableFutureProblem.java:40)
 

Я исследовал CompletableFuture.allOf() исходный код и обнаружил, что если список futures содержит значение null, например, futures.add(null) , исключение будет выдано, но я действительно не знаю, при каких сценариях метод CompletableFuture.supplyAsync() in restApiCall вернет null ?

Спасибо вашему пациенту за чтение длинного поста.

Ответ №1:

futures записывается несколькими потоками, поскольку вы используете stringList параллельный поток. Однако futures это ArrayList , который не является потокобезопасным.

Поэтому вы не можете быть уверены, что каждый элемент, добавленный к нему из другого потока, будет виден без надлежащей синхронизации. Когда вы преобразуете его в массив, возникнут проблемы с видимостью памяти, которые являются неопределенными, поэтому иногда это работает так, как ожидалось.

Чтобы устранить эту проблему, обычно используется параллельная коллекция. Однако в этом случае не имеет смысла распараллеливать CompletableFuture.supplyAsync() , поскольку это неблокирующий вызов. Поэтому лучшим решением является просто перебирать список:

 stringList.forEach(str -> {
 

Кроме того, предварительно выделенный массив в toArray() должен быть пустым:

 futures.toArray(new CompletableFuture[0])
 

Комментарии:

1. Могу я спросить, как мы можем воспроизвести это исключение и убедиться, что это вызвано проблемой видимости потока.

2. @LiJing while (true) problem.batchOperation(); сгенерирует исключение через некоторое время. После исправления он будет продолжать работать бесконечно.