GroupBy с ApplyInPandas в PySpark — как правильно реализовать UDF?

#apache-spark #pyspark #apache-spark-sql

#apache-spark #pyspark #apache-spark-sql

Вопрос:

Я пытаюсь использовать PandasUDF в PySpark, чтобы найти «самый длинный уникальный хвост» в иерархии.

Например, если мой ввод является:

 1.2 1.2.3  

тогда самый длинный хвост — «1.2.3»

У меня также может быть несколько уникальных наборов, например:

 1.2 1.2.3 5.6.7 5.6  

в этом случае вывод должен быть:

 1.2.3 5.6.7  

Подход, который я использую, заключается в следующем:

  • отсортируйте входные данные так, чтобы были перечислены похожие строки, чтобы, если предыдущая строка «содержится» в следующей строке, я мог отфильтровать ее и вернуть только самые длинные уникальные строки.

пример ввода:

 1.2.3 5.6.7 5.6 1.2  

сортировка становится:

 1.2 1.2.3 5.6 5.6.7  

когда я фильтрую строку в строке, мой вывод должен быть

 1.2.3 5.6.7  

Я испробовал два подхода. Во-первых, нужно написать функцию, которая выполняет цикл через DF, отправленный в нее следующим образом:

 def getLongestTail(key, pdf) -gt; pd.DataFrame:  sortedData = pdf.sort_values(by='value')  for i in range(len(sortedData)-1):  if sortedData.index(i 1).loc['value'].startswith(sortedData.loc['value']):  sortedData.index(i 1) = False  return pd.DataFrame(sortedData)  

Во-вторых, использовать встроенную лямбда-функцию

 def getLongestTail(pdf) -gt; pd.DataFrame:  pdf = pdf.sort  return (lambda x: pdf.shift(1).loc['value'].startswith(pdf.loc['value']))  

I have also tried to decorate as follows:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)

Here is my overall code:

 import pyspark from pyspark.sql import SparkSession from pyspark.sql.functions import col  from pyspark.sql.functions import pandas_udf, PandasUDFType import pandas as pd from pyspark.sql.types import *  simpleData = [  ('A', '1.2.3'),  ('A', '1.2'),  ('B', '9.8'),  ('A', '5.6.7.8'),  ('B', '9'),  ('B', '9.8.7'),  ('A', '5')]   schema = StructType([  StructField("letter", StringType()),  StructField("value", StringType()) ])  def getLongestTail(pdf) -gt; pd.DataFrame:  pdf = pdf.sort  return pd.DataFrame((lambda x: pdf.loc['value'].startswith(pdf.shift(1).loc['value'])))   spark = SparkSession.builder.getOrCreate() df = spark.createDataFrame(data=simpleData, schema = schema)  df_result = df.groupby('letter').applyInPandas(getLongestTail, schema=schema).show()  

The errors being shown in my Jupyter notebook are showing worker crashed and errors relating to Py4JJavaError. a I am sure there is something basic I am missing — any comments appreciated.

Thank you.

===

error:

 --------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) /tmp/ipykernel_34305/1009949605.py in lt;modulegt;  3 # df_grouped.show()  4  ----gt; 5 df_result = df.groupby('letter').applyInPandas(getLongestTailL, schema=schema).show()  6    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)  325 if answer[1] == REFERENCE_TYPE: --gt; 326 raise Py4JJavaError(  327 "An error occurred while calling {0}{1}{2}.n".  328 format(target_id, ".", name), value)