Как выбрать максимальное значение из фрейма данных spark за определенный период времени

#sql #scala #apache-spark #apache-spark-sql

#sql #scala #apache-spark #apache-spark-sql

Вопрос:

У меня есть набор записей для некоторых поставок. Набор записей, содержащий столбцы container_no , origin , destination shipment_dt и volume .

Существует несколько записей, имеющих одинаковое значение container_no и имеющих некоторую вероятность того, что один и тот же контейнер был отправлен в разные даты, но если shipment_dt он выпадает в течение 10 дней с промежутком времени, проверьте origin container_no , если все записи имеют разное происхождение, затем удалите все записи, имеющие одинаковый container_no и попадающие в течение 10 дней с промежутком времени, иначе выберите запись с наибольшим объемом.

Пожалуйста, обратите внимание, что: мы определим промежуток времени в 10 дней на основе первого появления container_no .
Пример ввода:
пример ввода

Ожидаемый результат:
пример вывода

Исходное условие назначения выбранных данных:
условие происхождения / назначения

Я уже написал запрос, чтобы получить временной диапазон в десять дней, но не знаю, как мне сравнить источник и назначение и получить записи с наибольшим объемом.

Пример входного запроса для создания фрейма данных:

 val Input_DF = spark.sql("""
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-10' AS shipment_dt , 20 as volume UNION
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-10' AS shipment_dt , 30 as volume UNION
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-12' AS shipment_dt , 10 as volume UNION
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-25' AS shipment_dt , 20 as volume UNION
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-26' AS shipment_dt , 10 as volume UNION
    SELECT '12346' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-15' AS shipment_dt , 20 as volume UNION
    SELECT '12346' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-16' AS shipment_dt , 20 as volume UNION
    SELECT '12346' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-17' AS shipment_dt , 50 as volume UNION
    SELECT '12347' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-18' AS shipment_dt , 20 as volume UNION
    SELECT '12347' AS container_no , 'Nepal' AS origin ,'China' AS destination , '2020-10-19' AS shipment_dt , 21 as volume""")

Input_DF.createOrReplaceTempView("Input_DF")
  

Запрос для создания 10-дневного временного интервала для данных:

 val output_df = spark.sql("""
            SELECT
                      B.* ,
                      CASE
                          WHEN from_prev BETWEEN 0 AND 9
                          THEN 1
                          ELSE 0
                      END                     AS recent ,
                      floor(from_first / 10 ) AS recent_group
                  FROM
                      (
                          SELECT
                              A.*,
                              NVL(DATEDIFF(shipment_dt,FIRST(shipment_dt) over(partition BY container_no
                              ORDER BY shipment_dt ASC)) ,0) AS from_first,
                              NVL(DATEDIFF(shipment_dt,lag(shipment_dt,1) over(partition BY container_no
                              ORDER BY shipment_dt ASC)) ,0) from_prev
                          FROM
                              Input_DF A) B 
                  ORDER BY
                    container_no,
                    shipment_dt""")
  

На скриншоте с примером ввода я добавил один дополнительный столбец, чтобы объяснить, чем одна строка в отличается от других записей, имеющих одинаковый container_no, но дата отличается.
Заранее спасибо.

Комментарии:

1. Если я правильно понимаю, для этого требуется рекурсивный CTE — и я не думаю, что SparkSQL их поддерживает. Однако у вас могут быть другие альтернативы.

2. @GordonLinoff У меня нет ясности по CTE:(

3. Какое отношение запрос output_df имеет к скриншотам, которыми вы поделились? Я не понимаю вашу логику if / else. Показывает ли ожидаемый результат результаты в обоих случаях?

4. @kfkhalili цель output_df sql показать, как мы можем разделить записи, имеющие одинаковое значение, container_no но отправленные в разные даты. Это может быть полезно для тех, кто хочет создать запрос.