#java #java-stream #forkjoinpool
#java #java-поток #forkjoinpool
Вопрос:
Я хочу использовать parallel stream api для чтения redis, получения всех элементов и случайной выборки 10% из всех элементов. Но я обнаружил, что иногда он выдает исключение, говорящее random.nextInt(itemCount)
, что не принимает 0 в качестве входных данных. Это меня очень смущает.
Является ли это многопоточной проблемой? Я не знаю, что делать.
Вот этот код:
private Timer timer = new Timer(); // I start the job in a timer.
private Random rand = new Random();
private List<String> ids = getIds(); // get redis keys.
private ForkJoinPool updateThreadsPool = new ForkJoinPool(32);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
ForkJoinTask<List<List<Item>>> asyncTask = updateThreadsPool.submit(() -> {
return ids.parallelStream().map(id -> getFromRedis(id)).collect(Collectors.toList());
});
int totalNum = 0;
List<Item> totalItems = new ArrayList<>();
try {
totalNum = asyncTask.get().stream().peek(totalItems::addAll)
.reduce(0, (num, list) -> num list.size(), Integer::sum);
} catch (Exception e) { // do something }
int randIndex = rand.nextInt(totalNum); // this is where it throws exception.
// says that totalNum is zero.
// I'm pretty sure totalItems is not empty, because I print log for that.
// I tried use int totalNum = totalItems.size(), but I got the same problem.
}
}, 1000L, 1000L
);
Комментарии:
1. Не
asyncTask.get()
следует возвращать aList<Item>
? Ужасный код после него, а именно.stream().peek(totalItems::addAll) .reduce(0, (num, list) -> num list.size(), Integer::sum)
, кажется, пытается объединить поток списков, но еслиItem
это не подтипList<Item>
, потока списков нет, и этот код даже не прошел бы через компилятор. УчитываяForkJoinTask<List<Item>> asyncTask…
, что вы можете просто использоватьList<Item> totalItems = asyncTask.join(); int totalNum = totalItems.size();
, и он должен соответствовать размеруids
.2. @Holger вы правы, извините, я использовал lombok для объявления, и я допустил ошибку здесь… Это должен быть список <Список<Элемент>> .
3. @Holger и второй: int totalNum = totalItems.size(); Я попробовал это, иногда он возвращает ноль, как в приведенном выше коде.
4. Затем используйте
List<Item> totalItems = asyncTask.join().stream() .flatMap(List::stream) .collect(Collectors.toList()); int totalNum = totalItems.size();
Ключевым моментом является то, что используйте API по назначению и не используйтеcatch (Exception e) { }
, поскольку в противном случаеpeek
это могло вызвать побочные эффекты (например, создание непустого списка), в то время как неудачная операция (уменьшение до числа) не дала результата, поэтому вы остаетесь с предварительно инициализированным значением, т. Е.int totalNum = 0;
передtry
блоком. Когда вы собираете aList
, нет смысла в операции избыточного подсчета, поскольку aList
имеетsize()
…