#dataframe #apache-spark #filter #pyspark
#фрейм данных #apache-spark #Фильтр #pyspark
Вопрос:
Я пытаюсь запустить цикл for в PySpark, которому требуется a для фильтрации переменной для алгоритма.
Вот пример моего фрейма данных df_prods:
---------- -------------------- --------------------
|ID | NAME | TYPE |
---------- -------------------- --------------------
| 7983 |SNEAKERS 01 | Sneakers|
| 7034 |SHIRT 13 | Shirt|
| 3360 |SHORTS 15 | Short|
Я хочу выполнить итерацию по списку идентификаторов, получить совпадение из алгоритма, а затем отфильтровать тип продукта.
Я создал функцию, которая получает тип:
def get_type(ID_PROD):
return [row[0] for row in df_prods.filter(df_prods.ID == ID_PROD).select("TYPE").collect()]
И хотел, чтобы она возвращала:
print(get_type(7983))
Sneakers
Но я нахожу две проблемы:
1- для этого требуется много времени (дольше, чем я делал аналогичную вещь на Python)
2- Он возвращает тип массива строк: [‘Sneakers’], и когда я пытаюсь отфильтровать продукты, это происходит:
type = get_type(7983)
df_prods.filter(df_prods.type == type)
java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [Sneakers]
Кто-нибудь знает лучший способ подойти к этому в PySpark?
Заранее большое вам спасибо. Мне очень сложно изучать PySpark.
Комментарии:
1. вместо этого просто соберите .. сделайте collect().head
Ответ №1:
Небольшая корректировка вашей функции. Это возвращает фактическую строку целевого столбца из первой записи, найденной после фильтрации.
from pyspark.sql.functions import col
def get_type(ID_PROD):
return df.filter(col("ID") == ID_PROD).select("TYPE").collect()[0]["TYPE"]
type = get_type(7983)
df_prods.filter(col("TYPE") == type) # works
Я считаю, что использование col("colname")
намного более читабельно.
О проблеме производительности, о которой вы упомянули, я действительно не могу сказать без дополнительной информации (например, проверки данных и остальной части вашего приложения). Попробуйте этот синтаксис и скажите мне, улучшится ли производительность.
Комментарии:
1. Это сработало, спасибо! Для фильтрации требуется 9 секунд, что дольше, чем я получал с Pandas. Я понимаю, что это связано с тем, что фрейм данных находится в памяти в Pandas, а не в Pyspark, но мне не хватает знаний в Pyspark, чтобы сделать это быстрее.