#pyspark #spark-streaming
#pyspark #spark-потоковая передача
Вопрос:
Я хочу оценить потоковый (несвязанный) фрейм данных в Spark 2.4:
time id value
6:00:01.000 1 333
6:00:01.005 1 123
6:00:01.050 2 544
6:00:01.060 2 544
Когда все данные идентификатора 1 попали в dataframe и поступают данные следующего идентификатора 2, я хочу выполнить вычисления для полных данных идентификатора 1. Но как мне это сделать? Я думаю, что не могу использовать оконные функции, поскольку я не знаю заранее время, которое также варьируется для каждого идентификатора. И я также не знаю идентификатор из других источников, кроме фрейма потоковых данных.
Единственное решение, которое приходит мне в голову, содержит сравнение переменных (память) и цикл while:
id_old = 0 # start value
while true:
id_cur = id_from_dataframe
if id_cur != id_old: # id has changed
do calulation for id_cur
id_old = id_cur
Но я не думаю, что это правильное решение. Можете ли вы дать мне подсказку или документацию, которые помогут мне, поскольку я не могу найти примеры или документацию.
Комментарии:
1. откуда вы знаете, что все данные для id достигли вашего фрейма данных, поскольку это неограниченный поток, это что-то последовательное, я имею в виду, что id = 2 будет поступать только после завершения id = 1 в dataframe?
2. Да, данные являются последовательными. Таким образом, не должно быть никаких новых данных для id = 1, если есть данные для id = 2 и так далее.
Ответ №1:
Я запускаю его с помощью комбинации водяных знаков и группировки:
import pyspark.sql.functions as F
d2 = d1.withWatermark("time", "60 second")
.groupby('id',
F.window('time', "40 second"))
.agg(
F.count("*").alias("count"),
F.min("time").alias("time_start"),
F.max("time").alias("time_stop"),
F.round(F.avg("value"),1).alias('value_avg'))
Большая часть документации показывает только основные сведения с группировкой только по времени, и я видел только один пример с другим параметром для группировки, поэтому я поместил туда свой «идентификатор».