Можно ли использовать потоковый процессор Flink между потоками Akka в качестве источника и приемника?

#stream #akka #apache-flink

#поток #akka #apache-flink

Вопрос:

Я хочу заменить одну часть моего потокового процессора на основе Akka Streams на Flink. Возможно ли в настоящее время использовать потоки Akka в качестве источника для Flink, а затем Flink в качестве источника для потоков Akka в одной и той же кодовой базе?

Текущий поток с потоками Akka выглядит следующим образом:

  // Kafka Source -> Read Flow involving Azure cosmosDB -> Evaluate Flow -> Write Flow -> Logger Flow -> Sink
  lazy private val converterGraph: Graph[ClosedShape.type, Future[Done]] =
    GraphDSL.create(sink) { implicit builder => out =>
      source.watchTermination()(Keep.none) ~> prepareFlow ~> readFlow ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out
      ClosedShape
  }
  

Потоки выше определены следующим образом:

 def prepareFlow: Flow[FromSource, ToRead, NotUsed]

def readFlow: Flow[ToRead, ToEvaluate, NotUsed]
  

Теперь вместо readFlow того, чтобы быть потоком Akka, я хотел бы заменить его потоковым процессором Flink. Таким образом, вывод prepareFlow будет входом для Flink на основе readFlow , а вывод этого будет вводом для evaluateFlow .

В принципе, возможно ли сделать что-то подобное:

   prepareFlow ~> [Flink source ->read -> result] ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out

  

Я вижу, что в Apache Bahir есть разъем Flink Akka (приемник), но не уверен, что это можно использовать только с актерами Akka или также с потоками.

Ответ №1:

Вы можете обернуть свое prepareFlow чтение из CosmosDB как пользовательское flink Source (путем расширения SourceFunction ) и обернуть весь поток evaluate-write-logger как пользовательский SinkFunction .

Поскольку сам Flink распределен, вы будете интегрировать akka-stream в задание Flink, но не наоборот. Основные проблемы, которые я вижу при таком подходе, заключаются в том, что у akka-stream было обратное давление из коробки, но сам Flink в основном блокируется. Например, метод SourceFunction.run() требует наличия внутреннего бесконечного цикла, создающего сообщения на каждой итерации, поэтому вам нужно заблокировать его, чтобы дождаться, пока akka-stream создаст там следующее сообщение.

Комментарии:

1. Спасибо за ответ.

2. В моем случае использования prepareFlow выполняется некоторая подготовка, но поиск CosmosDB будет выполнен в Flink, и это будет добавлено к локальному состоянию в Flink. Наша основная мотивация при рассмотрении Flink заключается в использовании данных локального состояния, чтобы при последующих чтениях не нужно было обращаться к CosmosDB до тех пор, пока это не потребуется. Будет ли AsyncI / O полезен в нашем случае? Как вы упомянули, данные будут поступать в виде потока из akka stream, и Flink необходимо будет вызвать CosmosDB для заполнения для каждого входящего события потока.

3. Дополнительный вопрос относительно вашего предложения написать пользовательскую функцию SinkFunction, которая сочетает в себе evaluate write logger. Будет ли проблематично сохранить только оценку в SinkFunction, а остальные продолжить как потоки? Еще раз спасибо!