#stream #akka #apache-flink
#поток #akka #apache-flink
Вопрос:
Я хочу заменить одну часть моего потокового процессора на основе Akka Streams на Flink. Возможно ли в настоящее время использовать потоки Akka в качестве источника для Flink, а затем Flink в качестве источника для потоков Akka в одной и той же кодовой базе?
Текущий поток с потоками Akka выглядит следующим образом:
// Kafka Source -> Read Flow involving Azure cosmosDB -> Evaluate Flow -> Write Flow -> Logger Flow -> Sink
lazy private val converterGraph: Graph[ClosedShape.type, Future[Done]] =
GraphDSL.create(sink) { implicit builder => out =>
source.watchTermination()(Keep.none) ~> prepareFlow ~> readFlow ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out
ClosedShape
}
Потоки выше определены следующим образом:
def prepareFlow: Flow[FromSource, ToRead, NotUsed]
def readFlow: Flow[ToRead, ToEvaluate, NotUsed]
Теперь вместо readFlow
того, чтобы быть потоком Akka, я хотел бы заменить его потоковым процессором Flink. Таким образом, вывод prepareFlow
будет входом для Flink на основе readFlow
, а вывод этого будет вводом для evaluateFlow
.
В принципе, возможно ли сделать что-то подобное:
prepareFlow ~> [Flink source ->read -> result] ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out
Я вижу, что в Apache Bahir есть разъем Flink Akka (приемник), но не уверен, что это можно использовать только с актерами Akka или также с потоками.
Ответ №1:
Вы можете обернуть свое prepareFlow
чтение из CosmosDB как пользовательское flink Source
(путем расширения SourceFunction
) и обернуть весь поток evaluate-write-logger как пользовательский SinkFunction
.
Поскольку сам Flink распределен, вы будете интегрировать akka-stream в задание Flink, но не наоборот. Основные проблемы, которые я вижу при таком подходе, заключаются в том, что у akka-stream было обратное давление из коробки, но сам Flink в основном блокируется. Например, метод SourceFunction.run() требует наличия внутреннего бесконечного цикла, создающего сообщения на каждой итерации, поэтому вам нужно заблокировать его, чтобы дождаться, пока akka-stream создаст там следующее сообщение.
Комментарии:
1. Спасибо за ответ.
2. В моем случае использования
prepareFlow
выполняется некоторая подготовка, но поиск CosmosDB будет выполнен в Flink, и это будет добавлено к локальному состоянию в Flink. Наша основная мотивация при рассмотрении Flink заключается в использовании данных локального состояния, чтобы при последующих чтениях не нужно было обращаться к CosmosDB до тех пор, пока это не потребуется. Будет ли AsyncI / O полезен в нашем случае? Как вы упомянули, данные будут поступать в виде потока из akka stream, и Flink необходимо будет вызвать CosmosDB для заполнения для каждого входящего события потока.3. Дополнительный вопрос относительно вашего предложения написать пользовательскую функцию SinkFunction, которая сочетает в себе evaluate write logger. Будет ли проблематично сохранить только оценку в SinkFunction, а остальные продолжить как потоки? Еще раз спасибо!