#python #pyspark
Вопрос:
Я пытаюсь запросить фрейм данных внутри функции обратного вызова карты и создать новый столбец с вычислением на основе нескольких строк и столбцов.
DF выглядит так
datum | начало | uhrzeit | quittierung |
---|---|---|---|
2021-01-01 | XYZ | 08:00:00 | 2021-01-01 09:00:00 |
2021-01-01 | XYZ | 07:05:00 | 2021-01-01 09:05:00 |
2021-01-01 | XYZ | 07:05:00 | 2021-01-01 09:05:00 |
2021-01-01 | XYZ | 10:00:00 | 2021-01-01 10:05:00 |
2021-01-01 | XYZ | 10:00:00 | 2021-01-01 11:00:00 |
df = SPARK_DATAFRAME
def func1(x):
count = df.filter(df.datum == x.datum).filter(df.start == x.start).filter(df.uhrzeit < x.uhrzeit).filter(df.quittierung[12:19] > x.uhrzeit).count()
return (x.datum, x.start, count)
rdd2=df.rdd.map(lambda x: (func1(x)))
df2=rdd2.toDF(["datum", "start", "count"])
df2.show()
Сделав это, я получу следующее сообщение об ошибке:
Ошибка выбора: Не удалось сериализовать объект: Ошибка типа: не удается рассолить _thread.Объекты RLock
Кто-нибудь может мне помочь, как я могу заархивировать расширение исходного кадра данных по некоторым столбцам, где значение основано на запросе по всему кадру данных?
Пример Результата
datum | начало | uhrzeit | quittierung | считать |
---|---|---|---|---|
2021-01-01 | XYZ | 08:00:00 | 2021-01-01 09:00:00 | 2 |
2021-01-01 | XYZ | 07:05:00 | 2021-01-01 09:05:00 | 0 |
2021-01-01 | XYZ | 07:06:00 | 2021-01-01 09:05:00 | 1 |
2021-01-01 | XYZ | 10:00:00 | 2021-01-01 10:04:00 | 0 |
2021-01-01 | XYZ | 10:05:00 | 2021-01-01 11:00:00 | 0 |
Ответ №1:
Похоже, вам просто нужно df.groupby("datum", "start").count().show()
, разве это не даст вам результат, который вы ищете? Или вам действительно нужно использовать rdd.map
?
Причина ошибки в том, что spark пытается сериализовать функцию, func1
однако внутри у func1
вас есть исходный кадр данных, который не сериализуется.
Комментарии:
1. Для простоты я удалил часть конкатенации фильтрации. Этим я думаю, что упустил важную деталь. На самом деле мне нужно не только фильтровать для равных значений, где groupby сделал бы трюк, но и проводить сравнения. Вы знаете, как это сделать без RDD? Я попробовал обойти панд, но расчет занимает слишком много времени.
2. Не могли бы вы привести пример результирующего кадра данных?
3. Я отредактировал оригинальное сообщение
Ответ №2:
Вы могли бы попробовать с группой и Панд UDF, как показано ниже:
df = SPARK_DATAFRAME
@pandas_udf(df.schema, functionType=PandasUDFType.GROUPED_MAP)
def func1(pdf):
count = pdf.loc["same filtering but in Pandas"].count()
return (count)
Result= df.groupBy(['datum', 'start']).apply(func1)
Что-то в этом направлении должно сработать