Отфильтруйте один фрейм данных pyspark, используя записи из другого фрейма данных

#apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть два кадра данных, один из которых Category должен быть отфильтрован, а другой-от условной фильтрации, которую он выполняет.

Фрейм данных 1: Который имеет условие

 Earning    Age      House size    Family   ..10 more columns with no values(empty)

Avg        Low       Avg           <Empty>
 

Используя это dataframe_1 выше, я должен отфильтровать Category из dataframe_2 того, что удовлетворяет условию, приведенному в dataframe_1

 Category  Age      House size    Family   Earning   .... more columns
  01      High       Avg          Low      Low
  02      Low        Avg          Avg      Avg
  03      Avg        Avg          High     High
 

Здесь вывод будет Category :

02

поскольку он удовлетворяет всем условиям в dataframe_1

Я знаю процесс фильтрации в pyspark с использованием filter , но с использованием одного кадра данных, подобного этому:

dataframe_2.select("category").filter(col("Earning") == 'Avg').filter(col("Age") == 'Low').filter(col("House size") == 'Avg').show()

Мой вопрос заключается в том, как использовать df_1 для фильтрации category из df_2 заданного любого значения в df_1 состоянии «может быть фильтр», а также для обработки пустых значений. Любые зацепки были бы полезны

Комментарии:

1. вы можете собирать данные, чтобы получить значение из фрейма данных 1

2. @Виш, не могли бы вы проиллюстрировать, пожалуйста, то, что вы предлагаете здесь

Ответ №1:

Вы можете достичь этого, выполнив левое соединение.

Обработка нулевых значений зависит от вашего варианта использования. Но ниже приведены некоторые идеи о том, как с ними справиться.

  1. Удалите записи, содержащие нулевые значения.
  2. Замените значение null наиболее часто присутствующими значениями для определенного столбца

Ниже приведен пример кода, обработка нулевых значений в этом коде не реализована

 >>> df1 = spark.createDataFrame([("Avg", "Low", "Avg")], schema=["Earning", "Age", "House size"])

>>> df1.show()
 ------- --- ---------- 
|Earning|Age|House size|
 ------- --- ---------- 
|    Avg|Low|       Avg|
 ------- --- ---------- 

>>> df2 = spark.createDataFrame([
    ("01", "High", "Avg", "Low", "Low"),
    ("02", "Low", "Avg", "Avg", "Avg"),
    ("03", "Avg", "Avg", "High", "High")
], schema=["Category", "Age", "House size", "Family", "Earning"])

>>> df2.show()
 -------- ---- ---------- ------ ------- 
|Category| Age|House size|Family|Earning|
 -------- ---- ---------- ------ ------- 
|      01|High|       Avg|   Low|    Low|
|      02| Low|       Avg|   Avg|    Avg|
|      03| Avg|       Avg|  High|   High|
 -------- ---- ---------- ------ ------- 

>>> df3 = df1.join(df2, ["Earning", "Age", "House size"], "left")

>>> df3.show()
 ------- --- ---------- -------- ------                                         
|Earning|Age|House size|Category|Family|
 ------- --- ---------- -------- ------ 
|    Avg|Low|       Avg|      02|   Avg|
 ------- --- ---------- -------- ------ 

>>> list(df3.select("Category").toPandas()["Category"])
['02']
 

Комментарии:

1. Это не кажется идеальным способом. Как я уже сказал df1 , может быть динамическим, и фильтр может быть по любому столбцу, поэтому нельзя указывать фиксированные имена столбцов join . Также второй момент заключается в том, чтобы перейти на pandas ли с перспективой использования перспективы распараллеливания