Функция для фильтрации значений в PySpark

#dataframe #apache-spark #filter #pyspark

#фрейм данных #apache-spark #Фильтр #pyspark

Вопрос:

Я пытаюсь запустить цикл for в PySpark, которому требуется a для фильтрации переменной для алгоритма.

Вот пример моего фрейма данных df_prods:

  ---------- -------------------- -------------------- 
|ID        |        NAME        |           TYPE     |
 ---------- -------------------- -------------------- 
|    7983  |SNEAKERS 01         |            Sneakers|
|    7034  |SHIRT 13            |               Shirt|
|    3360  |SHORTS 15           |               Short|
  

Я хочу выполнить итерацию по списку идентификаторов, получить совпадение из алгоритма, а затем отфильтровать тип продукта.

Я создал функцию, которая получает тип:

 def get_type(ID_PROD):
    return [row[0] for row in df_prods.filter(df_prods.ID == ID_PROD).select("TYPE").collect()]
  

И хотел, чтобы она возвращала:

 print(get_type(7983))
Sneakers
  

Но я нахожу две проблемы:
1- для этого требуется много времени (дольше, чем я делал аналогичную вещь на Python)
2- Он возвращает тип массива строк: [‘Sneakers’], и когда я пытаюсь отфильтровать продукты, это происходит:

 type = get_type(7983)
df_prods.filter(df_prods.type == type)
java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [Sneakers]
  

Кто-нибудь знает лучший способ подойти к этому в PySpark?

Заранее большое вам спасибо. Мне очень сложно изучать PySpark.

Комментарии:

1. вместо этого просто соберите .. сделайте collect().head

Ответ №1:

Небольшая корректировка вашей функции. Это возвращает фактическую строку целевого столбца из первой записи, найденной после фильтрации.

 from pyspark.sql.functions import col

def get_type(ID_PROD):
  return df.filter(col("ID") == ID_PROD).select("TYPE").collect()[0]["TYPE"]

type = get_type(7983)
df_prods.filter(col("TYPE") == type) # works
  

Я считаю, что использование col("colname") намного более читабельно.

О проблеме производительности, о которой вы упомянули, я действительно не могу сказать без дополнительной информации (например, проверки данных и остальной части вашего приложения). Попробуйте этот синтаксис и скажите мне, улучшится ли производительность.

Комментарии:

1. Это сработало, спасибо! Для фильтрации требуется 9 секунд, что дольше, чем я получал с Pandas. Я понимаю, что это связано с тем, что фрейм данных находится в памяти в Pandas, а не в Pyspark, но мне не хватает знаний в Pyspark, чтобы сделать это быстрее.