#scala #apache-spark #apache-spark-sql
Вопрос:
У меня есть такой df
import spark.implicits._ import org.apache.spark.sql.functions._ val latenies = Seq( ("start","304875","2021-10-25 21:26:23.486027"), ("start","304875","2021-10-25 21:26:23.486670"), ("end","304875","2021-10-25 21:26:23.487590"), ("start","304875","2021-10-25 21:26:23.509683"), ("end","304875","2021-10-25 21:26:23.509689"), ("end","304875","2021-10-25 21:26:23.510154"), ("start","201345","2021-10-25 21:26:23.510156"), ("end","201345","2021-10-25 21:26:23.510159"), ("start","201345","2021-10-25 21:26:23.510333"), ("start","201345","2021-10-25 21:26:23.510335"), ("end","201345","2021-10-25 21:26:23.513177"), ("start","201345","2021-10-25 21:26:23.513187") ) val latenies_df = latenies.toDF("Msg_name","Id_num","TimeStamp") .withColumn("TimeStamp", to_timestamp(col("TimeStamp"))) latenies_df.show(false)
это выглядит так:
-------- ------ -------------------------- |Msg_name|Id_num|TimeStamp | -------- ------ -------------------------- |start |304875|2021-10-25 21:26:23.486027| |start |304875|2021-10-25 21:26:23.48667 | |end |304875|2021-10-25 21:26:23.48759 | |start |304875|2021-10-25 21:26:23.509683| |end |304875|2021-10-25 21:26:23.509689| |end |304875|2021-10-25 21:26:23.510154| |start |201345|2021-10-25 21:26:23.510156| |end |201345|2021-10-25 21:26:23.510159| |start |201345|2021-10-25 21:26:23.510333| |start |201345|2021-10-25 21:26:23.510335| |end |201345|2021-10-25 21:26:23.513177| |start |201345|2021-10-25 21:26:23.513187| -------- ------ --------------------------
Вопрос: Я бы хотел извлечь определенный шаблон в столбце Msg_name
, который всегда start
имеет значение end
when, когда он разделен Id
и упорядочен time
. Msg
может иметь несколько запусков один за другим или завершений. Я только start-end
ничего не хочу между ними.
С помощью этого шаблона я хотел бы сделать df как таковой:
|patter_name|Timestamp_start |Timestamp_end |Id_num | | pattern1|2021-10-25 21:26:23.486670|2021-10-25 21:26:23.487590|304875 | | pattern1|2021-10-25 21:26:23.509683|2021-10-25 21:26:23.509689|304875 | | pattern1|2021-10-25 21:26:23.510156|2021-10-25 21:26:23.510159|201345 | | pattern1|2021-10-25 21:26:23.510335|2021-10-25 21:26:23.513177|201345 |
То, что я сделал, — это сдвинул рамку, что не даст мне правильного ответа из-за характера Msg_name
столбца.
val window = org.apache.spark.sql.expressions.Window.partitionBy("Id_num").orderBy("TimeStamp") val df_only_pattern = latenies_df.withColumn("TimeStamp_start", when($"Msg_name" !== lag($"Msg_name", 1).over(window), lag("TimeStamp", 1).over(window)).otherwise(lit(null))) .withColumn("latency_time", when($"TimeStamp_start".isNotNull, round((col("TimeStamp").cast("double")-col("TimeStamp_start").cast("double")) * 1e3, 2)).otherwise(lit(null))) .withColumnRenamed("TimeStamp", "TimeStamp_end") .withColumn("patter_name", lit("pattern1")) .na.drop() df_only_pattern.orderBy("TimeStamp_start").show(false)
Что это дает:
-------- ------ -------------------------- -------------------------- ------------ ----------- |Msg_name|Id_num|TimeStamp_end |TimeStamp_start |latency_time|patter_name| -------- ------ -------------------------- -------------------------- ------------ ----------- |end |304875|2021-10-25 21:26:23.48759 |2021-10-25 21:26:23.48667 |0.92 |pattern1 | |start |304875|2021-10-25 21:26:23.509683|2021-10-25 21:26:23.48759 |22.09 |pattern1 | |end |304875|2021-10-25 21:26:23.509689|2021-10-25 21:26:23.509683|0.01 |pattern1 | |end |201345|2021-10-25 21:26:23.510159|2021-10-25 21:26:23.510156|0.0 |pattern1 | |start |201345|2021-10-25 21:26:23.510333|2021-10-25 21:26:23.510159|0.17 |pattern1 | |end |201345|2021-10-25 21:26:23.513177|2021-10-25 21:26:23.510335|2.84 |pattern1 | |start |201345|2021-10-25 21:26:23.513187|2021-10-25 21:26:23.513177|0.01 |pattern1 | -------- ------ -------------------------- -------------------------- ------------ -----------
Я могу добиться желаемого df
с помощью панд python с помощью groupby и циклирования внутри группы, что кажется невозможным в spark.
Ответ №1:
Могут быть приняты сообщения «конец», которые имеют «начало» в предыдущей строке:
latenies_df .withColumn("TimeStamp_start", when(lag($"Msg_name", 1).over(window) === lit("start"), lag($"TimeStamp", 1).over(window)) .otherwise(lit(null).cast(TimestampType)) ) .where($"Msg_name" === lit("end")) .where($"TimeStamp_start".isNotNull) .select( lit("pattern1").alias("patter_name"), $"TimeStamp_start", $"TimeStamp".alias("Timestamp_end"), $"Id_num" )
Результат:
----------- -------------------------- -------------------------- ------ |patter_name|TimeStamp_start |Timestamp_end |Id_num| ----------- -------------------------- -------------------------- ------ |pattern1 |2021-10-25 21:26:23.48667 |2021-10-25 21:26:23.48759 |304875| |pattern1 |2021-10-25 21:26:23.509683|2021-10-25 21:26:23.509689|304875| |pattern1 |2021-10-25 21:26:23.510156|2021-10-25 21:26:23.510159|201345| |pattern1 |2021-10-25 21:26:23.510335|2021-10-25 21:26:23.513177|201345| ----------- -------------------------- -------------------------- ------