Как мне сгруппировать записи, которые находятся в пределах определенного интервала времени, используя Spark Scala или sql?

#sql #scala #apache-spark

#sql #scala #apache-spark

Вопрос:

Я хотел бы группировать записи в scala, только если они имеют одинаковый идентификатор и их время в пределах 1 минуты друг от друга.

Концептуально я думаю о чем-то подобном? Но я не совсем уверен

 HAVING a.ID = b.ID AND a.time   30 sec > b.time AND a.time - 30 sec < b.time




| ID         |     volume  |           Time             |
|:-----------|------------:|:--------------------------:|
| 1          |      10     |    2019-02-17T12:00:34Z    |
| 2          |      20     |    2019-02-17T11:10:46Z    |
| 3          |      30     |    2019-02-17T13:23:34Z    |
| 1          |      40     |    2019-02-17T12:01:02Z    |
| 2          |      50     |    2019-02-17T11:10:30Z    |
| 1          |      60     |    2019-02-17T12:01:57Z    |
  

к этому:

 | ID         |     volume  | 
|:-----------|------------:|
| 1          |      50     |   // (10 40)
| 2          |      70     |   // (20 50)
| 3          |      30     |


df.groupBy($"ID", window($"Time", "1 minutes")).sum("volume")
  

приведенный выше код является решением 1, но он всегда округляется.

Например, 2019-02-17T12:00:45Z будет иметь диапазон

 2019-02-17T12:00:00Z TO 2019-02-17T12:01:00Z.
  

Вместо этого я ищу это:
2019-02-17T11:45:00Z TO 2019-02-17T12:01:45Z.

Есть ли способ?

Комментарии:

1. 12:00:34 и 12:01:02 находятся с интервалом в одну минуту друг от друга. Но 12:01:02 и 12:01:57 также находятся в пределах одной минуты друг от друга. Почему вы не захотели объединить все три? И почему вы предпочитаете объединять первые два, а не последние два?

2. Должен ли ваш окончательный результат 2019-02-17T11:45:00Z TO 2019-02-17T12:01:45Z. читаться как 2019-02-17T12:01:45Z TO 2019-02-17T12:01:45Z ?

3. 12:00:34 и 12: 01: 02 находятся в пределах 1 минуты. Но 12:00:34 и 12:01:57 нет. Я бы не хотел их объединять, они занимают почти 2 минуты каждая. Надеюсь, это прояснит ситуацию. 2019-02-17T11:45:00Z ( 1m) <2019-02-17T12:00:45Z> ( 1m) 2019-02-17T12: 01:45Z

4. Нет, вы проигнорировали половину моего вопроса. Почему вы не объединяете 12:01:02 и 12:01:57 ? Разница между ними составляет 55 секунд. Вы не хотите их объединять, потому что 12:01:02 они уже объединяются с 12:00:34 ? В этом случае, если бы также была строка для 12:00:01 , все изменилось бы. Вы бы объединили 12:00:01 и 12:00:34 и отдельно объединили 12:01:02 и 12:01:57 . Это означало бы, что вы не можете определить, какие строки объединить, не возвращаясь к началу последовательности и не продвигаясь вперед. Это последовательный цикл, чего вы не делаете в SQL.

5. вы правы! еще раз спасибо, что поддерживаете меня

Ответ №1:

org.apache.spark.sql.functions предоставляет перегруженные функции окна, как показано ниже.

1. window(timeColumn: столбец, windowDuration: строка) : Генерирует временные окна, изменяющиеся с учетом столбца, указывающего временную метку. Начало окна включено, но окончание окна исключительное, например, 12:05 будет в окне [12:05,12: 10), но не в [12:00,12:05).

Окна будут выглядеть следующим образом:

   {{{
    09:00:00-09:01:00
    09:01:00-09:02:00
    09:02:00-09:03:00 ...
  }}}
  

2. window((timeColumn: столбец, windowDuration: строка, slideDuration: строка):
Группируйте строки в одно или несколько временных окон с учетом столбца, указывающего временную метку. Начало окна включено, но окончание окна исключительное, например, 12:05 будет в окне [12:05,12: 10), но не в [12:00,12:05).
Параметр slideDuration, задающий интервал сдвига окна, например 1 minute .Новое окно будет генерироваться каждый slideDuration . Должно быть меньше или равно windowDuration .

Окна будут выглядеть следующим образом:

 {{{
  09:00:00-09:01:00
  09:00:10-09:01:10
  09:00:20-09:01:20 ...
}}}
  

3. window((timeColumn: столбец, windowDuration: строка, slideDuration: строка, startTime: строка): Сгруппируйте строки в одно или несколько временных окон, учитывая временную метку, указывающую столбец. Начало окна включено, но окончание окна исключительное, например, 12:05 будет в окне [12:05,12: 10), но не в [12:00,12:05).

Окна будут выглядеть следующим образом:

 {{{
  09:00:05-09:01:05
  09:00:15-09:01:15
  09:00:25-09:01:25 ...
}}}
  

Например, для того, чтобы иметь ежечасно повторяющиеся окна, которые начинаются через 15 минут после часа, например 12:15-13:15, 13:15-14:15… предоставьте startTime как 15 minutes . Это идеальная функция перегруженного окна, которая соответствует вашим требованиям.

Пожалуйста, найдите рабочий код, как показано ниже.

 import org.apache.spark.sql.SparkSession

object SparkWindowTest extends App {

  val spark = SparkSession
    .builder()
    .master("local")
    .appName("File_Streaming")
    .getOrCreate()

  import spark.implicits._
  import org.apache.spark.sql.functions._

  //Prepare Test Data
  val df = Seq((1, 10, "2019-02-17 12:00:49"), (2, 20, "2019-02-17 11:10:46"),
    (3, 30, "2019-02-17 13:23:34"),(2, 50, "2019-02-17 11:10:30"),
    (1, 40, "2019-02-17 12:01:02"), (1, 60, "2019-02-17 12:01:57"))
    .toDF("ID", "Volume", "TimeString")

  df.show()
  df.printSchema()

 --- ------ ------------------- 
| ID|Volume|         TimeString|
 --- ------ ------------------- 
|  1|    10|2019-02-17 12:00:49|
|  2|    20|2019-02-17 11:10:46|
|  3|    30|2019-02-17 13:23:34|
|  2|    50|2019-02-17 11:10:30|
|  1|    40|2019-02-17 12:01:02|
|  1|    60|2019-02-17 12:01:57|
 --- ------ ------------------- 

root
 |-- ID: integer (nullable = false)
 |-- Volume: integer (nullable = false)
 |-- TimeString: string (nullable = true)

  //Converted String Timestamp into Timestamp
  val modifiedDF = df.withColumn("Time", to_timestamp($"TimeString"))

  //Dropped String Timestamp from DF
  val modifiedDF1 = modifiedDF.drop("TimeString")

  modifiedDF.show(false)
  modifiedDF.printSchema()

 --- ------ ------------------- ------------------- 
|ID |Volume|TimeString         |Time               |
 --- ------ ------------------- ------------------- 
|1  |10    |2019-02-17 12:00:49|2019-02-17 12:00:49|
|2  |20    |2019-02-17 11:10:46|2019-02-17 11:10:46|
|3  |30    |2019-02-17 13:23:34|2019-02-17 13:23:34|
|2  |50    |2019-02-17 11:10:30|2019-02-17 11:10:30|
|1  |40    |2019-02-17 12:01:02|2019-02-17 12:01:02|
|1  |60    |2019-02-17 12:01:57|2019-02-17 12:01:57|
 --- ------ ------------------- ------------------- 

root
 |-- ID: integer (nullable = false)
 |-- Volume: integer (nullable = false)
 |-- TimeString: string (nullable = true)
 |-- Time: timestamp (nullable = true)

  modifiedDF1.show(false)
  modifiedDF1.printSchema()

 --- ------ ------------------- 
|ID |Volume|Time               |
 --- ------ ------------------- 
|1  |10    |2019-02-17 12:00:49|
|2  |20    |2019-02-17 11:10:46|
|3  |30    |2019-02-17 13:23:34|
|2  |50    |2019-02-17 11:10:30|
|1  |40    |2019-02-17 12:01:02|
|1  |60    |2019-02-17 12:01:57|
 --- ------ ------------------- 

root
 |-- ID: integer (nullable = false)
 |-- Volume: integer (nullable = false)
 |-- Time: timestamp (nullable = true)

  //Main logic
  val modifiedDF2 = modifiedDF1.groupBy($"ID", window($"Time", "1 minutes","1 minutes","45 seconds")).sum("Volume")

  //Renamed all columns of DF.
  val newNames = Seq("ID", "WINDOW", "VOLUME")
  val finalDF = modifiedDF2.toDF(newNames: _*)

  finalDF.show(false)

 --- --------------------------------------------- ------ 
|ID |WINDOW                                       |VOLUME|
 --- --------------------------------------------- ------ 
|2  |[2019-02-17 11:09:45.0,2019-02-17 11:10:45.0]|50    |
|1  |[2019-02-17 12:01:45.0,2019-02-17 12:02:45.0]|60    |
|1  |[2019-02-17 12:00:45.0,2019-02-17 12:01:45.0]|50    |
|3  |[2019-02-17 13:22:45.0,2019-02-17 13:23:45.0]|30    |
|2  |[2019-02-17 11:10:45.0,2019-02-17 11:11:45.0]|20    |
 --- --------------------------------------------- ------ 

}
  

Комментарии:

1. Ваша первая тестовая запись данных неверна. Это должно быть 12:00:34 , а не 12:00:49 . Это изменит результаты. OP не хочет регулярных интервалов в 1 минуту от отметки 45секунд. OP хочет динамические окна на основе данных. Итак, для ID = 1 это было бы 12:00:34 -> 12:01:34 и затем 12:01:57 -> 12:02:57 , но не 12:01:02 -> 12:02:02 , потому что это начальная запись, содержащаяся в первом окне. Чего добиться значительно сложнее (вы должны перейти к самому началу последовательности и выполнить итерацию вперед, что плохо удается всему SQL)

2. Отличный ответ! спасибо за всю информацию, было бы неплохо, если бы был способ сделать «startTime» динамическим? вместо каждой отметки в 45 секунд. Программа займет рекордное время и просто добавит по 30 секунд в каждую сторону

3. Время начала может быть динамическим. Вместо жесткого кодирования 45 вы можете передать его в качестве параметра.

4. например, $ «time» вот так? динамический в смысле времени записи, а не общего времени

5. итак, если запись равна 12:01:20. диапазон будет 12:00:20 <-> 12:02:20