#scala #apache-spark #pyspark #apache-spark-sql #scala-collections
Вопрос:
Я выполняю агрегацию в следующем фрейме данных, чтобы получить список рекламодателей с множеством брендов
------------ ------
|advertiser |brand |
------------ ------
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
------------ ------
Вот мой код:
import org.apache.spark.sql.functions.collect_list
df2
.groupBy("advertiser")
.agg(collect_list("brand").as("brands"))
Это дает мне следующий фрейм данных:
------------ ----------------
|advertiser |brands |
------------ ----------------
|Advertiser 1|[Brand1, Brand2]|
|Advertiser 2|[Brand3, Brand4]|
|Advertiser 3|[Brand5, Brand6]|
------------ ----------------
Во время агрегирования я хочу отфильтровать список брендов с помощью следующей таблицы брендов :
------ ------------
|brand |brand name |
------ ------------
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
------ ------------
Для того, чтобы достичь:
------------ --------
|advertiser |brands |
------------ --------
|Advertiser 1|[Brand1]|
|Advertiser 2|[Brand3]|
|Advertiser 3|null |
------------ --------
Ответ №1:
Я вижу два решения для вашего вопроса, которые я буду называть Решением сбора и решением объединения
Соберите решение
Если вы можете собрать свой brands
фрейм данных, вы можете использовать эту собранную коллекцию , чтобы при выполнении сохранять только правильные бренды collect_list
, затем flatten
ваш массив и заменить пустой массив null
следующим образом:
import org.apache.spark.sql.functions.{array, col, collect_list, flatten, size, when}
val filteredBrands = brands.select("brand").collect().map(_.getString(0))
val finalDataframe = df2
.groupBy("advertiser")
.agg(collect_list(when(col("brand").isin(filteredBrands: _*), array(col("brand"))).otherwise(array())).as("brands"))
.withColumn("brands", flatten(col("brands")))
.withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
Присоединиться к решению
Если brands
ваш фрейм данных не помещается в память, вы можете сначала присоединиться df2
к левой brands
стороне, чтобы иметь столбец , содержащий бренд, если бренд находится в brands
фрейме данных, иначе null
, затем выполнить группировку и, наконец, заменить пустой массив из-за рекламодателей без брендов, по которым вы хотите отфильтровать null
:
import org.apache.spark.sql.functions.{col, collect_list}
val finalDataframe = df2
.join(brands.select(col("brand").as("filtered_brand")), col("filtered_brand") === col("brand"), "left_outer")
.groupBy("advertiser").agg(collect_list(col("filtered_brand")).as("brands"))
.withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
Подробные сведения
Итак, если мы начнем с df2
фрейма данных, как показано ниже:
------------ ------
|advertiser |brand |
------------ ------
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
------------ ------
И brands
фрейм данных, как показано ниже:
------ ------------
|brand |brand name |
------ ------------
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
------ ------------
После первого левого внешнего соединения между df2
кадрами данных и brands
(первая строка) вы получите следующий кадр данных:
------------ ------ --------------
|advertiser |brand |filtered_brand|
------------ ------ --------------
|Advertiser 1|Brand1|Brand1 |
|Advertiser 1|Brand2|null |
|Advertiser 2|Brand3|Brand3 |
|Advertiser 2|Brand4|null |
|Advertiser 3|Brand5|null |
|Advertiser 3|Brand6|null |
------------ ------ --------------
Когда вы группируете этот фрейм данных по рекламодателю, собирая список отфильтрованных брендов, вы получаете следующий фрейм данных:
------------ --------
|advertiser |brands |
------------ --------
|Advertiser 2|[Brand3]|
|Advertiser 3|[] |
|Advertiser 1|[Brand1]|
------------ --------
И, наконец, когда вы применяете последнюю строку, заменяющую пустой массив на null, вы получаете ожидаемый результат:
------------ --------
|advertiser |brands |
------------ --------
|Advertiser 2|[Brand3]|
|Advertiser 3|null |
|Advertiser 1|[Brand1]|
------------ --------
Вывод
Решение для сбора создает только один дорогостоящий шаг суффле (во время группового сбора), и его следует выбирать в приоритетном порядке, если ваш brands
фрейм данных невелик. Решение для объединения работает brands
, если ваш фрейм данных большой, но оно создает множество дорогостоящих шагов суффле, с одной группой и одним объединением.
Комментарии:
1. Спасибо за ответ, мой
brands
фрейм данных большой. Решение для объединения не совсем работает, так как рекламодатели, у которых нет брендов внутриbrands
фрейма данных, похоже, отфильтровываются (Рекламодатель3). Я хочу сохранить всех рекламодателей и только те бренды, которые есть в таблице брендов2. Нет, рекламодатели, которых нет в
brands
кадре данных, все еще присутствуют в конечном кадре данных. Они добавляются в последнем соединении:.join(df2.select("advertiser").distinct(), Seq("advertiser"), "right_outer")
.3. Хм, по какой-то причине я не могу получить этот результат с последним соединением. Я думал, что будет способ отфильтровать бренды для всех рекламодателей до
groupBy
или в будущемagg
.4. Я немного улучшил решение для объединения, чтобы сохранить только одно соединение между
df2
иbrands
. Я также подробно описал каждый промежуточный кадр данных для решения join, чтобы помочь вам лучше понять, что делается на каждом шаге.5. Я думаю, это работает! Спасибо за объяснение. Единственное, что
withColumn
не ведет себя так, как ожидалось для обновления, которое я сделал. Вместо того, чтобы просто иметьcollect_list(col("filtered_brand")).as("brands")
, я превращаю его в структуру с именем бренда, так что этоcollect_list(struct(col("filtered_brand") as "brand_id", col("brand_name") as "brand_name")).as("brands")