#scala #apache-spark
#scala #apache-spark
Вопрос:
Я работаю со Scala и Spark и я относительно новичок в программировании на Scala, поэтому, возможно, у моего вопроса есть простое решение.
У меня есть один фрейм данных, который хранит информацию об активных и деактивированных клиентах в некоторой рекламной акции. В этом фрейме данных отображается идентификатор клиента, действие, которое он / она предпринял (он может активировать или деактивировать участие в акции в любое время) и дата, когда он / она предпринял это действие. Вот пример этого формата:
Пример того, как работает фрейм данных
Я хочу ежедневный мониторинг активных клиентов и хочу видеть, как это число меняется в течение нескольких дней, но я не могу закодировать что-либо, что работает подобным образом.
Моя идея состояла в том, чтобы создать перекрестное соединение двух фреймов данных; один из которых имеет только идентификаторы клиентов, а другой — только даты, поэтому у меня были бы все даты, связанные со всеми идентификаторами клиентов, и мне нужно было только видеть статус клиента в каждой из дат (если клиент активен или неактивен). Итак, после этого я сделал левое соединение этого нового фрейма данных с фреймом данных, который связывал идентификатор клиента и события, но в результате получается множество дат, которые имеют статус «null», и я не знаю, как заполнить его правильным статусом. Вот пример:
Пример окончательного фрейма данных
Я уже пытался использовать функцию задержки, но это не решило мою проблему. Есть ли у кого-нибудь идеи, которые могли бы мне помочь?
Спасибо!
Комментарии:
1. Привет. Не уверен, как помогает DF только с идентификатором клиента.
2. итак, каков правильный статус?
3. Правильный статус — это последнее значение, отличное от Null для каждого идентификатора клиента. Например, для идентификатора 1 даты «03-13» и «03-14» должны быть «активировать» в столбце «действие», потому что значение для «03-12» является этим
4. удобно показать пример, но я представлю решение завтра
Ответ №1:
Немного дорогостоящая операция из-за того, что Spark SQL имеет ограничения на коррелированные подзапросы с <, <= >, >=.
Начиная с вашего второго фрейма данных с нулевыми значениями и предполагая, что достаточно большая система и объем управляемых данных:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// My sample input
val df = Seq(
(1,"2018-03-12", "activate"),
(1,"2018-03-13", null),
(1,"2018-03-14", null),
(1,"2018-03-15", "deactivate"),
(1,"2018-03-16", null),
(1,"2018-03-17", null),
(1,"2018-03-18", "activate"),
(2,"2018-03-13", "activate"),
(2,"2018-03-14", "deactivate"),
(2,"2018-03-15", "activate")
).toDF("ID", "dt", "act")
//df.show(false)
val w = Window.partitionBy("ID").orderBy(col("dt").asc)
val df2 = df.withColumn("rank", dense_rank().over(w)).select("ID", "dt","act", "rank") //.where("rank == 1")
//df2.show(false)
val df3 = df2.filter($"act".isNull)
//df3.show(false)
val df4 = df2.filter(!($"act".isNull)).toDF("ID2", "dt2", "act2", "rank2")
//df4.show(false)
val df5 = df3.join(df4, (df3("ID") === df4("ID2")) amp;amp; (df4("rank2") < df3("rank")),"inner")
//df5.show(false)
val w2 = Window.partitionBy("ID", "rank").orderBy(col("rank2").desc)
val df6 = df5.withColumn("rank_final", dense_rank().over(w2)).where("rank_final == 1").select("ID", "dt","act2").toDF("ID", "dt", "act")
//df6.show
val df7 = df.filter(!($"act".isNull))
val dfFinal = df6.union(df7)
dfFinal.show(false)
ВОЗВРАТ:
--- ---------- ----------
|ID |dt |act |
--- ---------- ----------
|1 |2018-03-13|activate |
|1 |2018-03-14|activate |
|1 |2018-03-16|deactivate|
|1 |2018-03-17|deactivate|
|1 |2018-03-12|activate |
|1 |2018-03-15|deactivate|
|1 |2018-03-18|activate |
|2 |2018-03-13|activate |
|2 |2018-03-14|deactivate|
|2 |2018-03-15|activate |
--- ---------- ----------
Я решил это поэтапно и в спешке, но не так очевидно.
Комментарии:
1. Это решило мою проблему, большое вам спасибо! Просто для любопытства, функция «dense_rank()» выполняет то же самое, что и функция «row_number()»?
2. Да, я делаю. Спасибо!
3. Круто, нужно обойти несколько вложенных ограничений sql с помощью spark sql. приветствия