#sql #apache-spark #pyspark #apache-spark-sql
#sql #apache-spark #PySpark #apache-spark-sql
Вопрос:
Я пытаюсь выполнить соединение между двумя фреймами данных PySpark, соединяясь по ключу, однако дата первой таблицы всегда должна быть после даты второй таблицы. В качестве примера. У нас есть две таблицы, которые мы пытаемся объединить:
Таблица 1:
Date1 value1 key
13 Feb 2020 1 a
01 Mar 2020 2 a
31 Mar 2020 3 a
15 Apr 2020 4 a
Таблица 2:
Date2 value2 key
10 Feb 2020 11 a
15 Mar 2020 22 a
После объединения результат должен быть примерно таким:
Date1 value1 value2 key
13 Feb 2020 1 11 a
01 Mar 2020 2 null a
31 Mar 2020 3 22 a
15 Apr 2020 4 null a
Есть идеи?
Ответ №1:
Это интересное соединение. Мой подход заключается в том, чтобы сначала присоединиться к ключу, выбрать самую раннюю дату и выполнить самосоединение после нахождения самой ранней даты.
from pyspark.sql import functions as F, Window
# Clean up date format first
df3 = df1.withColumn('Date1', F.to_date('Date1', 'dd MMM yyyy'))
df4 = df2.withColumn('Date2', F.to_date('Date2', 'dd MMM yyyy'))
result = (df3.join(df4, 'key')
.filter('Date1 > Date2')
.withColumn('rn', F.row_number().over(Window.partitionBy('Date2').orderBy('Date1')))
.filter('rn = 1')
.drop('key', 'rn', 'Date2')
.join(df3, ['Date1', 'value1'], 'right')
)
result.show()
---------- ------ ------ ---
|Date1 |value1|value2|key|
---------- ------ ------ ---
|2020-02-13|1 |11 |a |
|2020-03-01|2 |null |a |
|2020-03-31|3 |22 |a |
|2020-04-15|4 |null |a |
---------- ------ ------ ---
Комментарии:
1. Это очень сложно… Есть ли какой-либо способ избежать группировки по значению2? В реальном примере гораздо больше столбцов, и значение 2 здесь просто иллюстрация. Почему мы группируем по значению2?
2. @Ehrendil Это потому, что мы хотим сопоставить самую раннюю Дату1, которая соответствует каждой Дате2 / значению2.
3. Спасибо. В моем модуле F, похоже, нет array_min . Кажется, это более новая функция. Что я могу сделать без этого?
4. @Ehrendil Я подчистил свой ответ. Больше нет группы по значению2 и больше нет array_min. Дайте мне знать, если это сработает!
5. Потребовалось некоторое время, чтобы адаптировать его к моему реальному варианту использования, но это сработало отлично. Спасибо! <3
Ответ №2:
Вы можете попробовать функцию задержки окна, это scala, но версия python будет аналогичной.
// change col names for union all and add extra col to indentify dataset
val df1A = df1.toDF("Date","value","key").withColumn("df",lit(1))
val df2A = df2.toDF("Date","value","key").withColumn("df",lit(2))
import org.apache.spark.sql.expressions.Window
df1A.unionAll(df2A)
.withColumn("value2",lag(array('value,'df),1) over Window.partitionBy('key).orderBy(to_date('Date,"dd MMM yyyy")))
.filter('df===1)
.withColumn("value2",when(element_at('value2,2)===2,element_at('value2,1)))
.drop("df")
.show
выходной сигнал:
----------- ----- --- ------
| Date|value|key|value2|
----------- ----- --- ------
|13 Feb 2020| 1| a| 11|
|01 Mar 2020| 2| a| null|
|31 Mar 2020| 3| a| 22|
|15 Apr 2020| 4| a| null|
----------- ----- --- ------