#scala #apache-spark #dictionary #counter
#scala #apache-spark #словарь #счетчик
Вопрос:
Как я могу правильно получить значения столбца в виде карты (k-> v), где k — уникальное значение, а v — количество встреч? Я делаю это в groupby.
val getMapUDF = udf((arr: Array[Long]) => {arr.groupBy(identity).map{ case (x,y) => x -> y.size}})
df
.withWatermark("time", "30 seconds")
.groupBy(window(col("time"), "1 minutes").alias("someTime"), col("foo"), col("bar"))
.agg(count("*").alias("rowCount"), collect_list(col("aaa")).alias("aaaList"))
.withColumn("qtypes", getMapUDF(col("foobar")))
Редактировать:
ввод
----------- -------------------
| foo | bar | foobar |
----------- -------------------
| aaa | a | [1,1,1,2,3,3] |
| bbb | b | [1,2,3,1,2] |
----------- -------------------
ожидаемый результат
----------- --------------------
| foo | bar | foobarMap |
----------- --------------------
| aaa | a | [1->3, 2->1, 3->2] |
| bbb | b | [1->2, 2->2, 3->1] |
----------- --------------------
Вопрос: могу ли я использовать map_from_arrays
?
Комментарии:
1. можете ли вы добавить некоторые примеры ввода и вывода
Ответ №1:
Думаю, это то, что вы ищете, учитывая массив arr
val arr: Array[Long] = Array(1,1,1,2,3,3)
arr.groupBy(identity).mapValues(_.size)
Комментарии:
1. Я думаю, что это ничем не отличается от моего подхода. как насчет. просмотр и отображение? Нужно ли мне это в вашей оценке в scala <2.13?
2. Почему вы хотите переписать код? Вы хотите оптимизировать производительность и избавиться от UDFS, используя вместо этого преобразования столбцов Spark?
3. Я работаю с потоковым фреймом данных со скоростью 100 тыс. строк в секунду. Для меня важна производительность. Я хочу вернуть карту значений столбцов с подсчитанными уникальными значениями.
4. как я могу получить Map(uniqueValue-> valueCount) из значений столбца с помощью искровых преобразований?
5. Я не могу использовать другой groupBy, потому что StructuredStreaming не поддерживает несколько groupBy, и я не могу использовать foreachBatch в этом случае.
Ответ №2:
Так что, если вы хотите заменить свой UDF только преобразованиями Spark SQL API / Column, это может быть то, что вы хотите
val data = Seq(
("aaa","a",Array(1,1,1,2,3,3)),
("bbb","b",Array(1,2,3,1,2))
)
val df = spark.createDataset(data).toDF("foo", "bar", "foobar")
val res = df.select($"foo",explode_outer($"foobar"))
.groupBy("foo","col").count()
.withColumn("mapped",map($"col",$"count"))
.groupBy("foo")
.agg(collect_list("mapped"))
res.show(false)
Итак, вы получите это
--- ------------------------------
|foo|collect_list(mapped) |
--- ------------------------------
|aaa|[[3 -> 2], [1 -> 3], [2 -> 1]]|
|bbb|[[2 -> 2], [1 -> 2], [3 -> 1]]|
--- ------------------------------
Надеюсь, это как-то поможет вам
Комментарии:
1. нет, я не могу выполнить несколько групп, как объяснено, потому что я использую StructuredStreaming и не могу использовать foreachBatch.
Ответ №3:
Я думаю, что вместо этого можно было бы collect_list
что-то сделать, чтобы вы могли получить то, что хотите, не выполняя 2 groupBy
. Я предполагаю, что ваши входные данные выглядят так, как df
показано ниже.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
df.show
--- --- ---
|foo|bar|aaa|
--- --- ---
|aaa| a| 1|
|aaa| a| 1|
|aaa| a| 1|
|aaa| a| 2|
|aaa| a| 3|
|aaa| a| 3|
|bbb| b| 1|
|bbb| b| 2|
|bbb| b| 3|
|bbb| b| 1|
|bbb| b| 2|
--- --- ---
val df2 = df.withColumn(
"foobarmap",
struct(
$"aaa",
count("aaa").over(Window.partitionBy("foo", "bar", "aaa"))
)
).groupBy(
"foo", "bar"
).agg(
count("*").alias("rowcount"),
map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")
df2.show(2,0)
--- --- -------- ------------------------
|foo|bar|rowcount|foobarmap |
--- --- -------- ------------------------
|aaa|a |6 |[2 -> 1, 3 -> 2, 1 -> 3]|
|bbb|b |5 |[2 -> 2, 3 -> 1, 1 -> 2]|
--- --- -------- ------------------------
Чтобы добавить водяной знак и сгруппировать по окну, возможно, удастся реализовать ваш код, как показано ниже:
val df2 = df.withWatermark(
"time", "30 seconds"
).withColumn(
"foobarmap",
struct(
$"aaa",
count("aaa").over(Window.partitionBy(window(col("time"), "1 minutes"), "foo", "bar", "aaa"))
).alias("foobarmap")
).groupBy(
window(col("time"), "1 minutes"), "foo", "bar"
).agg(
count("*").alias("rowcount"),
map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")