После использования forkjoinpool с параллельным потоком код после FutureTask столкнулся с многопоточной проблемой

#java #java-stream #forkjoinpool

#java #java-поток #forkjoinpool

Вопрос:

Я хочу использовать parallel stream api для чтения redis, получения всех элементов и случайной выборки 10% из всех элементов. Но я обнаружил, что иногда он выдает исключение, говорящее random.nextInt(itemCount) , что не принимает 0 в качестве входных данных. Это меня очень смущает.

Является ли это многопоточной проблемой? Я не знаю, что делать.

Вот этот код:

 private Timer timer = new Timer(); // I start the job in a timer.
private Random rand = new Random();
private List<String> ids = getIds(); // get redis keys.
private ForkJoinPool updateThreadsPool = new ForkJoinPool(32);
timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                
                ForkJoinTask<List<List<Item>>> asyncTask = updateThreadsPool.submit(() -> {
                      return ids.parallelStream().map(id -> getFromRedis(id)).collect(Collectors.toList());
                });

                int totalNum = 0;
                List<Item> totalItems = new ArrayList<>();
                try {
                    totalNum = asyncTask.get().stream().peek(totalItems::addAll)
                         .reduce(0, (num, list) -> num   list.size(), Integer::sum);
                } catch (Exception e) { // do something }
                

                int randIndex = rand.nextInt(totalNum); // this is where it throws exception.
                // says that totalNum is zero. 
                // I'm pretty sure totalItems is not empty, because I print log for that.
                // I tried use int totalNum = totalItems.size(), but I got the same problem.
            }
        }, 1000L, 1000L
);
 

Комментарии:

1. Не asyncTask.get() следует возвращать a List<Item> ? Ужасный код после него, а именно .stream().peek(totalItems::addAll) .reduce(0, (num, list) -> num list.size(), Integer::sum) , кажется, пытается объединить поток списков, но если Item это не подтип List<Item> , потока списков нет, и этот код даже не прошел бы через компилятор. Учитывая ForkJoinTask<List<Item>> asyncTask… , что вы можете просто использовать List<Item> totalItems = asyncTask.join(); int totalNum = totalItems.size(); , и он должен соответствовать размеру ids .

2. @Holger вы правы, извините, я использовал lombok для объявления, и я допустил ошибку здесь… Это должен быть список <Список<Элемент>> .

3. @Holger и второй: int totalNum = totalItems.size(); Я попробовал это, иногда он возвращает ноль, как в приведенном выше коде.

4. Затем используйте List<Item> totalItems = asyncTask.join().stream() .flatMap(List::stream) .collect(Collectors.toList()); int totalNum = totalItems.size(); Ключевым моментом является то, что используйте API по назначению и не используйте catch (Exception e) { } , поскольку в противном случае peek это могло вызвать побочные эффекты (например, создание непустого списка), в то время как неудачная операция (уменьшение до числа) не дала результата, поэтому вы остаетесь с предварительно инициализированным значением, т. Е. int totalNum = 0; перед try блоком. Когда вы собираете a List , нет смысла в операции избыточного подсчета, поскольку a List имеет size()