#python #apache-spark #join #pyspark
Вопрос:
У меня есть 2 кадра данных Pyspark
Фрейм данных 1 — df
где расположены столбцы customer_id, address_id, order_id, date the order was placed, order_share
---- ---- -------- ---------- -----------
|c_id|a_id|order_id|order_date|order_share|
---- ---- -------- ---------- -----------
| c1| a1| 1|2021-01-23| 0.5|
| c1| a2| 1|2021-01-23| 0.2|
| c1| a3| 1|2021-01-23| 0.3|
| c2| a5| 2|2021-03-20| 0.4|
| c2| a6| 2|2021-03-20| 0.6|
| c1| a1| 3|2021-02-20| 0.3|
| c1| a2| 3|2021-02-20| 0.3|
| c1| a3| 3|2021-02-20| 0.4|
---- ---- -------- ---------- -----------
Фрейм данных 2 — df_address
где расположены столбцы customer_id, address_id, the date of address creation
---- ---- ------------
|c_id|a_id|created_date|
---- ---- ------------
| c1| a1| 2020-12-31|
| c1| a2| 2020-04-23|
| c1| a3| 2020-03-23|
| c1| a4| 2020-01-16|
| c2| a5| 2020-12-28|
| c2| a6| 2020-05-16|
| c2| a7| 2020-03-04|
---- ---- ------------
Теперь я хочу объединить обе эти таблицы таким образом, чтобы для каждого идентификатора заказа я получал адрес, df_address
и соответствующая запись должна быть 0.0
в order_share
столбце
Мой вывод должен выглядеть так
---- ---- ------------ -------- ---------- -----------
|c_id|a_id|created_date|order_id|order_date|order_share|
---- ---- ------------ -------- ---------- -----------
| c1| a1| 2020-12-31| 1|2021-01-23| 0.5|
| c1| a2| 2020-04-23| 1|2021-01-23| 0.2|
| c1| a3| 2020-03-23| 1|2021-01-23| 0.3|
| c1| a4| 2020-01-16| 1|2021-01-23| 0.0|
| c2| a5| 2020-12-28| 2|2021-03-20| 0.4|
| c2| a6| 2020-05-16| 2|2021-03-20| 0.6|
| c2| a7| 2020-03-04| 2|2021-03-20| 0.0|
| c1| a1| 2020-12-31| 3|2021-02-20| 0.3|
| c1| a2| 2020-04-23| 3|2021-02-20| 0.3|
| c1| a3| 2020-03-23| 3|2021-02-20| 0.4|
| c1| a4| 2020-01-16| 3|2021-02-20| 0.0|
---- ---- ------------ -------- ---------- -----------
Это не похоже на обычное соединение слева/справа, и я должен делать это для каждого идентификатора заказа.
Я попытался присоединиться с помощью ['c_id','a_id']
, но результат не близок к ожидаемому. Рассматривая df_address
как слева, df
так и справа, использование left join
дает мне нулевые значения для order_id
и right join
не дает мне всех адресов из df_address
Похоже, мне нужно применить какой-то groupby для каждого order_id, а затем применить объединение для каждой группы, но я не знаю, как это реализовать или даже уверен, что это правильный способ сделать это
Любая помощь будет признательна. Спасибо!
Комментарии:
1. что мешает вам
left
присоединитьсяc_id
иa_id
2. Пожалуйста, проверьте правку вопроса, я изложил проблему, с которой столкнулся при использовании левого соединения
Ответ №1:
Вы можете использовать orders
промежуточный фрейм данных, созданный из df
фрейма данных и содержащий только информацию о заказах, которые являются столбцами customer_id
, order_id
и order_date
. Затем вы сначала внутренне df_address
соединяете фрейм данных с этим orders
фреймом данных, чтобы связать каждую пару (customer_id, address_id)
с информацией о конкретных заказах, а затем слева соединяете результирующий df
фрейм данных с фреймом данных, чтобы получить order_share
каждый адрес, а затем заменяете null
значение в order_share
столбце на 0.0
.
Вот полный код:
from pyspark.sql import functions as F
# Orders dataframe that contains only orders-specific information
orders = df.select('customer_id', 'order_id', 'order_date').distinct()
df_address.join(orders, ['customer_id']) # link addresses with orders
.join(df.drop('order_date'), ['customer_id', 'address_id', 'order_id'], 'left_outer') # link orders/addresses with order shares
.withColumn('order_share', F.when(F.col('order_share').isNotNull(), F.col('order_share')).otherwise(F.lit(0.0))) # replace null in order_share column with 0.0
.orderBy('customer_id', 'order_id', 'address_id') # optional, to reorder dataframe
Подробные сведения
Примечание: Я переупорядочил все фреймы данных здесь order_id
и address_id
для удобства чтения
Начиная с df
фрейма данных в вашем вопросе, мы получаем следующий фрейм данных orders
:
----------- -------- ----------
|customer_id|order_id|order_date|
----------- -------- ----------
|c1 |1 |2021-01-23|
|c2 |2 |2021-03-20|
|c1 |3 |2021-02-20|
----------- -------- ----------
Затем мы соединяем этот фрейм orders
данных с фреймом данных df_address
:
----------- ---------- ------------ -------- ----------
|customer_id|address_id|created_date|order_id|order_date|
----------- ---------- ------------ -------- ----------
|c1 |a1 |2020-12-31 |1 |2021-01-23|
|c1 |a2 |2020-04-23 |1 |2021-01-23|
|c1 |a3 |2020-03-23 |1 |2021-01-23|
|c1 |a4 |2020-01-16 |1 |2021-01-23|
|c2 |a5 |2020-12-28 |2 |2021-03-20|
|c2 |a6 |2020-05-16 |2 |2021-03-20|
|c2 |a7 |2020-03-04 |2 |2021-03-20|
|c1 |a1 |2020-12-31 |3 |2021-02-20|
|c1 |a2 |2020-04-23 |3 |2021-02-20|
|c1 |a3 |2020-03-23 |3 |2021-02-20|
|c1 |a4 |2020-01-16 |3 |2021-02-20|
----------- ---------- ------------ -------- ----------
И с последним соединением с df
фреймом данных без столбца order_date
мы получаем:
----------- ---------- -------- ------------ ---------- -----------
|customer_id|address_id|order_id|created_date|order_date|order_share|
----------- ---------- -------- ------------ ---------- -----------
|c1 |a1 |1 |2020-12-31 |2021-01-23|0.5 |
|c1 |a2 |1 |2020-04-23 |2021-01-23|0.2 |
|c1 |a3 |1 |2020-03-23 |2021-01-23|0.3 |
|c1 |a4 |1 |2020-01-16 |2021-01-23|null |
|c2 |a5 |2 |2020-12-28 |2021-03-20|0.4 |
|c2 |a6 |2 |2020-05-16 |2021-03-20|0.6 |
|c2 |a7 |2 |2020-03-04 |2021-03-20|null |
|c1 |a1 |3 |2020-12-31 |2021-02-20|0.3 |
|c1 |a2 |3 |2020-04-23 |2021-02-20|0.3 |
|c1 |a3 |3 |2020-03-23 |2021-02-20|0.4 |
|c1 |a4 |3 |2020-01-16 |2021-02-20|null |
----------- ---------- -------- ------------ ---------- -----------
Затем нам просто нужно заменить null
на 0.0
, и мы получим наш ожидаемый фрейм данных:
----------- ---------- -------- ------------ ---------- -----------
|customer_id|address_id|order_id|created_date|order_date|order_share|
----------- ---------- -------- ------------ ---------- -----------
| c1| a1| 1| 2020-12-31|2021-01-23| 0.5|
| c1| a2| 1| 2020-04-23|2021-01-23| 0.2|
| c1| a3| 1| 2020-03-23|2021-01-23| 0.3|
| c1| a4| 1| 2020-01-16|2021-01-23| 0.0|
| c2| a5| 2| 2020-12-28|2021-03-20| 0.4|
| c2| a6| 2| 2020-05-16|2021-03-20| 0.6|
| c2| a7| 2| 2020-03-04|2021-03-20| 0.0|
| c1| a1| 3| 2020-12-31|2021-02-20| 0.3|
| c1| a2| 3| 2020-04-23|2021-02-20| 0.3|
| c1| a3| 3| 2020-03-23|2021-02-20| 0.4|
| c1| a4| 3| 2020-01-16|2021-02-20| 0.0|
----------- ---------- -------- ------------ ---------- -----------
Комментарии:
1. Спасибо. Всего несколько уточнений, я не думаю
distinct
, что это действительно необходимо при созданииorders
, так как строка всегда будет уникальной? А также, используяleft_outer
илиleft
подразумевая одно и то же при присоединении, верно?2. О бесполезном
distinct
, строки только сcustomer_id
,order_id
иorder_date
не будут уникальными, потому что мы удалили столбцыaddress_id
иorder_share
. В вашемdf
примере фрейма данных, если вы удалитеdistinct
orders
его , в фрейме данных вы получите три раза(c1, 1, 2021-01-23)
подряд, два раза(c2, 2, 2021-03-20)
подряд и три раза(c1, 3, 2021-02-20)
подряд. Так что тебе придется остатьсяdistinct
. И да,left_outer
иleft
означает то же самое при соединении двух фреймов данных.3. Извини, я виноват. У меня было на уме кое-что другое. Я понимаю, в чем польза
distinct
. Я просто хочу уточнить, мы используемdistinct
взаимозаменяемо сdropDuplicates
правом?4. Да, здесь вы можете использовать
distinct
иdropDuplicates
без аргументов взаимозаменяемо.
Ответ №2:
Я попробовал full outer
объединить с DataFrames
, чтобы получить недостающие c_id
и a_id
комбинации и дальнейшее использование, когда с isNull для Null
значений столбцов, поступающих из df
и заменяющих, когда их значения df_address
снизу являются результатами —
Подготовка Данных
input_str1 = """
c1| a1| 1|2021-01-23| 0.5|
c1| a2| 1|2021-01-23| 0.2|
c1| a3| 1|2021-01-23| 0.3|
c2| a5| 2|2021-03-20| 0.4|
c2| a6| 2|2021-03-20| 0.6|
c1| a1| 3|2021-02-20| 0.3|
c1| a2| 3|2021-02-20| 0.3|
c1| a3| 3|2021-02-20| 0.4
""".split("|")
input_values1 = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str1))
cols1 = list(map(lambda x: x.strip() if x.strip() != '' else None, "c_id|a_id|order_id|order_date|order_share".split("|")))
n = len(input_values1)
n_col1 = 5
input_list1 = [tuple(input_values1[i:i n_col1]) for i in range(0,n,n_col1)]
sparkDF1 = sql.createDataFrame(input_list1, cols1)
sparkDF1.show()
---- ---- -------- ---------- -----------
|c_id|a_id|order_id|order_date|order_share|
---- ---- -------- ---------- -----------
| c1| a1| 1|2021-01-23| 0.5|
| c1| a2| 1|2021-01-23| 0.2|
| c1| a3| 1|2021-01-23| 0.3|
| c2| a5| 2|2021-03-20| 0.4|
| c2| a6| 2|2021-03-20| 0.6|
| c1| a1| 3|2021-02-20| 0.3|
| c1| a2| 3|2021-02-20| 0.3|
| c1| a3| 3|2021-02-20| 0.4|
---- ---- -------- ---------- -----------
input_str2 = """
c1| a1| 2020-12-31|
c1| a2| 2020-04-23|
c1| a3| 2020-03-23|
c1| a4| 2020-01-16|
c2| a5| 2020-12-28|
c2| a6| 2020-05-16|
c2| a7| 2020-03-04
""".split("|")
input_values2 = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str2))
cols2 = list(map(lambda x: x.strip() if x.strip() != '' else None, "c_id|a_id|created_date".split("|")))
n = len(input_values2)
n_col2 = 3
input_list2 = [tuple(input_values2[i:i n_col2]) for i in range(0,n,n_col2)]
sparkDF2 = sql.createDataFrame(input_list2, cols2)
sparkDF2.show()
---- ---- ------------
|c_id|a_id|created_date|
---- ---- ------------
| c1| a1| 2020-12-31|
| c1| a2| 2020-04-23|
| c1| a3| 2020-03-23|
| c1| a4| 2020-01-16|
| c2| a5| 2020-12-28|
| c2| a6| 2020-05-16|
| c2| a7| 2020-03-04|
---- ---- ------------
Полное присоединение
Переименование значений столбцов , поступающих из SparkDF2, которые в дальнейшем будут использоваться для заполнения нулевых значений, чтобы избежать неоднозначных имен столбцов
finalDF = sparkDF1.join(sparkDF2
, (sparkDF1['c_id'] == sparkDF2['c_id'])
amp; (sparkDF1['a_id'] == sparkDF2['a_id'])
,'full'
).select(sparkDF1['*']
,sparkDF2['c_id'].alias('c_id_address')
,sparkDF2['a_id'].alias('a_id_address')
,sparkDF2['created_date']
)
finalDF.show()
---- ---- -------- ---------- ----------- ------------ ------------ ------------
|c_id|a_id|order_id|order_date|order_share|c_id_address|a_id_address|created_date|
---- ---- -------- ---------- ----------- ------------ ------------ ------------
| c1| a3| 1|2021-01-23| 0.3| c1| a3| 2020-03-23|
| c1| a3| 3|2021-02-20| 0.4| c1| a3| 2020-03-23|
| c2| a5| 2|2021-03-20| 0.4| c2| a5| 2020-12-28|
|null|null| null| null| null| c2| a7| 2020-03-04|
| c1| a2| 1|2021-01-23| 0.2| c1| a2| 2020-04-23|
| c1| a2| 3|2021-02-20| 0.3| c1| a2| 2020-04-23|
| c1| a1| 1|2021-01-23| 0.5| c1| a1| 2020-12-31|
| c1| a1| 3|2021-02-20| 0.3| c1| a1| 2020-12-31|
|null|null| null| null| null| c1| a4| 2020-01-16|
| c2| a6| 2|2021-03-20| 0.6| c2| a6| 2020-05-16|
---- ---- -------- ---------- ----------- ------------ ------------ ------------
Когда это все
finalDF = finalDF.withColumn('c_id',F.when(F.col('c_id').isNull()
,F.col('c_id_address')).otherwise(F.col('c_id'))
)
.withColumn('a_id',F.when(F.col('a_id').isNull()
,F.col('a_id_address')).otherwise(F.col('a_id'))
)
.withColumn('order_share',F.when(F.col('order_share').isNull()
,0.0).otherwise(F.col('order_share'))
)
finalDF.show()
---- ---- -------- ---------- ----------- ------------ ------------ ------------
|c_id|a_id|order_id|order_date|order_share|c_id_address|a_id_address|created_date|
---- ---- -------- ---------- ----------- ------------ ------------ ------------
| c1| a3| 1|2021-01-23| 0.3| c1| a3| 2020-03-23|
| c1| a3| 3|2021-02-20| 0.4| c1| a3| 2020-03-23|
| c2| a5| 2|2021-03-20| 0.4| c2| a5| 2020-12-28|
| c2| a7| null| null| 0.0| c2| a7| 2020-03-04|
| c1| a2| 1|2021-01-23| 0.2| c1| a2| 2020-04-23|
| c1| a2| 3|2021-02-20| 0.3| c1| a2| 2020-04-23|
| c1| a1| 1|2021-01-23| 0.5| c1| a1| 2020-12-31|
| c1| a1| 3|2021-02-20| 0.3| c1| a1| 2020-12-31|
| c1| a4| null| null| 0.0| c1| a4| 2020-01-16|
| c2| a6| 2|2021-03-20| 0.6| c2| a6| 2020-05-16|
---- ---- -------- ---------- ----------- ------------ ------------ ------------
Примечание — order_id
и order_date
равны нулю, так как для c_id
a_id
комбинации и в sparkDF2
Этот пример представляет собой подход , направленный на получение требуемого решения , которое вы можете дополнительно импровизировать, если потребуется, чтобы заполнить недостающие значения порядка