#sql #scala #apache-spark
#sql #scala #apache-spark
Вопрос:
Я хотел бы группировать записи в scala, только если они имеют одинаковый идентификатор и их время в пределах 1 минуты друг от друга.
Концептуально я думаю о чем-то подобном? Но я не совсем уверен
HAVING a.ID = b.ID AND a.time 30 sec > b.time AND a.time - 30 sec < b.time
| ID | volume | Time |
|:-----------|------------:|:--------------------------:|
| 1 | 10 | 2019-02-17T12:00:34Z |
| 2 | 20 | 2019-02-17T11:10:46Z |
| 3 | 30 | 2019-02-17T13:23:34Z |
| 1 | 40 | 2019-02-17T12:01:02Z |
| 2 | 50 | 2019-02-17T11:10:30Z |
| 1 | 60 | 2019-02-17T12:01:57Z |
к этому:
| ID | volume |
|:-----------|------------:|
| 1 | 50 | // (10 40)
| 2 | 70 | // (20 50)
| 3 | 30 |
df.groupBy($"ID", window($"Time", "1 minutes")).sum("volume")
приведенный выше код является решением 1, но он всегда округляется.
Например, 2019-02-17T12:00:45Z будет иметь диапазон
2019-02-17T12:00:00Z TO 2019-02-17T12:01:00Z.
Вместо этого я ищу это:
2019-02-17T11:45:00Z TO 2019-02-17T12:01:45Z.
Есть ли способ?
Комментарии:
1.
12:00:34
и12:01:02
находятся с интервалом в одну минуту друг от друга. Но12:01:02
и12:01:57
также находятся в пределах одной минуты друг от друга. Почему вы не захотели объединить все три? И почему вы предпочитаете объединять первые два, а не последние два?2. Должен ли ваш окончательный результат
2019-02-17T11:45:00Z TO 2019-02-17T12:01:45Z.
читаться как2019-02-17T12:01:45Z TO 2019-02-17T12:01:45Z
?3. 12:00:34 и 12: 01: 02 находятся в пределах 1 минуты. Но 12:00:34 и 12:01:57 нет. Я бы не хотел их объединять, они занимают почти 2 минуты каждая. Надеюсь, это прояснит ситуацию. 2019-02-17T11:45:00Z ( 1m) <2019-02-17T12:00:45Z> ( 1m) 2019-02-17T12: 01:45Z
4. Нет, вы проигнорировали половину моего вопроса. Почему вы не объединяете
12:01:02
и12:01:57
? Разница между ними составляет 55 секунд. Вы не хотите их объединять, потому что12:01:02
они уже объединяются с12:00:34
? В этом случае, если бы также была строка для12:00:01
, все изменилось бы. Вы бы объединили12:00:01
и12:00:34
и отдельно объединили12:01:02
и12:01:57
. Это означало бы, что вы не можете определить, какие строки объединить, не возвращаясь к началу последовательности и не продвигаясь вперед. Это последовательный цикл, чего вы не делаете в SQL.5. вы правы! еще раз спасибо, что поддерживаете меня
Ответ №1:
org.apache.spark.sql.functions
предоставляет перегруженные функции окна, как показано ниже.
1. window(timeColumn: столбец, windowDuration: строка) : Генерирует временные окна, изменяющиеся с учетом столбца, указывающего временную метку. Начало окна включено, но окончание окна исключительное, например, 12:05 будет в окне [12:05,12: 10), но не в [12:00,12:05).
Окна будут выглядеть следующим образом:
{{{
09:00:00-09:01:00
09:01:00-09:02:00
09:02:00-09:03:00 ...
}}}
2. window((timeColumn: столбец, windowDuration: строка, slideDuration: строка):
Группируйте строки в одно или несколько временных окон с учетом столбца, указывающего временную метку. Начало окна включено, но окончание окна исключительное, например, 12:05 будет в окне [12:05,12: 10), но не в [12:00,12:05).
Параметр slideDuration, задающий интервал сдвига окна, например 1 minute
.Новое окно будет генерироваться каждый slideDuration
. Должно быть меньше или равно windowDuration
.
Окна будут выглядеть следующим образом:
{{{
09:00:00-09:01:00
09:00:10-09:01:10
09:00:20-09:01:20 ...
}}}
3. window((timeColumn: столбец, windowDuration: строка, slideDuration: строка, startTime: строка): Сгруппируйте строки в одно или несколько временных окон, учитывая временную метку, указывающую столбец. Начало окна включено, но окончание окна исключительное, например, 12:05 будет в окне [12:05,12: 10), но не в [12:00,12:05).
Окна будут выглядеть следующим образом:
{{{
09:00:05-09:01:05
09:00:15-09:01:15
09:00:25-09:01:25 ...
}}}
Например, для того, чтобы иметь ежечасно повторяющиеся окна, которые начинаются через 15 минут после часа, например 12:15-13:15, 13:15-14:15… предоставьте startTime
как 15 minutes
. Это идеальная функция перегруженного окна, которая соответствует вашим требованиям.
Пожалуйста, найдите рабочий код, как показано ниже.
import org.apache.spark.sql.SparkSession
object SparkWindowTest extends App {
val spark = SparkSession
.builder()
.master("local")
.appName("File_Streaming")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
//Prepare Test Data
val df = Seq((1, 10, "2019-02-17 12:00:49"), (2, 20, "2019-02-17 11:10:46"),
(3, 30, "2019-02-17 13:23:34"),(2, 50, "2019-02-17 11:10:30"),
(1, 40, "2019-02-17 12:01:02"), (1, 60, "2019-02-17 12:01:57"))
.toDF("ID", "Volume", "TimeString")
df.show()
df.printSchema()
--- ------ -------------------
| ID|Volume| TimeString|
--- ------ -------------------
| 1| 10|2019-02-17 12:00:49|
| 2| 20|2019-02-17 11:10:46|
| 3| 30|2019-02-17 13:23:34|
| 2| 50|2019-02-17 11:10:30|
| 1| 40|2019-02-17 12:01:02|
| 1| 60|2019-02-17 12:01:57|
--- ------ -------------------
root
|-- ID: integer (nullable = false)
|-- Volume: integer (nullable = false)
|-- TimeString: string (nullable = true)
//Converted String Timestamp into Timestamp
val modifiedDF = df.withColumn("Time", to_timestamp($"TimeString"))
//Dropped String Timestamp from DF
val modifiedDF1 = modifiedDF.drop("TimeString")
modifiedDF.show(false)
modifiedDF.printSchema()
--- ------ ------------------- -------------------
|ID |Volume|TimeString |Time |
--- ------ ------------------- -------------------
|1 |10 |2019-02-17 12:00:49|2019-02-17 12:00:49|
|2 |20 |2019-02-17 11:10:46|2019-02-17 11:10:46|
|3 |30 |2019-02-17 13:23:34|2019-02-17 13:23:34|
|2 |50 |2019-02-17 11:10:30|2019-02-17 11:10:30|
|1 |40 |2019-02-17 12:01:02|2019-02-17 12:01:02|
|1 |60 |2019-02-17 12:01:57|2019-02-17 12:01:57|
--- ------ ------------------- -------------------
root
|-- ID: integer (nullable = false)
|-- Volume: integer (nullable = false)
|-- TimeString: string (nullable = true)
|-- Time: timestamp (nullable = true)
modifiedDF1.show(false)
modifiedDF1.printSchema()
--- ------ -------------------
|ID |Volume|Time |
--- ------ -------------------
|1 |10 |2019-02-17 12:00:49|
|2 |20 |2019-02-17 11:10:46|
|3 |30 |2019-02-17 13:23:34|
|2 |50 |2019-02-17 11:10:30|
|1 |40 |2019-02-17 12:01:02|
|1 |60 |2019-02-17 12:01:57|
--- ------ -------------------
root
|-- ID: integer (nullable = false)
|-- Volume: integer (nullable = false)
|-- Time: timestamp (nullable = true)
//Main logic
val modifiedDF2 = modifiedDF1.groupBy($"ID", window($"Time", "1 minutes","1 minutes","45 seconds")).sum("Volume")
//Renamed all columns of DF.
val newNames = Seq("ID", "WINDOW", "VOLUME")
val finalDF = modifiedDF2.toDF(newNames: _*)
finalDF.show(false)
--- --------------------------------------------- ------
|ID |WINDOW |VOLUME|
--- --------------------------------------------- ------
|2 |[2019-02-17 11:09:45.0,2019-02-17 11:10:45.0]|50 |
|1 |[2019-02-17 12:01:45.0,2019-02-17 12:02:45.0]|60 |
|1 |[2019-02-17 12:00:45.0,2019-02-17 12:01:45.0]|50 |
|3 |[2019-02-17 13:22:45.0,2019-02-17 13:23:45.0]|30 |
|2 |[2019-02-17 11:10:45.0,2019-02-17 11:11:45.0]|20 |
--- --------------------------------------------- ------
}
Комментарии:
1. Ваша первая тестовая запись данных неверна. Это должно быть
12:00:34
, а не12:00:49
. Это изменит результаты. OP не хочет регулярных интервалов в 1 минуту от отметки 45секунд. OP хочет динамические окна на основе данных. Итак, для ID = 1 это было бы12:00:34 -> 12:01:34
и затем12:01:57 -> 12:02:57
, но не12:01:02 -> 12:02:02
, потому что это начальная запись, содержащаяся в первом окне. Чего добиться значительно сложнее (вы должны перейти к самому началу последовательности и выполнить итерацию вперед, что плохо удается всему SQL)2. Отличный ответ! спасибо за всю информацию, было бы неплохо, если бы был способ сделать «startTime» динамическим? вместо каждой отметки в 45 секунд. Программа займет рекордное время и просто добавит по 30 секунд в каждую сторону
3. Время начала может быть динамическим. Вместо жесткого кодирования 45 вы можете передать его в качестве параметра.
4. например, $ «time» вот так? динамический в смысле времени записи, а не общего времени
5. итак, если запись равна 12:01:20. диапазон будет 12:00:20 <-> 12:02:20