#apache-spark #pyspark #apache-spark-sql
#apache-spark #pyspark #apache-spark-sql
Вопрос:
Я пытаюсь использовать PandasUDF в PySpark, чтобы найти «самый длинный уникальный хвост» в иерархии.
Например, если мой ввод является:
1.2 1.2.3
тогда самый длинный хвост — «1.2.3»
У меня также может быть несколько уникальных наборов, например:
1.2 1.2.3 5.6.7 5.6
в этом случае вывод должен быть:
1.2.3 5.6.7
Подход, который я использую, заключается в следующем:
- отсортируйте входные данные так, чтобы были перечислены похожие строки, чтобы, если предыдущая строка «содержится» в следующей строке, я мог отфильтровать ее и вернуть только самые длинные уникальные строки.
пример ввода:
1.2.3 5.6.7 5.6 1.2
сортировка становится:
1.2 1.2.3 5.6 5.6.7
когда я фильтрую строку в строке, мой вывод должен быть
1.2.3 5.6.7
Я испробовал два подхода. Во-первых, нужно написать функцию, которая выполняет цикл через DF, отправленный в нее следующим образом:
def getLongestTail(key, pdf) -gt; pd.DataFrame: sortedData = pdf.sort_values(by='value') for i in range(len(sortedData)-1): if sortedData.index(i 1).loc['value'].startswith(sortedData.loc['value']): sortedData.index(i 1) = False return pd.DataFrame(sortedData)
Во-вторых, использовать встроенную лямбда-функцию
def getLongestTail(pdf) -gt; pd.DataFrame: pdf = pdf.sort return (lambda x: pdf.shift(1).loc['value'].startswith(pdf.loc['value']))
I have also tried to decorate as follows:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
Here is my overall code:
import pyspark from pyspark.sql import SparkSession from pyspark.sql.functions import col from pyspark.sql.functions import pandas_udf, PandasUDFType import pandas as pd from pyspark.sql.types import * simpleData = [ ('A', '1.2.3'), ('A', '1.2'), ('B', '9.8'), ('A', '5.6.7.8'), ('B', '9'), ('B', '9.8.7'), ('A', '5')] schema = StructType([ StructField("letter", StringType()), StructField("value", StringType()) ]) def getLongestTail(pdf) -gt; pd.DataFrame: pdf = pdf.sort return pd.DataFrame((lambda x: pdf.loc['value'].startswith(pdf.shift(1).loc['value']))) spark = SparkSession.builder.getOrCreate() df = spark.createDataFrame(data=simpleData, schema = schema) df_result = df.groupby('letter').applyInPandas(getLongestTail, schema=schema).show()
The errors being shown in my Jupyter notebook are showing worker crashed and errors relating to Py4JJavaError. a I am sure there is something basic I am missing — any comments appreciated.
Thank you.
===
error:
--------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) /tmp/ipykernel_34305/1009949605.py in lt;modulegt; 3 # df_grouped.show() 4 ----gt; 5 df_result = df.groupby('letter').applyInPandas(getLongestTailL, schema=schema).show() 6 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 if answer[1] == REFERENCE_TYPE: --gt; 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.n". 328 format(target_id, ".", name), value)