отфильтруйте данные, используя разные столбцы, и выберите последнюю версию с помощью hive или sql

#mysql #scala #apache-spark #hive #hiveql

Вопрос:

У нас есть требование, в котором нам нужно получить последнюю запись для поля f1 на основе метки времени.

  1. в том же поле f1 нам нужно получить последнюю запись.
  2. если поле f2 имеет значение 3 в том же значении поля f1, то нам нужно проверить, совпадает ли поле f4 этой записи со значением в поле f3 для остальных записей в поле f1, и нам нужно удалить все, что осталось, нам нужно взять последнюю запись из этого.

вход:


f1 f2 f3 f4 отметка времени
a1 1 ключ1b keyc 1:00:00
a1 1 азбука ключ1с 1:20:00
a1 3 ключ1 азбука 1:30:00
в1 1 ключ1b keyc 1:00:00
в1 1 key1a ключ1 1:20:00
в1 1 ключ1 азбука 1:30:00
c1 1 abc1 ключ2 1:00:00
c1 1 abc2 ключ1 1:20:00
c1 3 ключ1 abc1 1:30:00
c1 3 ключ2 abc2 1:30:00
c1 4 ключ3 abc2 1:30:00
c1 2 ключ3 abc2 1:35:00

выход:


f1 f2 f3 f4 отметка времени
a1 1 ключ1b keyc 1:00:00
в1 1 ключ1 азбука 1:30:00
c1 2 ключ3 abc2 1:35:00

Комментарии:

1. функция окна может быть полезна здесь

Ответ №1:

Входные данные могут быть отфильтрованы в соответствии с условием 2) с помощью соединения «left_anti», а затем выбрана последняя запись с помощью функции окна (Spark Scala).:

 val input = Seq(
  ("a1", 1, "key1b", "keyc", "1:00:00"),
  ("a1", 1, "abc", "key1c", "1:20:00"),
  ("a1", 3, "key1", "abc", "1:30:00"),
  ("b1", 1, "key1b", "keyc", "1:00:00"),
  ("b1", 1, "key1a", "key1", "1:20:00"),
  ("b1", 1, "key1", "abc", "1:30:00"),
  ("c1", 1, "abc1", "key2", "1:00:00"),
  ("c1", 1, "abc2", "key1", "1:20:00"),
  ("c1", 3, "key1", "abc1", "1:30:00"),
  ("c1", 3, "key2", "abc2", "1:30:00"),
  ("c1", 4, "key3", "abc2", "1:30:00"),
  ("c1", 2, "key3", "abc2", "1:35:00")
).toDF("f1", "f2", "f3", "f4", "timestamp")

val invalidF4 = input.where($"f2" === 3).select("f1", "f4")

val filtered = input
  .filter($"f2" =!= 3)
  .alias("main")
  .join(
    invalidF4.alias("invalid"),
    $"main.f1" === $"invalid.f1" amp;amp; $"main.f3" === $"invalid.f4",
    "left_anti")

val f1TimestampWindow = Window.partitionBy("f1").orderBy(desc("timestamp"))
val result = filtered
  .withColumn("row_number", row_number().over(f1TimestampWindow))
  .where($"row_number" === 1)
  .drop("row_number")
 

Результат:

  --- --- ----- ---- --------- 
|f1 |f2 |f3   |f4  |timestamp|
 --- --- ----- ---- --------- 
|b1 |1  |key1 |abc |1:30:00  |
|a1 |1  |key1b|keyc|1:00:00  |
|c1 |2  |key3 |abc2|1:35:00  |
 --- --- ----- ---- ---------