Подсчитайте время ожидания во время соединения по левому краю

#python #python-3.x #apache-spark #pyspark #apache-spark-sql

#python #python-3.x #apache-spark #PySpark #apache-spark-sql

Вопрос:

У меня есть таблица продуктов и субпродуктов. Это довольно маленькая таблица, всего 122 строки и всего 3 столбца:

 | backend | sub_product | product |
|---------|-------------|---------|
| conn_go | go_update   | prod_go |
| conn_go | go_delete   | prod_go |
| conn_go | go_edit     | prod_go |
| conn_rv | rv_update   | prod_rv |
| conn_mb | mb_update   | prod_mb |
| conn_mb | mb_delete   | prod_mb |
| conn_mb | mb_edit     | prod_mb |
| conn_pr | pr_update   | prod_pr |
| conn_pr | pr_edit     | prod_pr |
| conn_ct | pol_edit    | null    |
....
 

Затем у меня есть эта таблица ежедневного использования с разделением, в которой записывается, сколько раз пользователь использовал каждый субпродукт. Эта таблица намного больше (186 ГБ, 247 МБ на раздел, 4,6 млн строк на раздел):

 | backend | yyyy_mm_dd | sub_product | x_id | user_id | count |
|---------|------------|-------------|------|---------|-------|
| conn_go | 2020-12-15 | go_update   | 10   | 3422    | 1     |
| conn_go | 2020-12-15 | go_delete   | 10   | 23445   | 2     |
| conn_go | 2020-12-15 | go_edit     | 10   | 2243    | 2     |
| conn_rv | 2020-12-15 | rv_update   | 10   | 245342  | 1     |
| conn_mb | 2020-12-15 | mb_update   | 11   | 5464    | 3     |
| conn_mb | 2020-12-15 | mb_delete   | 11   | 1424    | 2     |
| conn_mb | 2020-12-15 | mb_edit     | 11   | 21454   | 2     |
| conn_pr | 2020-12-15 | pr_update   | 12   | 224525  | 1     |
| conn_pr | 2020-12-15 | pr_edit     | 12   | 22424   | 1     |
 

Если a user_id не использовал a sub_product в день, то в таблице не будет строки для этой user_id sub_product yyyy_mm_dd комбинации / / usage .
Я хотел бы подсчитать, за x_id , отдельное количество user_id s, которые использовали каждый product . Ниже приведен результат, который я ищу, на основе приведенных выше примеров данных.

 | x_id | product | usage |
|------|---------|-------|
| 10   | prod_go | 3     |
| 10   | prod_rv | 1     |
| 10   | prod_mb | 0     |
| 10   | prop_pr | 0     |
| 10   | null    | 0     |
| 11   | prod_go | 0     |
| 11   | prod_rv | 0     |
| 11   | prod_mb | 3     |
| 11   | prop_pr | 0     |
| 11   | null    | 0     |
| 12   | prod_go | 0     |
| 12   | prod_rv | 0     |
| 12   | prod_mb | 0     |
| 12   | prop_pr | 1     |
| 12   | null    | 1     |
 

Что я пробовал:

 import pyspark.sql
from pyspark.sql import functions as sf

products = (
    spark.table('my_schema.products')
    .select('backend', 'sub_product', 'product')
)

usage = (
    spark.table('my_schema.usage')
    #.where(sf.col('yyyy_mm_dd')).between('2018-08-11', '2020-01-20')
    .where(sf.col('count') > 0)
    .where(sf.col('x_id').isNotNull())
    .select('yyyy_mm_dd', 'x_id', 'user_id', 'backend', 'sub_product', 'count')

)

agg = (
    products
    .join(usage, on = ['backend', 'sub_product'], how = 'left')
    .drop('count')
    .groupby('x_id', 'product')
    .agg(
        sf.countDistinct('user_id')
    )
)

agg.show(30,False)
 

Приведенный выше Python работает довольно долго (более 30 минут) и в конечном итоге истекает:

org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 157 на этапе 136.0 завершалась 10 раз, последний сбой: потерянная задача 157.9 на этапе 136.0: ExecutorLostFailure (executor 3068 завершается из-за одной из запущенных задач) Причина: контейнер помечен как сбой: статус выхода: 143. Диагностика: контейнер уничтожен по запросу. Код выхода — это контейнер 143, из которого вышел ненулевой код выхода 143, уничтоженный внешним сигналом

Кроме того, если я раскомментирую where предложение, я получу другую ошибку:

«выражение фильтра ‘my_schema.usage. yyyy_mm_dd ‘ типа string не является логическим

В. Есть ли какой-либо способ, которым я могу лучше оптимизировать свой код, чтобы получить желаемый результат?


РЕДАКТИРОВАТЬ: использование широковещательного соединения значительно ускорило объединение и позволяет мне получить агрегированный подсчет. Тем не менее, мой код по-прежнему не выдает ожидаемый результат сверху:

 agg = (
    usage
    .join(broadcast(products), on = ['backend', 'sub_product'], how = 'left')
    .drop('count')
    .where(sf.col('product').isNotNull())
    .groupby('x_id', 'product')
    .agg(
        sf.countDistinct('user_id').alias('usage_ever')
    )

)

agg.orderBy('x_id', 'product').show(500,False)
 

Проблема: в моем выводе я не получаю список продуктов, т. x_id Е. Мне не хватает строк, когда у x_id нет пользователей, использующих определенный продукт (в таких случаях я хотел бы видеть 0). Глядя на образец данных, мой вывод не показывает строку для x_id = 10, product = prod_mb и usage = 0.

Ответ №1:

Такая ошибка обычно означает, что ваша большая таблица превышает память исполнителя во время сортировки в случайном порядке, поэтому исполнитель был потерян. В этом случае с маленькой таблицей вы должны присоединить маленькую таблицу к большой, используя BroacastHashJoin:

 from pyspark.sql.functions import broadcast

usage.join(broadcast(products), on = ['backend', 'sub_product'], how = 'left')
 

О том, когда оператор:

 .where(sf.col('yyyy_mm_dd')).between('2018-08-11', '2020-01-20')
 

должно быть:

 .where(sf.col('yyyy_mm_dd').between('2018-08-11', '2020-01-20'))
 

Комментарии:

1. Эта половина отвечает на мой вопрос. Широковещательное соединение работает отлично, значительно ускорило мой запрос и позволило мне получить результаты. Тем не менее, мой код по-прежнему не дает желаемого результата. Я отредактировал исходный вопрос, но, возможно, новый вопрос был бы более подходящим (?)

2. @stackq Вы должны выбрать distinct x_id, затем ПЕРЕКРЕСТНО СОЕДИНИТЬ его с DISTINCT product frame, чтобы получить все комбинации (x_id, product), и после этого ЛЕВОЕ внешнее соединение с ним с помощью agg для x_id и product.

3. Я отправил ответ, основанный на вашем решении / комментарии. Как вы думаете, можно ли еще больше оптимизировать или упростить мой представленный ответ? Если нет, я приму ваш ответ, но если это возможно, тогда не стесняйтесь редактировать свой существующий ответ с помощью моего упрощенного кода, и я также отмечу как ответивший 🙂

4. @stackq в окончательном соединении вы также можете использовать широковещательную передачу (agg)

Ответ №2:

Основываясь на ответе от @valex, я смог придумать это, что, похоже, работает довольно хорошо. Вероятно, его можно уменьшить / упростить:

 import pyspark.sql
from pyspark.sql import functions as sf
from pyspark.sql.functions import broadcast

products = (
    spark.table('my_schema.products')
    .select('backend', 'sub_product', 'product')
)

usage = (
    spark.table('my_schema.usage')
    .where(sf.col('yyyy_mm_dd').between('2018-08-06', '2020-01-20'))
    .where(sf.col('count') > 0)
    .where(sf.col('x_id').isNotNull())
    .select('yyyy_mm_dd', 'x_id', 'user_id', 'backend', 'sub_product', 'product')

)

agg = (
    usage
    .join(broadcast(products), on = ['backend', 'sub_product'], how = 'left')
    .drop('count')
    .where(sf.col('product').isNotNull())
    .groupby('x_id', 'product')
    .agg(
        sf.countDistinct('user_id').alias('usage_ever')
    )

)

distinct_products = (
    spark.table('my_schema.products')
    .select('product').distinct()
)

ids = (
    spark.table('my_schema.all_ids')
    .select(sf.col('id').alias('x_id'))
)

cross_join = (
    ids
    .crossJoin(distinct_products)
)

final = (
    cross_join
    .join(agg, on = ['x_id', 'product'], how = 'left_outer')
)
final.fillna(0, subset=['usage_ever']).orderBy('x_id', 'product').show(500,False)