PySpark структурированная потоковая и фильтрованная обработка деталей

#pyspark #spark-streaming

#pyspark #spark-потоковая передача

Вопрос:

Я хочу оценить потоковый (несвязанный) фрейм данных в Spark 2.4:

 time          id   value
6:00:01.000   1     333
6:00:01.005   1     123
6:00:01.050   2     544
6:00:01.060   2     544
 

Когда все данные идентификатора 1 попали в dataframe и поступают данные следующего идентификатора 2, я хочу выполнить вычисления для полных данных идентификатора 1. Но как мне это сделать? Я думаю, что не могу использовать оконные функции, поскольку я не знаю заранее время, которое также варьируется для каждого идентификатора. И я также не знаю идентификатор из других источников, кроме фрейма потоковых данных.

Единственное решение, которое приходит мне в голову, содержит сравнение переменных (память) и цикл while:

 id_old = 0 # start value
while true:
  id_cur = id_from_dataframe
  if id_cur != id_old: # id has changed
      do calulation for id_cur
      id_old = id_cur
 

Но я не думаю, что это правильное решение. Можете ли вы дать мне подсказку или документацию, которые помогут мне, поскольку я не могу найти примеры или документацию.

Комментарии:

1. откуда вы знаете, что все данные для id достигли вашего фрейма данных, поскольку это неограниченный поток, это что-то последовательное, я имею в виду, что id = 2 будет поступать только после завершения id = 1 в dataframe?

2. Да, данные являются последовательными. Таким образом, не должно быть никаких новых данных для id = 1, если есть данные для id = 2 и так далее.

Ответ №1:

Я запускаю его с помощью комбинации водяных знаков и группировки:

 import pyspark.sql.functions as F

d2 = d1.withWatermark("time", "60 second") 
    .groupby('id', 
             F.window('time', "40 second")) 
    .agg(
         F.count("*").alias("count"), 
         F.min("time").alias("time_start"), 
         F.max("time").alias("time_stop"), 
         F.round(F.avg("value"),1).alias('value_avg'))
 

Большая часть документации показывает только основные сведения с группировкой только по времени, и я видел только один пример с другим параметром для группировки, поэтому я поместил туда свой «идентификатор».