#apache-spark #pyspark #spark-structured-streaming
Вопрос:
Я использую структурированную потоковую передачу Spark для чтения событий из Azure Eventhub, обогащаю события некоторыми данными, выполняя статическое объединение потоков, и выводю обогащенные события в какой-либо другой источник.
Проблема в том, что через несколько часов потоковое задание перестает обнаруживать обновленные данные в статической таблице. Поэтому обратите внимание, что обновления в статической таблице сначала перехватываются и обновляются, но по прошествии неизвестного количества времени это, похоже, больше не так. Статический фрейм данных представляет собой дельта — таблицу, хранящуюся в dbfs, которая может обновляться каждые 30 минут.
Кто-нибудь знает, почему сначала он работает так, как ожидалось (т. Е. Обрабатываются изменения в статической дельта-таблице), но через некоторое время он перестает обнаруживать изменения в статической таблице?
Пожалуйста, ознакомьтесь с моим упрощенным кодом
# Read eventhub
metricbeat_df = spark
.readStream
.format("eventhubs")
.options(**eh_conf)
.load()
# Join event stream with facts
joined_df = join_with_facts(metricbeat_df)
trigger = {"processingTime": "30 seconds"}
for_each_stream = joined_df
.writeStream
.trigger(**trigger)
.foreach(CustomParser()) # This just outputs each event to the desired output
for_each_stream.start()
Где join_with_facts
выглядит так:
def join_with_facts(dataframe):
facts_df = spark.table("my_db.facts_table")
joined_facts = dataframe
.join(facts_df, "ID", how='inner')
return joined_facts