#mysql #scala #apache-spark #hive #hiveql
Вопрос:
У нас есть требование, в котором нам нужно получить последнюю запись для поля f1 на основе метки времени.
- в том же поле f1 нам нужно получить последнюю запись.
- если поле f2 имеет значение 3 в том же значении поля f1, то нам нужно проверить, совпадает ли поле f4 этой записи со значением в поле f3 для остальных записей в поле f1, и нам нужно удалить все, что осталось, нам нужно взять последнюю запись из этого.
вход:
f1 | f2 | f3 | f4 | отметка времени |
---|---|---|---|---|
a1 | 1 | ключ1b | keyc | 1:00:00 |
a1 | 1 | азбука | ключ1с | 1:20:00 |
a1 | 3 | ключ1 | азбука | 1:30:00 |
в1 | 1 | ключ1b | keyc | 1:00:00 |
в1 | 1 | key1a | ключ1 | 1:20:00 |
в1 | 1 | ключ1 | азбука | 1:30:00 |
c1 | 1 | abc1 | ключ2 | 1:00:00 |
c1 | 1 | abc2 | ключ1 | 1:20:00 |
c1 | 3 | ключ1 | abc1 | 1:30:00 |
c1 | 3 | ключ2 | abc2 | 1:30:00 |
c1 | 4 | ключ3 | abc2 | 1:30:00 |
c1 | 2 | ключ3 | abc2 | 1:35:00 |
выход:
f1 | f2 | f3 | f4 | отметка времени |
---|---|---|---|---|
a1 | 1 | ключ1b | keyc | 1:00:00 |
в1 | 1 | ключ1 | азбука | 1:30:00 |
c1 | 2 | ключ3 | abc2 | 1:35:00 |
Комментарии:
1. функция окна может быть полезна здесь
Ответ №1:
Входные данные могут быть отфильтрованы в соответствии с условием 2) с помощью соединения «left_anti», а затем выбрана последняя запись с помощью функции окна (Spark Scala).:
val input = Seq(
("a1", 1, "key1b", "keyc", "1:00:00"),
("a1", 1, "abc", "key1c", "1:20:00"),
("a1", 3, "key1", "abc", "1:30:00"),
("b1", 1, "key1b", "keyc", "1:00:00"),
("b1", 1, "key1a", "key1", "1:20:00"),
("b1", 1, "key1", "abc", "1:30:00"),
("c1", 1, "abc1", "key2", "1:00:00"),
("c1", 1, "abc2", "key1", "1:20:00"),
("c1", 3, "key1", "abc1", "1:30:00"),
("c1", 3, "key2", "abc2", "1:30:00"),
("c1", 4, "key3", "abc2", "1:30:00"),
("c1", 2, "key3", "abc2", "1:35:00")
).toDF("f1", "f2", "f3", "f4", "timestamp")
val invalidF4 = input.where($"f2" === 3).select("f1", "f4")
val filtered = input
.filter($"f2" =!= 3)
.alias("main")
.join(
invalidF4.alias("invalid"),
$"main.f1" === $"invalid.f1" amp;amp; $"main.f3" === $"invalid.f4",
"left_anti")
val f1TimestampWindow = Window.partitionBy("f1").orderBy(desc("timestamp"))
val result = filtered
.withColumn("row_number", row_number().over(f1TimestampWindow))
.where($"row_number" === 1)
.drop("row_number")
Результат:
--- --- ----- ---- ---------
|f1 |f2 |f3 |f4 |timestamp|
--- --- ----- ---- ---------
|b1 |1 |key1 |abc |1:30:00 |
|a1 |1 |key1b|keyc|1:00:00 |
|c1 |2 |key3 |abc2|1:35:00 |
--- --- ----- ---- ---------