преобразование столбца scala в счетчик уникальных значений столбца

#scala #apache-spark #dictionary #counter

#scala #apache-spark #словарь #счетчик

Вопрос:

Как я могу правильно получить значения столбца в виде карты (k-> v), где k — уникальное значение, а v — количество встреч? Я делаю это в groupby.

 val getMapUDF = udf((arr: Array[Long]) => {arr.groupBy(identity).map{ case (x,y) => x -> y.size}})
    
df
    .withWatermark("time", "30 seconds")
    .groupBy(window(col("time"), "1 minutes").alias("someTime"), col("foo"), col("bar"))
    .agg(count("*").alias("rowCount"), collect_list(col("aaa")).alias("aaaList"))
    .withColumn("qtypes", getMapUDF(col("foobar")))
 

Редактировать:
ввод

  ----------- ------------------- 
| foo | bar | foobar            |
 ----------- ------------------- 
| aaa | a   | [1,1,1,2,3,3]     |
| bbb | b   | [1,2,3,1,2]       |
 ----------- ------------------- 
 

ожидаемый результат

  ----------- -------------------- 
| foo | bar | foobarMap          |
 ----------- -------------------- 
| aaa | a   | [1->3, 2->1, 3->2] |
| bbb | b   | [1->2, 2->2, 3->1] |
 ----------- -------------------- 
 

Вопрос: могу ли я использовать map_from_arrays ?

Комментарии:

1. можете ли вы добавить некоторые примеры ввода и вывода

Ответ №1:

Думаю, это то, что вы ищете, учитывая массив arr

 val arr: Array[Long] = Array(1,1,1,2,3,3)

arr.groupBy(identity).mapValues(_.size)
 

Комментарии:

1. Я думаю, что это ничем не отличается от моего подхода. как насчет. просмотр и отображение? Нужно ли мне это в вашей оценке в scala <2.13?

2. Почему вы хотите переписать код? Вы хотите оптимизировать производительность и избавиться от UDFS, используя вместо этого преобразования столбцов Spark?

3. Я работаю с потоковым фреймом данных со скоростью 100 тыс. строк в секунду. Для меня важна производительность. Я хочу вернуть карту значений столбцов с подсчитанными уникальными значениями.

4. как я могу получить Map(uniqueValue-> valueCount) из значений столбца с помощью искровых преобразований?

5. Я не могу использовать другой groupBy, потому что StructuredStreaming не поддерживает несколько groupBy, и я не могу использовать foreachBatch в этом случае.

Ответ №2:

Так что, если вы хотите заменить свой UDF только преобразованиями Spark SQL API / Column, это может быть то, что вы хотите

     val data = Seq(
      ("aaa","a",Array(1,1,1,2,3,3)),
      ("bbb","b",Array(1,2,3,1,2))
    )
    
    val df = spark.createDataset(data).toDF("foo", "bar", "foobar")

    val res = df.select($"foo",explode_outer($"foobar"))
      .groupBy("foo","col").count()
      .withColumn("mapped",map($"col",$"count"))
      .groupBy("foo")
      .agg(collect_list("mapped"))

    res.show(false)
 

Итак, вы получите это

  --- ------------------------------ 
|foo|collect_list(mapped)          |
 --- ------------------------------ 
|aaa|[[3 -> 2], [1 -> 3], [2 -> 1]]|
|bbb|[[2 -> 2], [1 -> 2], [3 -> 1]]|
 --- ------------------------------ 
 

Надеюсь, это как-то поможет вам

Комментарии:

1. нет, я не могу выполнить несколько групп, как объяснено, потому что я использую StructuredStreaming и не могу использовать foreachBatch.

Ответ №3:

Я думаю, что вместо этого можно было бы collect_list что-то сделать, чтобы вы могли получить то, что хотите, не выполняя 2 groupBy . Я предполагаю, что ваши входные данные выглядят так, как df показано ниже.

 import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

df.show
 --- --- --- 
|foo|bar|aaa|
 --- --- --- 
|aaa|  a|  1|
|aaa|  a|  1|
|aaa|  a|  1|
|aaa|  a|  2|
|aaa|  a|  3|
|aaa|  a|  3|
|bbb|  b|  1|
|bbb|  b|  2|
|bbb|  b|  3|
|bbb|  b|  1|
|bbb|  b|  2|
 --- --- --- 

val df2 = df.withColumn(
    "foobarmap",
    struct(
        $"aaa",
        count("aaa").over(Window.partitionBy("foo", "bar", "aaa"))
    )
).groupBy(
    "foo", "bar"
).agg(
    count("*").alias("rowcount"), 
    map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")

df2.show(2,0)
 --- --- -------- ------------------------ 
|foo|bar|rowcount|foobarmap               |
 --- --- -------- ------------------------ 
|aaa|a  |6       |[2 -> 1, 3 -> 2, 1 -> 3]|
|bbb|b  |5       |[2 -> 2, 3 -> 1, 1 -> 2]|
 --- --- -------- ------------------------ 
 

Чтобы добавить водяной знак и сгруппировать по окну, возможно, удастся реализовать ваш код, как показано ниже:

 val df2 = df.withWatermark(
    "time", "30 seconds"
).withColumn(
    "foobarmap",
    struct(
        $"aaa",
        count("aaa").over(Window.partitionBy(window(col("time"), "1 minutes"), "foo", "bar", "aaa"))
    ).alias("foobarmap")
).groupBy(
    window(col("time"), "1 minutes"), "foo", "bar"
).agg(
    count("*").alias("rowcount"), 
    map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")