Структурированное потоковое задание Spark: статическое соединение потока не обновляется

#apache-spark #pyspark #spark-structured-streaming

Вопрос:

Я использую структурированную потоковую передачу Spark для чтения событий из Azure Eventhub, обогащаю события некоторыми данными, выполняя статическое объединение потоков, и выводю обогащенные события в какой-либо другой источник.

Проблема в том, что через несколько часов потоковое задание перестает обнаруживать обновленные данные в статической таблице. Поэтому обратите внимание, что обновления в статической таблице сначала перехватываются и обновляются, но по прошествии неизвестного количества времени это, похоже, больше не так. Статический фрейм данных представляет собой дельта — таблицу, хранящуюся в dbfs, которая может обновляться каждые 30 минут.

Кто-нибудь знает, почему сначала он работает так, как ожидалось (т. Е. Обрабатываются изменения в статической дельта-таблице), но через некоторое время он перестает обнаруживать изменения в статической таблице?

Пожалуйста, ознакомьтесь с моим упрощенным кодом

 # Read eventhub
metricbeat_df = spark 
    .readStream 
    .format("eventhubs") 
    .options(**eh_conf) 
    .load()

# Join event stream with facts
joined_df = join_with_facts(metricbeat_df)

trigger = {"processingTime": "30 seconds"}
for_each_stream = joined_df 
    .writeStream 
    .trigger(**trigger) 
    .foreach(CustomParser()) # This just outputs each event to the desired output
for_each_stream.start()
 

Где join_with_facts выглядит так:

 def join_with_facts(dataframe):
    facts_df = spark.table("my_db.facts_table")
    joined_facts = dataframe 
        .join(facts_df, "ID", how='inner')
    return joined_facts