#apache-spark #pyspark #streaming #pandas-groupby
#apache-spark #pyspark #потоковая передача #pandas-groupby
Вопрос:
Я новичок в потоковой передаче Spark и Pandas UDF. Я работаю над pyspark consumer от kafka, полезная нагрузка имеет формат xml и пытается проанализировать входящий xml, применяя pandas udf
@pandas_udf("col1 string, col2 string",PandasUDFType.GROUPED_MAP)
def test_udf(df):
import xmltodict
from collections import MutableMapping
xml_str=df.iloc[0,0]
df_col=['col1', 'col2']
doc=xmltodict.parse(xml_str,dict_constructor=dict)
extract_needed_fields = { k:doc[k] for k in df_col }
return pd.DataFrame( [{'col1': 'abc', 'col2': 'def'}] , index=[0] , dtype="string" )
data=df.selectExpr("CAST(value AS STRING) AS value")
data.groupby("value").apply(test_udf).writeStream.format("console").start()
Я получаю следующую ошибку
File "pyarrow/array.pxi", line 859, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 215, in pyarrow.lib.array
File "pyarrow/array.pxi", line 104, in pyarrow.lib._handle_arrow_array_protocol
ValueError: Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol.
Правильный ли это подход? Что я делаю не так
Комментарии:
1. похоже на ошибку в spark: issues.apache.org/jira/browse/SPARK-31920
Ответ №1:
Похоже, что это скорее недокументированное ограничение, чем ошибка. Вы не можете использовать какой-либо тип pandas, который будет сохранен как объект массива, у которого есть метод с именем __arrow_array__
, потому что pyspark всегда определяет маску. string
Тип, который вы использовали, хранится в stringArray, что является таким случаем. После того, как я преобразовал строку dtype в object, ошибка исчезла.
Ответ №2:
При преобразовании фрейма данных pandas в pyspark я также наткнулся на эту ошибку :
Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol
В моем фрейме данных pandas были значения, подобные datetime, которые я пытался преобразовать в «строку». Изначально я использовал astype("string")
метод, который выглядел следующим образом :
df["time"] = (df["datetime"].dt.time).astype("string")
Когда я попытался получить информацию об этом фрейме данных, казалось, что он действительно был преобразован в строковый тип :
df.info(verbose=True)
> ...
> # Column Non-Null Count Dtype
> ...
> 6 time 295452 non-null string
Но ошибка продолжала возвращаться ко мне.
Решение
Чтобы избежать этого, я вместо этого продолжил использовать apply(str)
метод :
df["time"] = (df["datetime"].dt.time).apply(str)
Что дало мне тип object
df.info(verbose=True)
> ...
> # Column Non-Null Count Dtype
> ...
> 6 time 295452 non-null object
После этого преобразование прошло успешно
spark.createDataFrame(df)
# DataFrame[datetime: string, date: string, year: bigint, month: bigint, day: bigint, day_name: string, time: string, hour: bigint, minute: bigint]