#pyspark #apache-kafka #spark-structured-streaming
#пыспарк #апач-кафка #спарк-структурированный-потоковый
Вопрос:
Мое приложение объединяет данные, поступающие от нескольких датчиков, в 5-минутные окна. Данные записываются в Кафку, и приложение pyspark считывает их из Кафки. Приложение делит потоковые данные на 5 минут, затем группирует их по идентификатору устройства и применяет к каждой группе пользовательскую функцию агрегирования (с помощью pandas udf). Я использую структурированный потоковый API и python. Мой код выглядит следующим образом:
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType from pyspark.sql.functions import col, struct, from_json, to_json, window, pandas_udf, PandasUDFType, from_unixtime, to_utc_timestamp from pyspark.sql import functions as F import pandas as pd userSchema = StructType().add("key", "string").add("value", "string") sensor_schema = StructType([ StructField("device_uuid", StringType()), StructField("ts", StringType()), StructField("value", DoubleType()) ]) output_schema = StructType([ StructField("sensor_id", StringType()), StructField("window_id", StringType(), True), StructField("aggregated_value", DoubleType(), True), ]) @pandas_udf(output_schema, PandasUDFType.GROUPED_MAP) def my_aggregation(df): ... df_result = pd.DataFrame(data={...}, columns=output_schema.fieldNames()) return df_result if __name__ == '__main__': print("creating Spark session...") spark = (SparkSession .builder .appName("My App") .getOrCreate() ) spark.conf.set("spark.sql.session.timeZone", "UTC") print("Spark session created!") print("getting data from Kafka...") df = (spark .readStream .format("kafka").option("kafka.bootstrap.servers", "...") .option("subscribe", "...") .option("max.poll.interval.ms", "3600000") .option("max.poll.records", "300") .option("startingOffsets", "latest") .option("failOnDataLoss", "false") .load()) df1 = (df.select("value").withColumn("value", df["value"].cast(StringType()))) df_value = (df1.withColumn("value", from_json("value", sensor_schema)).select(col('value.*'))) df_value = df_value.withColumn("ts", df_value.ts.cast(LongType())) df_date = df_value.withColumn("timestamp", to_utc_timestamp(from_unixtime(col("ts") / 1000, 'yyyy-MM-dd HH:mm:ss'), 'UTC')) df_windowed = (df_date .withWatermark("timestamp", "0 minutes") .groupBy("device_uuid", window("timestamp", "5 minutes")) .agg( F.collect_list("value").alias("samples"), F.collect_list("ts").cast("arraylt;longgt;").alias("epochs"), ) ) try: df_windowedX = (df_windowed.groupBy("device_uuid").apply(my_aggregation)) except Exception as e: print("MY EXCEPTION: ") print(e) query_pred = (df_windowedX.select(to_json(struct([col(c) for c in df_windowedX.columns])).alias('value')) .writeStream .format("kafka") .option("kafka.bootstrap.servers", "...") .option("topic", "...") .option("checkpointLocation", "...") .option("failOnDataLoss", "false") .outputMode("append") .start()) query_pred.awaitTermination()
Моя проблема в том, что каждое окно не обрабатывается до тех пор, пока не появится первый образец следующего окна. Есть ли какой-либо способ принудительно обработать окно до того, как появится образец следующего окна?
Я приведу пример.
Допустим, наши окна имеют следующие границы: [11:00, 11:05), [11:05, 11:10), [11:10, 11:15), … Допустим, образец a (первый) имеет метку времени 11:03:50, образец b имеет метку времени 11:04:50, а затем образец c имеет метку времени 11:08:50. Для простоты предположим, что каждый образец считывается из Кафки в момент их метки времени (поэтому образец a считывается в 11:03:50 и т. Д.). Первое окно содержит a и b, но они не обрабатываются (т. Е. Передаются группе по функции), пока не появится c. Как заставить алгоритм обрабатывать данные, относящиеся к первому окну, когда часы пробьют 11:05:00?