Как отфильтровать карту во фрейме данных: Spark / Scala

#scala #dictionary #apache-spark #dataframe #filter

#scala #словарь #apache-spark #фрейм данных #Фильтр

Вопрос:

Я пытаюсь получить отдельный столбец count для публикации показателей. У меня есть у меня есть df [customerId : string, totalRent : bigint, totalPurchase: bigint, itemTypeCounts: map<string, int> ]

Прямо сейчас я делаю :

 val totalCustomers = df.count

val totalPurchaseCount = df.filter("totalPurchase > 0").count

val totalRentCount = df.filter("totalRent > 0").count


publishMetrics("Total Customer",  totalCustomers )
publishMetrics("Total Purchase",  totalPurchaseCount )
publishMetrics("Total Rent",  totalRentCount )

publishMetrics("Percentage of Rent",  percentage(totalRentCount, totalCustomers) )
publishMetrics("Percentage of Purchase",  percentage(totalPurchaseCount, totalCustomers) )

private def percentageCalc(num: Long, denom: Long): Double = {
val numD: Long = num
val denomD: Long = denom
return if (denomD == 0.0) 0.0
else (numD / denomD) * 100
}
  

Но я не уверен, как мне это сделать для itemTypeCounts, который является картой. Я хочу, чтобы количество и процентное соотношение основывались на каждой ключевой записи. Проблема в том, что значение ключа является динамическим, я имею в виду, что я никак не могу заранее узнать значение ключа. Может кто-нибудь сказать мне, как получить количество для каждого значения ключа. Я новичок в scala / spark, любые другие эффективные подходы для получения количества каждого столбца высоко ценятся.

Пример данных :

 customerId : 1
totalPurchase : 17
totalRent : 0
itemTypeCounts : {"TV" : 4, "Blender" : 2}

customerId : 2
totalPurchase : 1
totalRent : 1
itemTypeCounts : {"Cloths" : 4}

customerId : 3
totalPurchase : 0
totalRent : 10
itemTypeCounts : {"TV" : 4}
  

Таким образом, результат :

 totalCustomer : 3
totalPurchaseCount : 2 (2 customers with totalPurchase > 0)
totalRent : 2 (2 customers with totalRent > 0)
itemTypeCounts_TV : 2
itemTypeCounts_Cloths  : 1
itemTypeCounts_Blender  : 1
  

Комментарии:

1. Не могли бы вы предоставить образец входных данных и желаемый результат?

2. @LiMuBei обновлен образцом данных.

Ответ №1:

Вы можете выполнить это в Spark SQL, я показываю два примера этого ниже (один, где ключи известны и могут быть перечислены в коде, другой, где ключи неизвестны). Обратите внимание, что при использовании Spark SQL вы используете преимущества оптимизатора Catalyst, и это будет работать очень эффективно:

 val data = List((1,17,0,Map("TV" -> 4, "Blender" -> 2)),(2,1,1,Map("Cloths" -> 4)),(3,0,10,Map("TV" -> 4)))
val df = data.toDF("customerId","totalPurchase","totalRent","itemTypeCounts")

//Only good if you can enumerate the keys
def countMapKey(name:String) = {
    count(when($"itemTypeCounts".getItem(name).isNotNull,lit(1))).as(s"itemTypeCounts_$name")
}
val keysToCount = List("TV","Blender","Cloths").map(key => countMapKey(key))
df.select(keysToCount :_*).show
 ----------------- ---------------------- --------------------- 
|itemTypeCounts_TV|itemTypeCounts_Blender|itemTypeCounts_Cloths|
 ----------------- ---------------------- --------------------- 
|                2|                     1|                    1|
 ----------------- ---------------------- --------------------- 

//More generic
val pivotData = df.select(explode(col("itemTypeCounts"))).groupBy(lit(1).as("tmp")).pivot("key").count.drop("tmp")
val renameStatement = pivotData.columns.map(name => col(name).as(s"itemTypeCounts_$name"))

pivotData.select(renameStatement :_*).show
 ---------------------- --------------------- ----------------- 
|itemTypeCounts_Blender|itemTypeCounts_Cloths|itemTypeCounts_TV|
 ---------------------- --------------------- ----------------- 
|                     1|                    1|                2|
 ---------------------- --------------------- ----------------- 
  

Ответ №2:

Я сам новичок в spark, так что, вероятно, есть лучший способ сделать это. Но одна вещь, которую вы могли бы попробовать, это преобразовать itemTypeCounts в структуру данных в scala, с которой вы могли бы работать. Я преобразовал каждую строку в список (Name, Count) пар, например List((Blender,2), (TV,4)) .

С помощью этого вы можете иметь список таких пар, по одному списку пар для каждой строки. В вашем примере это будет список из 3 элементов:

 List(
  List((Blender,2), (TV,4)), 
  List((Cloths,4)), 
  List((TV,4))
) 
  

Как только у вас есть эта структура, преобразование ее в желаемый результат является стандартным scala.

Рабочий пример приведен ниже:

 val itemTypeCounts = df.select("itemTypeCounts")

//Build List of List of Pairs as suggested above
val itemsList = itemTypeCounts.collect().map {
  row =>
    val values = row.getStruct(0).mkString("",",","").split(",")
    val fields = row.schema.head.dataType.asInstanceOf[StructType].map(s => s.name).toList
    fields.zip(values).filter(p => p._2 != "null")
}.toList

// Build a summary map for the list constructed above
def itemTypeCountsSummary(frames: List[List[(String, String)]], summary: Map[String, Int]) : Map[String, Int] = frames match {
  case Nil => summary
  case _ => itemTypeCountsSummary(frames.tail, merge(frames.head, summary))
}

//helper method for the summary map.
def merge(head: List[(String, String)], summary: Map[String, Int]): Map[String, Int] = {
  val headMap = head.toMap.map(e => ("itemTypeCounts_"   e._1, 1))
  val updatedSummary = summary.map{e => if(headMap.contains(e._1)) (e._1, e._2   1) else e}
  updatedSummary    headMap.filter(e => !updatedSummary.contains(e._1))
}

val summaryMap = itemTypeCountsSummary(itemsList, Map())

summaryMap.foreach(e => println(e._1   ": "   e._2 ))
  

Вывод:

 itemTypeCounts_Blender: 1
itemTypeCounts_TV: 2
itemTypeCounts_Cloths: 1
  

Ответ №3:

Заимствуя входные данные у Nick и используя spark-сводное решение sql:

 val data = List((1,17,0,Map("TV" -> 4, "Blender" -> 2)),(2,1,1,Map("Cloths" -> 4)),(3,0,10,Map("TV" -> 4)))
val df = data.toDF("customerId","totalPurchase","totalRent","itemTypeCounts")
df.show(false)
df.createOrReplaceTempView("df")

 ---------- ------------- --------- ----------------------- 
|customerId|totalPurchase|totalRent|itemTypeCounts         |
 ---------- ------------- --------- ----------------------- 
|1         |17           |0        |[TV -> 4, Blender -> 2]|
|2         |1            |1        |[Cloths -> 4]          |
|3         |0            |10       |[TV -> 4]              |
 ---------- ------------- --------- ----------------------- 
  

Предполагая, что мы заранее знаем отдельный тип элемента, мы можем использовать

 val dfr = spark.sql("""
select * from (
select explode(itemTypeCounts) itemTypeCounts from (
select flatten(collect_list(map_keys(itemTypeCounts))) itemTypeCounts from df
) ) t
pivot ( count(itemTypeCounts) as c3 
for itemTypeCounts in ('TV' ,'Blender' ,'Cloths') ) 
""")
dfr.show(false)

 --- ------- ------ 
|TV |Blender|Cloths|
 --- ------- ------ 
|2  |1      |1     |
 --- ------- ------ 
  

Для переименования столбцов,

 dfr.select(dfr.columns.map( x => col(x).alias("itemTypeCounts_"   x )):_* ).show(false)

 ----------------- ---------------------- --------------------- 
|itemTypeCounts_TV|itemTypeCounts_Blender|itemTypeCounts_Cloths|
 ----------------- ---------------------- --------------------- 
|2                |1                     |1                    |
 ----------------- ---------------------- --------------------- 
  

Чтобы динамически получить отдельный тип элемента и передать его в pivot

 val item_count_arr = spark.sql(""" select array_distinct(flatten(collect_list(map_keys(itemTypeCounts)))) itemTypeCounts from df """).as[Array[String]].first

item_count_arr: Array[String] = Array(TV, Blender, Cloths)

spark.sql(s"""
select * from (
select explode(itemTypeCounts) itemTypeCounts from (
select flatten(collect_list(map_keys(itemTypeCounts))) itemTypeCounts from df
) ) t
pivot ( count(itemTypeCounts) as c3 
for itemTypeCounts in (${item_count_arr.map(c => "'" c "'").mkString(",")}) ) 
""").show(false)

 --- ------- ------ 
|TV |Blender|Cloths|
 --- ------- ------ 
|2  |1      |1     |
 --- ------- ------