Поток Akka: в чем разница между распаковкой и трансляцией?

#scala #stream #akka #akka-stream

#scala #поток #akka #akka-stream

Вопрос:

Я пытаюсь добиться чего-то подобного:

График

Я пытаюсь создать этот поток, используя Flow.fromGraph

  • Я могу сделать, join используя Zip[B, C] который принимает 2 потока
  • Я могу сделать split двумя способами:
    • использование Broadcast[A](2)
    • использование UnZip[(A,A)] , которому предшествует .map(a -> (a, a))

Оба map(f1) и map(f2) являются пользовательскими потоками, которые я получаю из включенных библиотек, поэтому я не могу их изменить, поэтому, пожалуйста, не говорите .map(a => (f1(a), f2(a)))

В чем различия между двумя случаями или они эквивалентны?Единственное, что я нашел отличающимся, это Broadcast возможность отмены только тогда, когда все нижестоящие потоки отменяют ( eagerCancel = false ), что является его поведением по умолчанию, в отличие от UnZip (который делает то, что делает broadcast с eagerCancel = true )

Кроме того, что происходит в случае сбоев в любом из двух путей? т. Е. каково влияние, если для определенного элемента f1 выдает ошибку?

Кроме того, скажем, если у нас нет f2 функции (то есть нет операции сопоставления) и мы хотим выдать (b,a) в конце, следует ли заменить f2 потоком идентификации или можно пропустить все вместе? (если последнее, вы когда-нибудь использовали бы поток идентификации?)

 val split = builder.add(BroadCast[A](2))
val join = builder.add(Zip[B, A])
val F1: Flow[A, B, NotUsed] = Flow[A].map(f1)
val I = Flow[A].map(identity)

split ~> F1 ~> join.in1
split ~> /* I ~> */ join.in0 // do i need the commented part?
  

Возможно, это помогает с внутренними буферами / обратным давлением?

Комментарии:

1. возможно, здесь помогает FlowWithContext, но на данный момент он все еще находится в разработке и недостаточно готов для обработки этого … также не так много документов / примеров FlowWithContext

2. Аналогично здесь: gist.github.com/davideicardi/d3b383e5945a44252931582f83ecadc2

Ответ №1:

Они оба являются операторами разветвления; однако

Unzip из документов:

Принимает поток из двух кортежей элементов и разархивирует два элемента в два разных нижестоящих потока.

В то время как Broadcast

Каждый входящий элемент выдает каждый из n выходных данных.

Следовательно, мы можем заключить, что Unzip — это просто трансляция с n = 2 ; но важно, что если элементы представляют собой кортеж, трансляция просто создаст n выходные данные того же кортежа. Распаковка создаст 2 выходных данных, каждый для элемента A и B