#sql #scala #apache-spark #apache-spark-sql
#sql #scala #apache-spark #apache-spark-sql
Вопрос:
У меня есть набор записей для некоторых поставок. Набор записей, содержащий столбцы container_no
, origin
, destination
shipment_dt
и volume
.
Существует несколько записей, имеющих одинаковое значение container_no
и имеющих некоторую вероятность того, что один и тот же контейнер был отправлен в разные даты, но если shipment_dt
он выпадает в течение 10 дней с промежутком времени, проверьте origin
container_no
, если все записи имеют разное происхождение, затем удалите все записи, имеющие одинаковый container_no и попадающие в течение 10 дней с промежутком времени, иначе выберите запись с наибольшим объемом.
Пожалуйста, обратите внимание, что: мы определим промежуток времени в 10 дней на основе первого появления container_no
.
Пример ввода:
Исходное условие назначения выбранных данных:
Я уже написал запрос, чтобы получить временной диапазон в десять дней, но не знаю, как мне сравнить источник и назначение и получить записи с наибольшим объемом.
Пример входного запроса для создания фрейма данных:
val Input_DF = spark.sql("""
SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-10' AS shipment_dt , 20 as volume UNION
SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-10' AS shipment_dt , 30 as volume UNION
SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-12' AS shipment_dt , 10 as volume UNION
SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-25' AS shipment_dt , 20 as volume UNION
SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-26' AS shipment_dt , 10 as volume UNION
SELECT '12346' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-15' AS shipment_dt , 20 as volume UNION
SELECT '12346' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-16' AS shipment_dt , 20 as volume UNION
SELECT '12346' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-17' AS shipment_dt , 50 as volume UNION
SELECT '12347' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-18' AS shipment_dt , 20 as volume UNION
SELECT '12347' AS container_no , 'Nepal' AS origin ,'China' AS destination , '2020-10-19' AS shipment_dt , 21 as volume""")
Input_DF.createOrReplaceTempView("Input_DF")
Запрос для создания 10-дневного временного интервала для данных:
val output_df = spark.sql("""
SELECT
B.* ,
CASE
WHEN from_prev BETWEEN 0 AND 9
THEN 1
ELSE 0
END AS recent ,
floor(from_first / 10 ) AS recent_group
FROM
(
SELECT
A.*,
NVL(DATEDIFF(shipment_dt,FIRST(shipment_dt) over(partition BY container_no
ORDER BY shipment_dt ASC)) ,0) AS from_first,
NVL(DATEDIFF(shipment_dt,lag(shipment_dt,1) over(partition BY container_no
ORDER BY shipment_dt ASC)) ,0) from_prev
FROM
Input_DF A) B
ORDER BY
container_no,
shipment_dt""")
На скриншоте с примером ввода я добавил один дополнительный столбец, чтобы объяснить, чем одна строка в отличается от других записей, имеющих одинаковый container_no, но дата отличается.
Заранее спасибо.
Комментарии:
1. Если я правильно понимаю, для этого требуется рекурсивный CTE — и я не думаю, что SparkSQL их поддерживает. Однако у вас могут быть другие альтернативы.
2. @GordonLinoff У меня нет ясности по CTE:(
3. Какое отношение запрос output_df имеет к скриншотам, которыми вы поделились? Я не понимаю вашу логику if / else. Показывает ли ожидаемый результат результаты в обоих случаях?
4. @kfkhalili цель
output_df
sql показать, как мы можем разделить записи, имеющие одинаковое значение,container_no
но отправленные в разные даты. Это может быть полезно для тех, кто хочет создать запрос.