Как присоединиться к фреймам данных Pyspark на основе групп

#python #apache-spark #join #pyspark

Вопрос:

У меня есть 2 кадра данных Pyspark

Фрейм данных 1 — df где расположены столбцы customer_id, address_id, order_id, date the order was placed, order_share

  ---- ---- -------- ---------- ----------- 
|c_id|a_id|order_id|order_date|order_share|
 ---- ---- -------- ---------- ----------- 
|  c1|  a1|       1|2021-01-23|        0.5|
|  c1|  a2|       1|2021-01-23|        0.2|
|  c1|  a3|       1|2021-01-23|        0.3|
|  c2|  a5|       2|2021-03-20|        0.4|
|  c2|  a6|       2|2021-03-20|        0.6|
|  c1|  a1|       3|2021-02-20|        0.3|
|  c1|  a2|       3|2021-02-20|        0.3|
|  c1|  a3|       3|2021-02-20|        0.4|
 ---- ---- -------- ---------- ----------- 
 

Фрейм данных 2 — df_address где расположены столбцы customer_id, address_id, the date of address creation

  ---- ---- ------------ 
|c_id|a_id|created_date|
 ---- ---- ------------ 
|  c1|  a1|  2020-12-31|
|  c1|  a2|  2020-04-23|
|  c1|  a3|  2020-03-23|
|  c1|  a4|  2020-01-16|
|  c2|  a5|  2020-12-28|
|  c2|  a6|  2020-05-16|
|  c2|  a7|  2020-03-04|
 ---- ---- ------------ 
 

Теперь я хочу объединить обе эти таблицы таким образом, чтобы для каждого идентификатора заказа я получал адрес, df_address и соответствующая запись должна быть 0.0 в order_share столбце

Мой вывод должен выглядеть так

  ---- ---- ------------ -------- ---------- ----------- 
|c_id|a_id|created_date|order_id|order_date|order_share|
 ---- ---- ------------ -------- ---------- ----------- 
|  c1|  a1|  2020-12-31|       1|2021-01-23|        0.5|
|  c1|  a2|  2020-04-23|       1|2021-01-23|        0.2|
|  c1|  a3|  2020-03-23|       1|2021-01-23|        0.3|
|  c1|  a4|  2020-01-16|       1|2021-01-23|        0.0|
|  c2|  a5|  2020-12-28|       2|2021-03-20|        0.4|
|  c2|  a6|  2020-05-16|       2|2021-03-20|        0.6|
|  c2|  a7|  2020-03-04|       2|2021-03-20|        0.0|
|  c1|  a1|  2020-12-31|       3|2021-02-20|        0.3|
|  c1|  a2|  2020-04-23|       3|2021-02-20|        0.3|
|  c1|  a3|  2020-03-23|       3|2021-02-20|        0.4|
|  c1|  a4|  2020-01-16|       3|2021-02-20|        0.0|
 ---- ---- ------------ -------- ---------- ----------- 
 

Это не похоже на обычное соединение слева/справа, и я должен делать это для каждого идентификатора заказа.

Я попытался присоединиться с помощью ['c_id','a_id'] , но результат не близок к ожидаемому. Рассматривая df_address как слева, df так и справа, использование left join дает мне нулевые значения для order_id и right join не дает мне всех адресов из df_address

Похоже, мне нужно применить какой-то groupby для каждого order_id, а затем применить объединение для каждой группы, но я не знаю, как это реализовать или даже уверен, что это правильный способ сделать это

Любая помощь будет признательна. Спасибо!

Комментарии:

1. что мешает вам left присоединиться c_id и a_id

2. Пожалуйста, проверьте правку вопроса, я изложил проблему, с которой столкнулся при использовании левого соединения

Ответ №1:

Вы можете использовать orders промежуточный фрейм данных, созданный из df фрейма данных и содержащий только информацию о заказах, которые являются столбцами customer_id , order_id и order_date . Затем вы сначала внутренне df_address соединяете фрейм данных с этим orders фреймом данных, чтобы связать каждую пару (customer_id, address_id) с информацией о конкретных заказах, а затем слева соединяете результирующий df фрейм данных с фреймом данных, чтобы получить order_share каждый адрес, а затем заменяете null значение в order_share столбце на 0.0 .

Вот полный код:

 from pyspark.sql import functions as F

# Orders dataframe that contains only orders-specific information
orders = df.select('customer_id', 'order_id', 'order_date').distinct()

df_address.join(orders, ['customer_id'])  # link addresses with orders
  .join(df.drop('order_date'), ['customer_id', 'address_id', 'order_id'], 'left_outer')  # link orders/addresses with order shares
  .withColumn('order_share', F.when(F.col('order_share').isNotNull(), F.col('order_share')).otherwise(F.lit(0.0)))  # replace null in order_share column with 0.0
  .orderBy('customer_id', 'order_id', 'address_id')  # optional, to reorder dataframe
 

Подробные сведения

Примечание: Я переупорядочил все фреймы данных здесь order_id и address_id для удобства чтения

Начиная с df фрейма данных в вашем вопросе, мы получаем следующий фрейм данных orders :

  ----------- -------- ---------- 
|customer_id|order_id|order_date|
 ----------- -------- ---------- 
|c1         |1       |2021-01-23|
|c2         |2       |2021-03-20|
|c1         |3       |2021-02-20|
 ----------- -------- ---------- 
 

Затем мы соединяем этот фрейм orders данных с фреймом данных df_address :

  ----------- ---------- ------------ -------- ---------- 
|customer_id|address_id|created_date|order_id|order_date|
 ----------- ---------- ------------ -------- ---------- 
|c1         |a1        |2020-12-31  |1       |2021-01-23|
|c1         |a2        |2020-04-23  |1       |2021-01-23|
|c1         |a3        |2020-03-23  |1       |2021-01-23|
|c1         |a4        |2020-01-16  |1       |2021-01-23|
|c2         |a5        |2020-12-28  |2       |2021-03-20|
|c2         |a6        |2020-05-16  |2       |2021-03-20|
|c2         |a7        |2020-03-04  |2       |2021-03-20|
|c1         |a1        |2020-12-31  |3       |2021-02-20|
|c1         |a2        |2020-04-23  |3       |2021-02-20|
|c1         |a3        |2020-03-23  |3       |2021-02-20|
|c1         |a4        |2020-01-16  |3       |2021-02-20|
 ----------- ---------- ------------ -------- ---------- 
 

И с последним соединением с df фреймом данных без столбца order_date мы получаем:

  ----------- ---------- -------- ------------ ---------- ----------- 
|customer_id|address_id|order_id|created_date|order_date|order_share|
 ----------- ---------- -------- ------------ ---------- ----------- 
|c1         |a1        |1       |2020-12-31  |2021-01-23|0.5        |
|c1         |a2        |1       |2020-04-23  |2021-01-23|0.2        |
|c1         |a3        |1       |2020-03-23  |2021-01-23|0.3        |
|c1         |a4        |1       |2020-01-16  |2021-01-23|null       |
|c2         |a5        |2       |2020-12-28  |2021-03-20|0.4        |
|c2         |a6        |2       |2020-05-16  |2021-03-20|0.6        |
|c2         |a7        |2       |2020-03-04  |2021-03-20|null       |
|c1         |a1        |3       |2020-12-31  |2021-02-20|0.3        |
|c1         |a2        |3       |2020-04-23  |2021-02-20|0.3        |
|c1         |a3        |3       |2020-03-23  |2021-02-20|0.4        |
|c1         |a4        |3       |2020-01-16  |2021-02-20|null       |
 ----------- ---------- -------- ------------ ---------- ----------- 
 

Затем нам просто нужно заменить null на 0.0 , и мы получим наш ожидаемый фрейм данных:

  ----------- ---------- -------- ------------ ---------- ----------- 
|customer_id|address_id|order_id|created_date|order_date|order_share|
 ----------- ---------- -------- ------------ ---------- ----------- 
|         c1|        a1|       1|  2020-12-31|2021-01-23|        0.5|
|         c1|        a2|       1|  2020-04-23|2021-01-23|        0.2|
|         c1|        a3|       1|  2020-03-23|2021-01-23|        0.3|
|         c1|        a4|       1|  2020-01-16|2021-01-23|        0.0|
|         c2|        a5|       2|  2020-12-28|2021-03-20|        0.4|
|         c2|        a6|       2|  2020-05-16|2021-03-20|        0.6|
|         c2|        a7|       2|  2020-03-04|2021-03-20|        0.0|
|         c1|        a1|       3|  2020-12-31|2021-02-20|        0.3|
|         c1|        a2|       3|  2020-04-23|2021-02-20|        0.3|
|         c1|        a3|       3|  2020-03-23|2021-02-20|        0.4|
|         c1|        a4|       3|  2020-01-16|2021-02-20|        0.0|
 ----------- ---------- -------- ------------ ---------- ----------- 
 

Комментарии:

1. Спасибо. Всего несколько уточнений, я не думаю distinct , что это действительно необходимо при создании orders , так как строка всегда будет уникальной? А также, используя left_outer или left подразумевая одно и то же при присоединении, верно?

2. О бесполезном distinct , строки только с customer_id , order_id и order_date не будут уникальными, потому что мы удалили столбцы address_id и order_share . В вашем df примере фрейма данных, если вы удалите distinct orders его , в фрейме данных вы получите три раза (c1, 1, 2021-01-23) подряд, два раза (c2, 2, 2021-03-20) подряд и три раза (c1, 3, 2021-02-20) подряд. Так что тебе придется остаться distinct . И да, left_outer и left означает то же самое при соединении двух фреймов данных.

3. Извини, я виноват. У меня было на уме кое-что другое. Я понимаю, в чем польза distinct . Я просто хочу уточнить, мы используем distinct взаимозаменяемо с dropDuplicates правом?

4. Да, здесь вы можете использовать distinct и dropDuplicates без аргументов взаимозаменяемо.

Ответ №2:

Я попробовал full outer объединить с DataFrames , чтобы получить недостающие c_id и a_id комбинации и дальнейшее использование, когда с isNull для Null значений столбцов, поступающих из df и заменяющих, когда их значения df_address снизу являются результатами —

Подготовка Данных

 input_str1 = """
c1|  a1|       1|2021-01-23|        0.5|
c1|  a2|       1|2021-01-23|        0.2|
c1|  a3|       1|2021-01-23|        0.3|
c2|  a5|       2|2021-03-20|        0.4|
c2|  a6|       2|2021-03-20|        0.6|
c1|  a1|       3|2021-02-20|        0.3|
c1|  a2|       3|2021-02-20|        0.3|
c1|  a3|       3|2021-02-20|        0.4
""".split("|")

input_values1 = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str1))

cols1 = list(map(lambda x: x.strip() if x.strip() != '' else None, "c_id|a_id|order_id|order_date|order_share".split("|")))
            
n = len(input_values1)
n_col1 = 5

input_list1 = [tuple(input_values1[i:i n_col1]) for i in range(0,n,n_col1)]

sparkDF1 = sql.createDataFrame(input_list1, cols1)

sparkDF1.show()

 ---- ---- -------- ---------- ----------- 
|c_id|a_id|order_id|order_date|order_share|
 ---- ---- -------- ---------- ----------- 
|  c1|  a1|       1|2021-01-23|        0.5|
|  c1|  a2|       1|2021-01-23|        0.2|
|  c1|  a3|       1|2021-01-23|        0.3|
|  c2|  a5|       2|2021-03-20|        0.4|
|  c2|  a6|       2|2021-03-20|        0.6|
|  c1|  a1|       3|2021-02-20|        0.3|
|  c1|  a2|       3|2021-02-20|        0.3|
|  c1|  a3|       3|2021-02-20|        0.4|
 ---- ---- -------- ---------- ----------- 

input_str2 = """
c1|  a1|  2020-12-31|
c1|  a2|  2020-04-23|
c1|  a3|  2020-03-23|
c1|  a4|  2020-01-16|
c2|  a5|  2020-12-28|
c2|  a6|  2020-05-16|
c2|  a7|  2020-03-04
""".split("|")

input_values2 = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str2))

cols2 = list(map(lambda x: x.strip() if x.strip() != '' else None, "c_id|a_id|created_date".split("|")))
            
n = len(input_values2)
n_col2 = 3

input_list2 = [tuple(input_values2[i:i n_col2]) for i in range(0,n,n_col2)]

sparkDF2 = sql.createDataFrame(input_list2, cols2)

sparkDF2.show()

 ---- ---- ------------ 
|c_id|a_id|created_date|
 ---- ---- ------------ 
|  c1|  a1|  2020-12-31|
|  c1|  a2|  2020-04-23|
|  c1|  a3|  2020-03-23|
|  c1|  a4|  2020-01-16|
|  c2|  a5|  2020-12-28|
|  c2|  a6|  2020-05-16|
|  c2|  a7|  2020-03-04|
 ---- ---- ------------ 
 

Полное присоединение

Переименование значений столбцов , поступающих из SparkDF2, которые в дальнейшем будут использоваться для заполнения нулевых значений, чтобы избежать неоднозначных имен столбцов

 finalDF = sparkDF1.join(sparkDF2
                       , (sparkDF1['c_id'] == sparkDF2['c_id'])
                        amp; (sparkDF1['a_id'] == sparkDF2['a_id'])
                        ,'full'
                ).select(sparkDF1['*']
                         ,sparkDF2['c_id'].alias('c_id_address')
                         ,sparkDF2['a_id'].alias('a_id_address')
                         ,sparkDF2['created_date']
                        )
finalDF.show()

 ---- ---- -------- ---------- ----------- ------------ ------------ ------------ 
|c_id|a_id|order_id|order_date|order_share|c_id_address|a_id_address|created_date|
 ---- ---- -------- ---------- ----------- ------------ ------------ ------------ 
|  c1|  a3|       1|2021-01-23|        0.3|          c1|          a3|  2020-03-23|
|  c1|  a3|       3|2021-02-20|        0.4|          c1|          a3|  2020-03-23|
|  c2|  a5|       2|2021-03-20|        0.4|          c2|          a5|  2020-12-28|
|null|null|    null|      null|       null|          c2|          a7|  2020-03-04|
|  c1|  a2|       1|2021-01-23|        0.2|          c1|          a2|  2020-04-23|
|  c1|  a2|       3|2021-02-20|        0.3|          c1|          a2|  2020-04-23|
|  c1|  a1|       1|2021-01-23|        0.5|          c1|          a1|  2020-12-31|
|  c1|  a1|       3|2021-02-20|        0.3|          c1|          a1|  2020-12-31|
|null|null|    null|      null|       null|          c1|          a4|  2020-01-16|
|  c2|  a6|       2|2021-03-20|        0.6|          c2|          a6|  2020-05-16|
 ---- ---- -------- ---------- ----------- ------------ ------------ ------------ 
 

Когда это все

 finalDF = finalDF.withColumn('c_id',F.when(F.col('c_id').isNull()
                                           ,F.col('c_id_address')).otherwise(F.col('c_id'))
                            )
                    .withColumn('a_id',F.when(F.col('a_id').isNull()
                                              ,F.col('a_id_address')).otherwise(F.col('a_id'))
                            )
                    .withColumn('order_share',F.when(F.col('order_share').isNull()
                                                     ,0.0).otherwise(F.col('order_share'))
                            )


finalDF.show()

 ---- ---- -------- ---------- ----------- ------------ ------------ ------------ 
|c_id|a_id|order_id|order_date|order_share|c_id_address|a_id_address|created_date|
 ---- ---- -------- ---------- ----------- ------------ ------------ ------------ 
|  c1|  a3|       1|2021-01-23|        0.3|          c1|          a3|  2020-03-23|
|  c1|  a3|       3|2021-02-20|        0.4|          c1|          a3|  2020-03-23|
|  c2|  a5|       2|2021-03-20|        0.4|          c2|          a5|  2020-12-28|
|  c2|  a7|    null|      null|        0.0|          c2|          a7|  2020-03-04|
|  c1|  a2|       1|2021-01-23|        0.2|          c1|          a2|  2020-04-23|
|  c1|  a2|       3|2021-02-20|        0.3|          c1|          a2|  2020-04-23|
|  c1|  a1|       1|2021-01-23|        0.5|          c1|          a1|  2020-12-31|
|  c1|  a1|       3|2021-02-20|        0.3|          c1|          a1|  2020-12-31|
|  c1|  a4|    null|      null|        0.0|          c1|          a4|  2020-01-16|
|  c2|  a6|       2|2021-03-20|        0.6|          c2|          a6|  2020-05-16|
 ---- ---- -------- ---------- ----------- ------------ ------------ ------------ 

 

Примечание — order_id и order_date равны нулю, так как для c_id a_id комбинации и в sparkDF2

Этот пример представляет собой подход , направленный на получение требуемого решения , которое вы можете дополнительно импровизировать, если потребуется, чтобы заполнить недостающие значения порядка