#scala #apache-spark #dataframe #spark-dataframe
#scala #apache-spark #dataframe #apache-spark-sql
Вопрос:
У меня есть dataframe в Spark. Выглядит примерно так:
------- ---------- -------
| value| group| ts|
------- ---------- -------
| A| X| 1|
| B| X| 2|
| B| X| 3|
| D| X| 4|
| E| X| 5|
| A| Y| 1|
| C| Y| 2|
------- ---------- -------
Конечная цель: я хотел бы узнать, сколько существует последовательностей A-B-E
(последовательность — это просто список последующих строк). С добавленным ограничением, согласно которому последующие части последовательности могут находиться на расстоянии не более n
строк друг от друга. Давайте рассмотрим для этого примера, что n
равно 2.
Рассмотрим группу X
. В этом случае существует ровно 1 D
между B
и E
(несколько последовательных B
строк игнорируются). Что означает, что B
и E
находятся на расстоянии 1 строки друг от друга, и, следовательно, существует последовательность A-B-E
Я думал об использовании collect_list()
, создании строки (например, DNA) и использовании поиска по подстроке с регулярным выражением. Но мне было интересно, есть ли более элегантный распределенный способ, возможно, с использованием оконных функций?
Редактировать:
Обратите внимание, что предоставленный dataframe является всего лишь примером. Реальный фрейм данных (и, следовательно, группы) может быть произвольной длины.
Ответ №1:
Отредактировано в ответ на комментарий @Tim’s исправлены шаблоны типа «AABE»
Да, использование оконной функции помогает, но я создал id
, чтобы упорядочить:
val df = List(
(1,"A","X",1),
(2,"B","X",2),
(3,"B","X",3),
(4,"D","X",4),
(5,"E","X",5),
(6,"A","Y",1),
(7,"C","Y",2)
).toDF("id","value","group","ts")
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('group).orderBy('id)
Затем lag соберет то, что необходимо, но для генерации Column
выражения требуется функция (обратите внимание на разделение, чтобы исключить двойной подсчет «AABE». ПРЕДУПРЕЖДЕНИЕ: это отклоняет шаблоны типа «ABAEXX»):
def createSeq(m:Int) = split(
concat(
(1 to 2*m)
.map(i => coalesce(lag('value,-i).over(w),lit("")))
:_*),"A")(0)
val m=2
val tmp = df
.withColumn("seq",createSeq(m))
--- ----- ----- --- ----
| id|value|group| ts| seq|
--- ----- ----- --- ----
| 6| A| Y| 1| C|
| 7| C| Y| 2| |
| 1| A| X| 1|BBDE|
| 2| B| X| 2| BDE|
| 3| B| X| 3| DE|
| 4| D| X| 4| E|
| 5| E| X| 5| |
--- ----- ----- --- ----
Из-за плохого набора функций сбора, доступных в Column
API, избежать регулярных выражений вообще намного проще с помощью UDF
def patternInSeq(m: Int) = udf((str: String) => {
var notFound = str
.split("B")
.filter(_.contains("E"))
.filter(_.indexOf("E") <= m)
.isEmpty
!notFound
})
val res = tmp
.filter(('value === "A") amp;amp; (locate("B",'seq) > 0))
.filter(locate("B",'seq) <= m amp;amp; (locate("E",'seq) > 1))
.filter(patternInSeq(m)('seq))
.groupBy('group)
.count
res.show
----- -----
|group|count|
----- -----
| X| 1|
----- -----
Обобщение (выходит за рамки)
Если вы хотите обобщить последовательность букв, которые длиннее, вопрос должен быть обобщен. Это может быть тривиально, но в этом случае шаблон типа («ABAE») должен быть отклонен (см. Комментарии). Таким образом, самый простой способ обобщения — это иметь правило парности, как в следующей реализации (я добавил группу «Z», чтобы проиллюстрировать поведение этого алгоритма)
val df = List(
(1,"A","X",1),
(2,"B","X",2),
(3,"B","X",3),
(4,"D","X",4),
(5,"E","X",5),
(6,"A","Y",1),
(7,"C","Y",2),
( 8,"A","Z",1),
( 9,"B","Z",2),
(10,"D","Z",3),
(11,"B","Z",4),
(12,"E","Z",5)
).toDF("id","value","group","ts")
Сначала мы определяем логику для пары
import org.apache.spark.sql.DataFrame
def createSeq(m:Int) = array((0 to 2*m).map(i => coalesce(lag('value,-i).over(w),lit(""))):_*)
def filterPairUdf(m: Int, t: (String,String)) = udf((ar: Array[String]) => {
val (a,b) = t
val foundAt = ar
.dropWhile(_ != a)
.takeWhile(_ != a)
.indexOf(b)
foundAt != -1 amp;amp; foundAt <= m
})
Затем мы определяем функцию, которая применяет эту логику итеративно к dataframe
def filterSeq(seq: List[String], m: Int)(df: DataFrame): DataFrame = {
var a = seq(0)
seq.tail.foldLeft(df){(df: DataFrame, b: String) => {
val res = df.filter(filterPairUdf(m,(a,b))('seq))
a = b
res
}}
}
Упрощение и оптимизация достигается за счет того, что мы сначала фильтруем последовательность, начинающуюся с первого символа
val m = 2
val tmp = df
.filter('value === "A") // reduce problem
.withColumn("seq",createSeq(m))
scala> tmp.show()
--- ----- ----- --- ---------------
| id|value|group| ts| seq|
--- ----- ----- --- ---------------
| 6| A| Y| 1| [A, C, , , ]|
| 8| A| Z| 1|[A, B, D, B, E]|
| 1| A| X| 1|[A, B, B, D, E]|
--- ----- ----- --- ---------------
val res = tmp.transform(filterSeq(List("A","B","E"),m))
scala> res.show()
--- ----- ----- --- ---------------
| id|value|group| ts| seq|
--- ----- ----- --- ---------------
| 1| A| X| 1|[A, B, B, D, E]|
--- ----- ----- --- ---------------
( transform
это простое подслащивание DataFrame => DataFrame
преобразования)
res
.groupBy('group)
.count
.show
----- -----
|group|count|
----- -----
| X| 1|
----- -----
Как я уже сказал, существуют различные способы обобщения «правил сброса» при сканировании последовательности, но этот пример, надеюсь, поможет в реализации более сложных.
Комментарии:
1. Я вижу, что ты сделал. Однако предоставленный мной dataframe был всего лишь примером. Представьте, что группы имеют любую длину от 1 до
m
гдеm
может быть до 50. Также я не вижу пользы в использовании window здесь, поскольку в итоге вы все равно прибегаете к решению с регулярными выражениями :).2. Для этого конкретного вопроса преимущество заключается в том, что у вас есть распределенный алгоритм, который в конечном итоге заставляет вас сопоставлять 4 символа только на части исходного номера строки. Кроме того, такое простое регулярное выражение может быть легко оптимизировано в нескольких тестах. Так что это намного лучше, чем собирать все. Это правда, что потребовалось бы немного поработать, чтобы обобщить его на сколь угодно длинные последовательности, но это выходит за рамки данного вопроса. Я отредактирую завтра.
3. Мой плохой, обобщение на
n
шаги между буквами было в вопросе.4. Похоже, что все работает отлично. Однако мне понадобится некоторое время, чтобы понять, что здесь происходит, и выяснить, как обобщить это (не часть вопроса) на любую последовательность, указанную в списке. Не могли бы вы (прежде чем я попытаюсь это разобрать) уточнить, находит ли это решение несколько вхождений последовательности в группе (также не является частью вопроса, но просто любопытно)
5. Итак, я уже выяснил, что ответ на мой последний вопрос «да», пожалуйста, проигнорируйте это :).