#join #apache-flink
#Присоединиться #apache-flink
Вопрос:
Я оцениваю Apache Flink для потоковой обработки в качестве замены / дополнения Apache Spark. Одна из задач, которую мы обычно решаем с помощью Spark, — это обогащение данных.
Т.е. у меня есть поток данных от датчиков Интернета вещей с идентификатором датчика, и у меня есть набор метаданных датчиков. Я хочу преобразовать входной поток в поток измерения датчика метаданных датчика.
В Spark я могу присоединиться к DStream с помощью RDD.
case calss SensorValue(sensorId: Long, ...)
case class SensorMetadata(sensorId: Long, ...)
val sensorInput: DStream[SensorValue] = readEventsFromKafka()
val staticMetadata: RDD[(Long, SensorMetadata)] =
spark.read.json(...).as[SensorMetadata]
.map {s => (s.sensorId, s)}.rdd
val joined: DStream[(SensorValue, SensorMetadata)] =
sensorInput.map{s => (s.sensorId, s)}.transform { rdd: RDD[SensorValue] =>
rdd.join(staticMetadata)
.map { case (_, (s, m)) => (s, m) } // Get rid of nested tuple
}
Могу ли я проделать тот же трюк с Apache Flink? Я не вижу прямого API для этого. Единственная идея, которая у меня есть, — использовать преобразование с отслеживанием состояния — я могу объединить метаданные и события датчиков в один поток и использовать хранилище состояний Flink для хранения метаданных (псевдокод):
val sensorInput: DataStream[SensorValue] = readEventsFromKafka()
val statisMetadata: DataStream[SensorMetadata] = readMetadataFromJson()
val result: DataStream[(SensorValue, SensorMetadata)] =
sensorInput.keyBy("sensorId")
.connect(staticMetadata.keyBy("sensorId"))
.flatMap {new RichCoFlatMapFunction() {
private val ValueState<SensorMetadata> md = _;
override def open = ??? // initiate value state
def flatMap1(s: SensorEvent, s: Collector(SensorEvent, SensorMetadata)) =
collector.collect(s, md.value)
def flatMap2(s: SensorMetadata, s: Collector[(SensorEvent, SensorMetadata)]) =
md.update(s)
}}
Это правильный подход? Могу ли я использовать в большем масштабе, когда метаданные не помещаются на одной машине?
Спасибо
Ответ №1:
Использование a CoFlatMapFunction
для соединения является распространенным подходом. Однако у него есть один существенный недостаток. Функция вызывается всякий раз, когда поступает кортеж из любого ввода, и вы не можете контролировать, какой ввод использовать первым. Итак, вначале вам придется обрабатывать события датчика, когда метаданные не были полностью прочитаны. Один из подходов заключается в буферизации всех событий одного входа до тех пор, пока не будет использован другой вход. С другой стороны, преимущество этого CoFlatMapFunction
подхода заключается в том, что вы можете динамически обновлять метаданные. В вашем примере кода оба ввода вводятся с помощью клавиши join . Это означает, что входные данные разделены, и каждый taskslot обрабатывает другой набор ключей. Следовательно, ваши метаданные могут быть больше, чем может обрабатывать компьютер (если вы настраиваете серверную часть состояния RocksDB, состояние может сохраняться на диске, поэтому вы даже не привязаны к размеру памяти).
Если вам требуется, чтобы все метаданные присутствовали при запуске задания, и если метаданные являются статическими (они не меняются) и достаточно малы, чтобы поместиться на одной машине, вы также можете использовать обычный FlatMapFunction
и загружать метаданные в open()
метод из файла. В отличие от вашего подхода, это было бы широковещательное объединение, где каждый taskslot имеет полные метаданные в памяти. Помимо того, что все метаданные доступны при использовании данных события, преимущество этого подхода заключается в том, что вам не нужно перетасовывать данные события, поскольку они могут быть объединены на любом компьютере.
Комментарии:
1. Спасибо за ваш ответ. Единственное, что осталось для меня неясным — как я могу «буферизировать» ввод? Не могли бы вы дать мне несколько ключевых слов для поиска?
2. Вы можете создать второй объект состояния с ключом, в который вы добавляете записи событий, пока не поступит соответствующее событие метаданных. Когда это происходит, вы объединяете и генерируете все события, очищаете состояние события и вставляете событие метаданных в состояние метаданных.