Есть ли способ присоединить результирующий поток groupby обратно к его исходному потоку в потоковой передаче со структурой kafka-spark?

#pyspark #apache-kafka #pyspark-sql #spark-structured-streaming

#pyspark #apache-kafka #pyspark-sql #spark-structured-streaming

Вопрос:

Я читаю поток из темы Kafka. Я выполняю операцию window groupBy во время события. Теперь я хочу присоединиться к этому результирующему потоку из groupBy обратно к исходному потоку.

 #I am reading from a Kafka topic. The following is a ping statement:
2019-04-15 13:32:33 | 64 bytes from google.com (X.X.X.X): icmp_seq=476 ttl=63 time=0.201ms
2019-04-15 13:32:34 | 64 bytes from google.com (X.X.X.X): icmp_seq=477 ttl=63 time=0.216ms
2019-04-15 13:32:35 | 64 bytes from google.com (X.X.X.X): icmp_seq=478 ttl=63 time=0.245ms
2019-04-15 13:32:36 | 64 bytes from google.com (X.X.X.X): icmp_seq=479 ttl=63 time=0.202ms
and so on..

root
|--key: binary
|--value: binary
|--topic: string
|--partition: integer
|--offset:long
|--timestamp:timestamp
|--timestampType:integer

#value contains the above ping statement so, I cast it as string.
words = lines.selectExpr("CAST(value AS STRING)")

#Now I split that line into columns with its values.
words = words.withColumn('string_val', F.regexp_replace(F.split(words.value, " ")[6], ":", "")) 
.withColumn('ping', F.regexp_replace(F.split(words.value, " ")[10], "time=", "").cast("double")) 
.withColumn('date', F.split(words.value, " ")[0]) 
.withColumn('time', F.regexp_replace(F.split(words.value, " ")[1], "|", ""))

words = words.withColumn('date_time', F.concat(F.col('date'), F.lit(" "), F.col('time')))
words = words.withColumn('date_format', F.col('date_time').cast('timestamp'))

#Now the schema becomes like this
root
|--value:string
|--string_val:string
|--ping:double
|--date:string
|--time:string
|--date_time:string
|--date_format:timestamp

#Now I have to perform a windowed groupBy operation with watermark
w = F.window('date_format', '30 seconds', '10 seconds')
words = words 
.withWatermark('date_format', '1 minutes') 
.groupBy(w).agg(F.mean('ping').alias('value'))

#Now my schema becomes like this
root
|--window:struct
|   |--start:timestamp
|   |--end:timestamp
|--value
  

Есть ли какой-либо способ присоединиться к этому результирующему потоку обратно к его исходному потоку?

Ответ №1:

Это можно сделать возможным с помощью «соединения потока с потоком», введенного в spark 2.3 Для любой версии, предшествующей spark 2.3, вам нужно будет сохранить агрегацию в каком-либо хранилище (в памяти или на диске) и выполнить левое внешнее соединение исходного потока с этим хранилищем, которое вы используете для хранениясостояние агрегации.