Pyspark как отфильтровать фрейм данных внутри функции карты RDD?

#python #pyspark

Вопрос:

Я пытаюсь запросить фрейм данных внутри функции обратного вызова карты и создать новый столбец с вычислением на основе нескольких строк и столбцов.

DF выглядит так

datum начало uhrzeit quittierung
2021-01-01 XYZ 08:00:00 2021-01-01 09:00:00
2021-01-01 XYZ 07:05:00 2021-01-01 09:05:00
2021-01-01 XYZ 07:05:00 2021-01-01 09:05:00
2021-01-01 XYZ 10:00:00 2021-01-01 10:05:00
2021-01-01 XYZ 10:00:00 2021-01-01 11:00:00
 df = SPARK_DATAFRAME

def func1(x):
    count = df.filter(df.datum == x.datum).filter(df.start == x.start).filter(df.uhrzeit < x.uhrzeit).filter(df.quittierung[12:19] > x.uhrzeit).count()
    return (x.datum, x.start, count)



rdd2=df.rdd.map(lambda x: (func1(x))) 
df2=rdd2.toDF(["datum", "start", "count"])
df2.show()
 

Сделав это, я получу следующее сообщение об ошибке:

Ошибка выбора: Не удалось сериализовать объект: Ошибка типа: не удается рассолить _thread.Объекты RLock

Кто-нибудь может мне помочь, как я могу заархивировать расширение исходного кадра данных по некоторым столбцам, где значение основано на запросе по всему кадру данных?

Пример Результата

datum начало uhrzeit quittierung считать
2021-01-01 XYZ 08:00:00 2021-01-01 09:00:00 2
2021-01-01 XYZ 07:05:00 2021-01-01 09:05:00 0
2021-01-01 XYZ 07:06:00 2021-01-01 09:05:00 1
2021-01-01 XYZ 10:00:00 2021-01-01 10:04:00 0
2021-01-01 XYZ 10:05:00 2021-01-01 11:00:00 0

Ответ №1:

Похоже, вам просто нужно df.groupby("datum", "start").count().show() , разве это не даст вам результат, который вы ищете? Или вам действительно нужно использовать rdd.map ?

Причина ошибки в том, что spark пытается сериализовать функцию, func1 однако внутри у func1 вас есть исходный кадр данных, который не сериализуется.

Комментарии:

1. Для простоты я удалил часть конкатенации фильтрации. Этим я думаю, что упустил важную деталь. На самом деле мне нужно не только фильтровать для равных значений, где groupby сделал бы трюк, но и проводить сравнения. Вы знаете, как это сделать без RDD? Я попробовал обойти панд, но расчет занимает слишком много времени.

2. Не могли бы вы привести пример результирующего кадра данных?

3. Я отредактировал оригинальное сообщение

Ответ №2:

Вы могли бы попробовать с группой и Панд UDF, как показано ниже:

 df = SPARK_DATAFRAME

@pandas_udf(df.schema, functionType=PandasUDFType.GROUPED_MAP)
def func1(pdf):
    count = pdf.loc["same filtering but in Pandas"].count()
    return (count)


Result= df.groupBy(['datum', 'start']).apply(func1)
 

Что-то в этом направлении должно сработать