PySpark — Структурированная потоковая передача. Как избежать ожидания данных следующего окна для обработки текущего окна?

#pyspark #apache-kafka #spark-structured-streaming

#пыспарк #апач-кафка #спарк-структурированный-потоковый

Вопрос:

Мое приложение объединяет данные, поступающие от нескольких датчиков, в 5-минутные окна. Данные записываются в Кафку, и приложение pyspark считывает их из Кафки. Приложение делит потоковые данные на 5 минут, затем группирует их по идентификатору устройства и применяет к каждой группе пользовательскую функцию агрегирования (с помощью pandas udf). Я использую структурированный потоковый API и python. Мой код выглядит следующим образом:

 from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType from pyspark.sql.functions import col, struct, from_json, to_json, window, pandas_udf, PandasUDFType, from_unixtime, to_utc_timestamp from pyspark.sql import functions as F import pandas as pd  userSchema = StructType().add("key", "string").add("value", "string")  sensor_schema = StructType([   StructField("device_uuid", StringType()),  StructField("ts", StringType()),  StructField("value", DoubleType())   ])  output_schema = StructType([   StructField("sensor_id", StringType()),  StructField("window_id", StringType(), True),  StructField("aggregated_value", DoubleType(), True),  ])  @pandas_udf(output_schema, PandasUDFType.GROUPED_MAP) def my_aggregation(df):   ...   df_result = pd.DataFrame(data={...}, columns=output_schema.fieldNames())   return df_result   if __name__ == '__main__':   print("creating Spark session...")   spark = (SparkSession  .builder  .appName("My App")  .getOrCreate()  )   spark.conf.set("spark.sql.session.timeZone", "UTC")   print("Spark session created!")   print("getting data from Kafka...")  df = (spark  .readStream  .format("kafka").option("kafka.bootstrap.servers", "...")  .option("subscribe", "...")  .option("max.poll.interval.ms", "3600000")  .option("max.poll.records", "300")  .option("startingOffsets", "latest")  .option("failOnDataLoss", "false")  .load())   df1 = (df.select("value").withColumn("value", df["value"].cast(StringType())))   df_value = (df1.withColumn("value", from_json("value", sensor_schema)).select(col('value.*')))   df_value = df_value.withColumn("ts", df_value.ts.cast(LongType()))   df_date = df_value.withColumn("timestamp", to_utc_timestamp(from_unixtime(col("ts") / 1000, 'yyyy-MM-dd HH:mm:ss'), 'UTC'))   df_windowed = (df_date  .withWatermark("timestamp", "0 minutes")  .groupBy("device_uuid", window("timestamp", "5 minutes"))  .agg(  F.collect_list("value").alias("samples"),  F.collect_list("ts").cast("arraylt;longgt;").alias("epochs"),  )  )   try:  df_windowedX = (df_windowed.groupBy("device_uuid").apply(my_aggregation))  except Exception as e:  print("MY EXCEPTION: ")  print(e)   query_pred = (df_windowedX.select(to_json(struct([col(c) for c in df_windowedX.columns])).alias('value'))  .writeStream  .format("kafka")  .option("kafka.bootstrap.servers", "...")  .option("topic", "...")  .option("checkpointLocation", "...")  .option("failOnDataLoss", "false")  .outputMode("append")  .start())   query_pred.awaitTermination()  

Моя проблема в том, что каждое окно не обрабатывается до тех пор, пока не появится первый образец следующего окна. Есть ли какой-либо способ принудительно обработать окно до того, как появится образец следующего окна?

Я приведу пример.

Допустим, наши окна имеют следующие границы: [11:00, 11:05), [11:05, 11:10), [11:10, 11:15), … Допустим, образец a (первый) имеет метку времени 11:03:50, образец b имеет метку времени 11:04:50, а затем образец c имеет метку времени 11:08:50. Для простоты предположим, что каждый образец считывается из Кафки в момент их метки времени (поэтому образец a считывается в 11:03:50 и т. Д.). Первое окно содержит a и b, но они не обрабатываются (т. Е. Передаются группе по функции), пока не появится c. Как заставить алгоритм обрабатывать данные, относящиеся к первому окну, когда часы пробьют 11:05:00?