#python #python-3.x #apache-spark #pyspark #apache-spark-sql
#python #python-3.x #apache-spark #PySpark #apache-spark-sql
Вопрос:
У меня есть таблица продуктов и субпродуктов. Это довольно маленькая таблица, всего 122 строки и всего 3 столбца:
| backend | sub_product | product |
|---------|-------------|---------|
| conn_go | go_update | prod_go |
| conn_go | go_delete | prod_go |
| conn_go | go_edit | prod_go |
| conn_rv | rv_update | prod_rv |
| conn_mb | mb_update | prod_mb |
| conn_mb | mb_delete | prod_mb |
| conn_mb | mb_edit | prod_mb |
| conn_pr | pr_update | prod_pr |
| conn_pr | pr_edit | prod_pr |
| conn_ct | pol_edit | null |
....
Затем у меня есть эта таблица ежедневного использования с разделением, в которой записывается, сколько раз пользователь использовал каждый субпродукт. Эта таблица намного больше (186 ГБ, 247 МБ на раздел, 4,6 млн строк на раздел):
| backend | yyyy_mm_dd | sub_product | x_id | user_id | count |
|---------|------------|-------------|------|---------|-------|
| conn_go | 2020-12-15 | go_update | 10 | 3422 | 1 |
| conn_go | 2020-12-15 | go_delete | 10 | 23445 | 2 |
| conn_go | 2020-12-15 | go_edit | 10 | 2243 | 2 |
| conn_rv | 2020-12-15 | rv_update | 10 | 245342 | 1 |
| conn_mb | 2020-12-15 | mb_update | 11 | 5464 | 3 |
| conn_mb | 2020-12-15 | mb_delete | 11 | 1424 | 2 |
| conn_mb | 2020-12-15 | mb_edit | 11 | 21454 | 2 |
| conn_pr | 2020-12-15 | pr_update | 12 | 224525 | 1 |
| conn_pr | 2020-12-15 | pr_edit | 12 | 22424 | 1 |
Если a user_id
не использовал a sub_product
в день, то в таблице не будет строки для этой user_id
sub_product
yyyy_mm_dd
комбинации / / usage
.
Я хотел бы подсчитать, за x_id
, отдельное количество user_id
s, которые использовали каждый product
. Ниже приведен результат, который я ищу, на основе приведенных выше примеров данных.
| x_id | product | usage |
|------|---------|-------|
| 10 | prod_go | 3 |
| 10 | prod_rv | 1 |
| 10 | prod_mb | 0 |
| 10 | prop_pr | 0 |
| 10 | null | 0 |
| 11 | prod_go | 0 |
| 11 | prod_rv | 0 |
| 11 | prod_mb | 3 |
| 11 | prop_pr | 0 |
| 11 | null | 0 |
| 12 | prod_go | 0 |
| 12 | prod_rv | 0 |
| 12 | prod_mb | 0 |
| 12 | prop_pr | 1 |
| 12 | null | 1 |
Что я пробовал:
import pyspark.sql
from pyspark.sql import functions as sf
products = (
spark.table('my_schema.products')
.select('backend', 'sub_product', 'product')
)
usage = (
spark.table('my_schema.usage')
#.where(sf.col('yyyy_mm_dd')).between('2018-08-11', '2020-01-20')
.where(sf.col('count') > 0)
.where(sf.col('x_id').isNotNull())
.select('yyyy_mm_dd', 'x_id', 'user_id', 'backend', 'sub_product', 'count')
)
agg = (
products
.join(usage, on = ['backend', 'sub_product'], how = 'left')
.drop('count')
.groupby('x_id', 'product')
.agg(
sf.countDistinct('user_id')
)
)
agg.show(30,False)
Приведенный выше Python работает довольно долго (более 30 минут) и в конечном итоге истекает:
org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 157 на этапе 136.0 завершалась 10 раз, последний сбой: потерянная задача 157.9 на этапе 136.0: ExecutorLostFailure (executor 3068 завершается из-за одной из запущенных задач) Причина: контейнер помечен как сбой: статус выхода: 143. Диагностика: контейнер уничтожен по запросу. Код выхода — это контейнер 143, из которого вышел ненулевой код выхода 143, уничтоженный внешним сигналом
Кроме того, если я раскомментирую where
предложение, я получу другую ошибку:
«выражение фильтра ‘my_schema.usage.
yyyy_mm_dd
‘ типа string не является логическим
В. Есть ли какой-либо способ, которым я могу лучше оптимизировать свой код, чтобы получить желаемый результат?
РЕДАКТИРОВАТЬ: использование широковещательного соединения значительно ускорило объединение и позволяет мне получить агрегированный подсчет. Тем не менее, мой код по-прежнему не выдает ожидаемый результат сверху:
agg = (
usage
.join(broadcast(products), on = ['backend', 'sub_product'], how = 'left')
.drop('count')
.where(sf.col('product').isNotNull())
.groupby('x_id', 'product')
.agg(
sf.countDistinct('user_id').alias('usage_ever')
)
)
agg.orderBy('x_id', 'product').show(500,False)
Проблема: в моем выводе я не получаю список продуктов, т. x_id
Е. Мне не хватает строк, когда у x_id нет пользователей, использующих определенный продукт (в таких случаях я хотел бы видеть 0). Глядя на образец данных, мой вывод не показывает строку для x_id = 10, product = prod_mb и usage = 0.
Ответ №1:
Такая ошибка обычно означает, что ваша большая таблица превышает память исполнителя во время сортировки в случайном порядке, поэтому исполнитель был потерян. В этом случае с маленькой таблицей вы должны присоединить маленькую таблицу к большой, используя BroacastHashJoin:
from pyspark.sql.functions import broadcast
usage.join(broadcast(products), on = ['backend', 'sub_product'], how = 'left')
О том, когда оператор:
.where(sf.col('yyyy_mm_dd')).between('2018-08-11', '2020-01-20')
должно быть:
.where(sf.col('yyyy_mm_dd').between('2018-08-11', '2020-01-20'))
Комментарии:
1. Эта половина отвечает на мой вопрос. Широковещательное соединение работает отлично, значительно ускорило мой запрос и позволило мне получить результаты. Тем не менее, мой код по-прежнему не дает желаемого результата. Я отредактировал исходный вопрос, но, возможно, новый вопрос был бы более подходящим (?)
2. @stackq Вы должны выбрать distinct x_id, затем ПЕРЕКРЕСТНО СОЕДИНИТЬ его с DISTINCT product frame, чтобы получить все комбинации (x_id, product), и после этого ЛЕВОЕ внешнее соединение с ним с помощью agg для x_id и product.
3. Я отправил ответ, основанный на вашем решении / комментарии. Как вы думаете, можно ли еще больше оптимизировать или упростить мой представленный ответ? Если нет, я приму ваш ответ, но если это возможно, тогда не стесняйтесь редактировать свой существующий ответ с помощью моего упрощенного кода, и я также отмечу как ответивший 🙂
4. @stackq в окончательном соединении вы также можете использовать широковещательную передачу (agg)
Ответ №2:
Основываясь на ответе от @valex, я смог придумать это, что, похоже, работает довольно хорошо. Вероятно, его можно уменьшить / упростить:
import pyspark.sql
from pyspark.sql import functions as sf
from pyspark.sql.functions import broadcast
products = (
spark.table('my_schema.products')
.select('backend', 'sub_product', 'product')
)
usage = (
spark.table('my_schema.usage')
.where(sf.col('yyyy_mm_dd').between('2018-08-06', '2020-01-20'))
.where(sf.col('count') > 0)
.where(sf.col('x_id').isNotNull())
.select('yyyy_mm_dd', 'x_id', 'user_id', 'backend', 'sub_product', 'product')
)
agg = (
usage
.join(broadcast(products), on = ['backend', 'sub_product'], how = 'left')
.drop('count')
.where(sf.col('product').isNotNull())
.groupby('x_id', 'product')
.agg(
sf.countDistinct('user_id').alias('usage_ever')
)
)
distinct_products = (
spark.table('my_schema.products')
.select('product').distinct()
)
ids = (
spark.table('my_schema.all_ids')
.select(sf.col('id').alias('x_id'))
)
cross_join = (
ids
.crossJoin(distinct_products)
)
final = (
cross_join
.join(agg, on = ['x_id', 'product'], how = 'left_outer')
)
final.fillna(0, subset=['usage_ever']).orderBy('x_id', 'product').show(500,False)