Фильтрация значений массива во время агрегации в фрейме данных spark

#scala #apache-spark #pyspark #apache-spark-sql #scala-collections

Вопрос:

Я выполняю агрегацию в следующем фрейме данных, чтобы получить список рекламодателей с множеством брендов

  ------------ ------ 
|advertiser  |brand |
 ------------ ------ 
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
 ------------ ------ 
 

Вот мой код:

 import org.apache.spark.sql.functions.collect_list

df2
  .groupBy("advertiser")
  .agg(collect_list("brand").as("brands"))
 

Это дает мне следующий фрейм данных:

  ------------ ---------------- 
|advertiser  |brands          |
 ------------ ---------------- 
|Advertiser 1|[Brand1, Brand2]|
|Advertiser 2|[Brand3, Brand4]|
|Advertiser 3|[Brand5, Brand6]|
 ------------ ---------------- 
 

Во время агрегирования я хочу отфильтровать список брендов с помощью следующей таблицы брендов :

  ------ ------------ 
|brand |brand name  |
 ------ ------------ 
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
 ------ ------------ 
 

Для того, чтобы достичь:

  ------------ -------- 
|advertiser  |brands  |
 ------------ -------- 
|Advertiser 1|[Brand1]|
|Advertiser 2|[Brand3]|
|Advertiser 3|null    |
 ------------ -------- 
 

Ответ №1:

Я вижу два решения для вашего вопроса, которые я буду называть Решением сбора и решением объединения

Соберите решение

Если вы можете собрать свой brands фрейм данных, вы можете использовать эту собранную коллекцию , чтобы при выполнении сохранять только правильные бренды collect_list , затем flatten ваш массив и заменить пустой массив null следующим образом:

 import org.apache.spark.sql.functions.{array, col, collect_list, flatten, size, when}

val filteredBrands = brands.select("brand").collect().map(_.getString(0))

val finalDataframe = df2
  .groupBy("advertiser")
  .agg(collect_list(when(col("brand").isin(filteredBrands: _*), array(col("brand"))).otherwise(array())).as("brands"))
  .withColumn("brands", flatten(col("brands")))
  .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
 

Присоединиться к решению

Если brands ваш фрейм данных не помещается в память, вы можете сначала присоединиться df2 к левой brands стороне, чтобы иметь столбец , содержащий бренд, если бренд находится в brands фрейме данных, иначе null , затем выполнить группировку и, наконец, заменить пустой массив из-за рекламодателей без брендов, по которым вы хотите отфильтровать null :

 import org.apache.spark.sql.functions.{col, collect_list}

val finalDataframe = df2
  .join(brands.select(col("brand").as("filtered_brand")), col("filtered_brand") === col("brand"), "left_outer")
  .groupBy("advertiser").agg(collect_list(col("filtered_brand")).as("brands"))
  .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
 

Подробные сведения

Итак, если мы начнем с df2 фрейма данных, как показано ниже:

  ------------ ------ 
|advertiser  |brand |
 ------------ ------ 
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
 ------------ ------ 
 

И brands фрейм данных, как показано ниже:

  ------ ------------ 
|brand |brand name  |
 ------ ------------ 
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
 ------ ------------ 
 

После первого левого внешнего соединения между df2 кадрами данных и brands (первая строка) вы получите следующий кадр данных:

  ------------ ------ -------------- 
|advertiser  |brand |filtered_brand|
 ------------ ------ -------------- 
|Advertiser 1|Brand1|Brand1        |
|Advertiser 1|Brand2|null          |
|Advertiser 2|Brand3|Brand3        |
|Advertiser 2|Brand4|null          |
|Advertiser 3|Brand5|null          |
|Advertiser 3|Brand6|null          |
 ------------ ------ -------------- 
 

Когда вы группируете этот фрейм данных по рекламодателю, собирая список отфильтрованных брендов, вы получаете следующий фрейм данных:

  ------------ -------- 
|advertiser  |brands  |
 ------------ -------- 
|Advertiser 2|[Brand3]|
|Advertiser 3|[]      |
|Advertiser 1|[Brand1]|
 ------------ -------- 
 

И, наконец, когда вы применяете последнюю строку, заменяющую пустой массив на null, вы получаете ожидаемый результат:

  ------------ -------- 
|advertiser  |brands  |
 ------------ -------- 
|Advertiser 2|[Brand3]|
|Advertiser 3|null    |
|Advertiser 1|[Brand1]|
 ------------ -------- 
 

Вывод

Решение для сбора создает только один дорогостоящий шаг суффле (во время группового сбора), и его следует выбирать в приоритетном порядке, если ваш brands фрейм данных невелик. Решение для объединения работает brands , если ваш фрейм данных большой, но оно создает множество дорогостоящих шагов суффле, с одной группой и одним объединением.

Комментарии:

1. Спасибо за ответ, мой brands фрейм данных большой. Решение для объединения не совсем работает, так как рекламодатели, у которых нет брендов внутри brands фрейма данных, похоже, отфильтровываются (Рекламодатель3). Я хочу сохранить всех рекламодателей и только те бренды, которые есть в таблице брендов

2. Нет, рекламодатели, которых нет в brands кадре данных, все еще присутствуют в конечном кадре данных. Они добавляются в последнем соединении: .join(df2.select("advertiser").distinct(), Seq("advertiser"), "right_outer") .

3. Хм, по какой-то причине я не могу получить этот результат с последним соединением. Я думал, что будет способ отфильтровать бренды для всех рекламодателей до groupBy или в будущем agg .

4. Я немного улучшил решение для объединения, чтобы сохранить только одно соединение между df2 и brands . Я также подробно описал каждый промежуточный кадр данных для решения join, чтобы помочь вам лучше понять, что делается на каждом шаге.

5. Я думаю, это работает! Спасибо за объяснение. Единственное, что withColumn не ведет себя так, как ожидалось для обновления, которое я сделал. Вместо того, чтобы просто иметь collect_list(col("filtered_brand")).as("brands") , я превращаю его в структуру с именем бренда, так что это collect_list(struct(col("filtered_brand") as "brand_id", col("brand_name") as "brand_name")).as("brands")