искра обнаруживает и извлекает шаблон в значениях столбцов

#scala #apache-spark #apache-spark-sql

Вопрос:

У меня есть такой df

 import spark.implicits._  import org.apache.spark.sql.functions._    val latenies = Seq(  ("start","304875","2021-10-25 21:26:23.486027"),  ("start","304875","2021-10-25 21:26:23.486670"),  ("end","304875","2021-10-25 21:26:23.487590"),  ("start","304875","2021-10-25 21:26:23.509683"),  ("end","304875","2021-10-25 21:26:23.509689"),  ("end","304875","2021-10-25 21:26:23.510154"),  ("start","201345","2021-10-25 21:26:23.510156"),  ("end","201345","2021-10-25 21:26:23.510159"),  ("start","201345","2021-10-25 21:26:23.510333"),  ("start","201345","2021-10-25 21:26:23.510335"),  ("end","201345","2021-10-25 21:26:23.513177"),  ("start","201345","2021-10-25 21:26:23.513187")  )  val latenies_df = latenies.toDF("Msg_name","Id_num","TimeStamp")  .withColumn("TimeStamp", to_timestamp(col("TimeStamp")))  latenies_df.show(false)  

это выглядит так:

  -------- ------ --------------------------  |Msg_name|Id_num|TimeStamp |  -------- ------ --------------------------  |start |304875|2021-10-25 21:26:23.486027| |start |304875|2021-10-25 21:26:23.48667 | |end |304875|2021-10-25 21:26:23.48759 | |start |304875|2021-10-25 21:26:23.509683| |end |304875|2021-10-25 21:26:23.509689| |end |304875|2021-10-25 21:26:23.510154| |start |201345|2021-10-25 21:26:23.510156| |end |201345|2021-10-25 21:26:23.510159| |start |201345|2021-10-25 21:26:23.510333| |start |201345|2021-10-25 21:26:23.510335| |end |201345|2021-10-25 21:26:23.513177| |start |201345|2021-10-25 21:26:23.513187|  -------- ------ --------------------------   

Вопрос: Я бы хотел извлечь определенный шаблон в столбце Msg_name , который всегда start имеет значение end when, когда он разделен Id и упорядочен time . Msg может иметь несколько запусков один за другим или завершений. Я только start-end ничего не хочу между ними.

С помощью этого шаблона я хотел бы сделать df как таковой:

 |patter_name|Timestamp_start |Timestamp_end |Id_num | | pattern1|2021-10-25 21:26:23.486670|2021-10-25 21:26:23.487590|304875 | | pattern1|2021-10-25 21:26:23.509683|2021-10-25 21:26:23.509689|304875 | | pattern1|2021-10-25 21:26:23.510156|2021-10-25 21:26:23.510159|201345 | | pattern1|2021-10-25 21:26:23.510335|2021-10-25 21:26:23.513177|201345 |  

То, что я сделал, — это сдвинул рамку, что не даст мне правильного ответа из-за характера Msg_name столбца.

 val window = org.apache.spark.sql.expressions.Window.partitionBy("Id_num").orderBy("TimeStamp")  val df_only_pattern = latenies_df.withColumn("TimeStamp_start", when($"Msg_name" !== lag($"Msg_name", 1).over(window), lag("TimeStamp", 1).over(window)).otherwise(lit(null)))  .withColumn("latency_time", when($"TimeStamp_start".isNotNull, round((col("TimeStamp").cast("double")-col("TimeStamp_start").cast("double")) * 1e3, 2)).otherwise(lit(null)))  .withColumnRenamed("TimeStamp", "TimeStamp_end")  .withColumn("patter_name", lit("pattern1"))  .na.drop()  df_only_pattern.orderBy("TimeStamp_start").show(false)  

Что это дает:

  -------- ------ -------------------------- -------------------------- ------------ -----------  |Msg_name|Id_num|TimeStamp_end |TimeStamp_start |latency_time|patter_name|  -------- ------ -------------------------- -------------------------- ------------ -----------  |end |304875|2021-10-25 21:26:23.48759 |2021-10-25 21:26:23.48667 |0.92 |pattern1 | |start |304875|2021-10-25 21:26:23.509683|2021-10-25 21:26:23.48759 |22.09 |pattern1 | |end |304875|2021-10-25 21:26:23.509689|2021-10-25 21:26:23.509683|0.01 |pattern1 | |end |201345|2021-10-25 21:26:23.510159|2021-10-25 21:26:23.510156|0.0 |pattern1 | |start |201345|2021-10-25 21:26:23.510333|2021-10-25 21:26:23.510159|0.17 |pattern1 | |end |201345|2021-10-25 21:26:23.513177|2021-10-25 21:26:23.510335|2.84 |pattern1 | |start |201345|2021-10-25 21:26:23.513187|2021-10-25 21:26:23.513177|0.01 |pattern1 |  -------- ------ -------------------------- -------------------------- ------------ -----------     

Я могу добиться желаемого df с помощью панд python с помощью groupby и циклирования внутри группы, что кажется невозможным в spark.

Ответ №1:

Могут быть приняты сообщения «конец», которые имеют «начало» в предыдущей строке:

 latenies_df  .withColumn("TimeStamp_start",  when(lag($"Msg_name", 1).over(window) === lit("start"), lag($"TimeStamp", 1).over(window))  .otherwise(lit(null).cast(TimestampType))  )  .where($"Msg_name" === lit("end"))  .where($"TimeStamp_start".isNotNull)   .select(  lit("pattern1").alias("patter_name"),  $"TimeStamp_start",  $"TimeStamp".alias("Timestamp_end"),  $"Id_num"  )  

Результат:

  ----------- -------------------------- -------------------------- ------  |patter_name|TimeStamp_start |Timestamp_end |Id_num|  ----------- -------------------------- -------------------------- ------  |pattern1 |2021-10-25 21:26:23.48667 |2021-10-25 21:26:23.48759 |304875| |pattern1 |2021-10-25 21:26:23.509683|2021-10-25 21:26:23.509689|304875| |pattern1 |2021-10-25 21:26:23.510156|2021-10-25 21:26:23.510159|201345| |pattern1 |2021-10-25 21:26:23.510335|2021-10-25 21:26:23.513177|201345|  ----------- -------------------------- -------------------------- ------