#scala #stream #akka #akka-stream
#scala #поток #akka #akka-stream
Вопрос:
Я пытаюсь добиться чего-то подобного:
Я пытаюсь создать этот поток, используя Flow.fromGraph
- Я могу сделать,
join
используяZip[B, C]
который принимает 2 потока - Я могу сделать
split
двумя способами:- использование
Broadcast[A](2)
- использование
UnZip[(A,A)]
, которому предшествует.map(a -> (a, a))
- использование
Оба map(f1)
и map(f2)
являются пользовательскими потоками, которые я получаю из включенных библиотек, поэтому я не могу их изменить, поэтому, пожалуйста, не говорите .map(a => (f1(a), f2(a)))
В чем различия между двумя случаями или они эквивалентны?Единственное, что я нашел отличающимся, это Broadcast
возможность отмены только тогда, когда все нижестоящие потоки отменяют ( eagerCancel = false
), что является его поведением по умолчанию, в отличие от UnZip
(который делает то, что делает broadcast с eagerCancel = true
)
Кроме того, что происходит в случае сбоев в любом из двух путей? т. Е. каково влияние, если для определенного элемента f1 выдает ошибку?
Кроме того, скажем, если у нас нет f2
функции (то есть нет операции сопоставления) и мы хотим выдать (b,a)
в конце, следует ли заменить f2 потоком идентификации или можно пропустить все вместе? (если последнее, вы когда-нибудь использовали бы поток идентификации?)
val split = builder.add(BroadCast[A](2))
val join = builder.add(Zip[B, A])
val F1: Flow[A, B, NotUsed] = Flow[A].map(f1)
val I = Flow[A].map(identity)
split ~> F1 ~> join.in1
split ~> /* I ~> */ join.in0 // do i need the commented part?
Возможно, это помогает с внутренними буферами / обратным давлением?
Комментарии:
1. возможно, здесь помогает FlowWithContext, но на данный момент он все еще находится в разработке и недостаточно готов для обработки этого … также не так много документов / примеров FlowWithContext
2. Аналогично здесь: gist.github.com/davideicardi/d3b383e5945a44252931582f83ecadc2
Ответ №1:
Они оба являются операторами разветвления; однако
Unzip
из документов:
Принимает поток из двух кортежей элементов и разархивирует два элемента в два разных нижестоящих потока.
В то время как Broadcast
Каждый входящий элемент выдает каждый из n выходных данных.
Следовательно, мы можем заключить, что Unzip — это просто трансляция с n = 2
; но важно, что если элементы представляют собой кортеж, трансляция просто создаст n
выходные данные того же кортежа. Распаковка создаст 2 выходных данных, каждый для элемента A
и B