#java #stream #akka #akka-stream
Вопрос:
У меня есть поток, который создает состояние для некоторой метаинформации. Состояние содержит именно то, что мне нужно, но как я могу получить доступ к состоянию после завершения потока (см. statefulMapConcat.
source.via(someBusiness()).statefulMapConcat(elem -> {
// state
Set<Elem> state = new HashSet();
elem -> {
// change state
state.add(...);
return Collections.singletonList(elem);
}
}
Я знаю, что могу раскрыть состояние и использовать его как глобальную доступную переменную, но затем я получаю условие гонки с точки зрения параллелизма.
Параллельная хэш-карта решила бы эту проблему, но я надеюсь, что есть решение без блокировки.
Комментарии:
1. Там ничего
statefulMapConcat
нет…2. Извините, это исправлено
3. Если вам нужно состояние только после завершения потока, это похоже на вариант использования в будущем (например, a
CompletableFuture
в Java илиPromise
Future
комбинация / в Scala). В Java я подозреваю, что вы создадитеCompletableFuture
внешнюю часть потока и будете использовать технику, подобную вашему предыдущему вопросу (возможно, используяOptional
), чтобы отразить завершение восходящего потока в качестве элемента в потоке, чтобы вы знали, когда завершить в будущем.4. Я давно этого хотел. В частности, я хотел бы, чтобы существовала версия
statefulMapConcatMat[T, M, M2](materializer: Creator[M], f: Function[M, Function[Out, Iterable[T]]], combine: Function2[Mat, M, M2]): Source[T, M2]
. Это потребовалоmaterializer
бы каждой материализации, и она «разделила бы» материализованную ценность. Конечно, вам нужноM
быть потокобезопасным.5. Следует также отметить, что есть момент, когда передача управления состоянием субъекту (что открывает мир параллельных возможностей) и использование a
mapAsync
в amapConcat
было бы действительно идиоматичным Акка.