Потоковая передача Pyspark с Pandas UDF

#apache-spark #pyspark #streaming #pandas-groupby

#apache-spark #pyspark #потоковая передача #pandas-groupby

Вопрос:

Я новичок в потоковой передаче Spark и Pandas UDF. Я работаю над pyspark consumer от kafka, полезная нагрузка имеет формат xml и пытается проанализировать входящий xml, применяя pandas udf

 @pandas_udf("col1 string, col2 string",PandasUDFType.GROUPED_MAP)
def test_udf(df):
    import xmltodict
    from collections import MutableMapping 
    xml_str=df.iloc[0,0]
    df_col=['col1', 'col2']
    doc=xmltodict.parse(xml_str,dict_constructor=dict)
    extract_needed_fields = { k:doc[k] for k in df_col }
    return pd.DataFrame( [{'col1': 'abc', 'col2': 'def'}] , index=[0] , dtype="string" )

data=df.selectExpr("CAST(value AS STRING) AS value") 
data.groupby("value").apply(test_udf).writeStream.format("console").start()

 

Я получаю следующую ошибку

   File "pyarrow/array.pxi", line 859, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 215, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 104, in pyarrow.lib._handle_arrow_array_protocol
ValueError: Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol.


 

Правильный ли это подход? Что я делаю не так

Комментарии:

1. похоже на ошибку в spark: issues.apache.org/jira/browse/SPARK-31920

Ответ №1:

Похоже, что это скорее недокументированное ограничение, чем ошибка. Вы не можете использовать какой-либо тип pandas, который будет сохранен как объект массива, у которого есть метод с именем __arrow_array__ , потому что pyspark всегда определяет маску. string Тип, который вы использовали, хранится в stringArray, что является таким случаем. После того, как я преобразовал строку dtype в object, ошибка исчезла.

Ответ №2:

При преобразовании фрейма данных pandas в pyspark я также наткнулся на эту ошибку :

 Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol
 

В моем фрейме данных pandas были значения, подобные datetime, которые я пытался преобразовать в «строку». Изначально я использовал astype("string") метод, который выглядел следующим образом :

 df["time"] = (df["datetime"].dt.time).astype("string")
 

Когда я попытался получить информацию об этом фрейме данных, казалось, что он действительно был преобразован в строковый тип :

 df.info(verbose=True)
> ...
>  #   Column    Non-Null Count   Dtype
> ...
>  6   time      295452 non-null  string
 

Но ошибка продолжала возвращаться ко мне.

Решение

Чтобы избежать этого, я вместо этого продолжил использовать apply(str) метод :

 df["time"] = (df["datetime"].dt.time).apply(str)
 

Что дало мне тип object

 df.info(verbose=True)
> ...
>  #   Column    Non-Null Count   Dtype
> ...
>  6   time      295452 non-null  object
 

После этого преобразование прошло успешно

 spark.createDataFrame(df)
# DataFrame[datetime: string, date: string, year: bigint, month: bigint, day: bigint, day_name: string, time: string, hour: bigint, minute: bigint]