Перекрестное соединение между двумя фреймами данных, зависящее от общего столбца

#apache-spark #pyspark #pyspark-sql

#apache-spark #pyspark #pyspark-sql

Вопрос:

Перекрестное соединение может быть выполнено следующим образом:

 df1 = pd.DataFrame({'subgroup':['A','B','C','D']})
df2 = pd.DataFrame({'dates':pd.date_range(date_today, date_today   timedelta(3), freq='D')})
sdf1 = spark.createDataFrame(df1)
sdf2 = spark.createDataFrame(df2)

sdf1.crossJoin(sdf2).toPandas()
 

В этом примере есть два фрейма данных, каждый из которых содержит 4 строки, в итоге я получаю 16 строк.

Однако для моей проблемы я хотел бы выполнить перекрестное соединение для каждого пользователя, а пользователь — это еще один столбец в двух фреймах данных, например:

 df1 = pd.DataFrame({'user':[1,1,1,1,2,2,2,2],'subgroup':['A','B','C','D','A','B','D','E']})
df2 = pd.DataFrame({'user':[1,1,1,1,2,2,2,2],'dates':np.hstack([np.array(pd.date_range(date_today, date_today   timedelta(3), freq='D')),np.array(pd.date_range(date_today timedelta(1), date_today   timedelta(4), freq='D'))])})
 

Результатом применения перекрестного соединения для каждого пользователя должен быть фрейм данных с 32 строками. Возможно ли это в pyspark и как это можно сделать?

Комментарии:

1. Отфильтруйте каждый фрейм данных на два, по одному для каждого пользователя, перекрестно соедините соответствующие вложенные фреймы данных (чтобы получить 2 фрейма данных, каждый из которых содержит 16 строк), затем объедините два перекрестных фрейма данных

2. @sramalingam24, я должен был упомянуть, что это должно быть обобщено для n пользователей (т. Е. Более 2)

3. Затем вам нужно будет написать функцию, которая выполняет это по диапазону идентификаторов пользователей, предпочтительно в режиме map (filter amp; join) — reduce (объединение)

4. Выполнение этого для диапазона идентификаторов пользователей действительно было бы решением, если бы это можно было сделать способом сокращения карты. Как этого можно достичь?

5. Я бы очень помог, если бы вы могли свести свою проблему к минимальному экземпляру и предоставить образцы данных и образцы выходных данных, чтобы мы могли точно знать, чего вы хотите достичь.

Ответ №1:

Перекрестное соединение — это соединение, которое генерирует умножение строк, потому что ключ объединения не идентифицирует строки однозначно (в нашем случае ключ объединения тривиален или ключа объединения вообще нет)

Давайте начнем с примеров фреймов данных:

 import pyspark.sql.functions as psf
import pyspark.sql.types as pst
df1 = spark.createDataFrame(
    [[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())], 
    schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value1']]))
df2 = spark.createDataFrame(
    [[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())], 
    schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value2']]))

         ---- ------ 
        |user|value1|
         ---- ------ 
        |   0|    76|
        |   1|    59|
        |   0|    14|
        |   1|    71|
        |   0|    66|
        |   1|    61|
        |   0|     2|
        |   1|    22|
        |   0|    16|
        |   1|    83|
         ---- ------ 

         ---- ------ 
        |user|value2|
         ---- ------ 
        |   0|    65|
        |   1|    81|
        |   0|    60|
        |   1|    69|
        |   0|    21|
        |   1|    61|
        |   0|    98|
        |   1|    76|
        |   0|    40|
        |   1|    21|
         ---- ------ 
 

Давайте попробуем соединить фреймы данных в постоянном столбце, чтобы увидеть эквивалентность между перекрестным соединением и обычным соединением в постоянном (тривиальном) столбце:

 df = df1.withColumn('key', psf.lit(1)) 
    .join(df2.withColumn('key', psf.lit(1)), on=['key'])
 

Мы получаем ошибку от spark> 2, потому что он понимает, что мы пытаемся выполнить перекрестное соединение (декартово произведение)

Py4JJavaError: произошла ошибка при вызове o1865.showString. : org.apache.spark.sql.AnalysisException: обнаружено неявное декартово произведение для ВНУТРЕННЕГО соединения между логическими планами LogicalRDD [user#1538, value1# 1539], false и LogicalRDD [user#1542, value2 # 1543], условие ложного соединения отсутствует или тривиально. Либо: используйте синтаксис ПЕРЕКРЕСТНОГО СОЕДИНЕНИЯ, чтобы разрешить декартовы произведения между этими отношениями, либо: включите неявные декартовы произведения, установив переменную конфигурации spark.sql.crossJoin.enabled=true;

Если ваш ключ объединения ( user здесь ) не является столбцом, который однозначно идентифицирует строки, вы также получите умножение строк, но внутри каждой user группы:

 df = df1.join(df2, on='user')
print("Number of rows : tdf1: {} tdf2: {} tdf: {}".format(df1.count(), df2.count(), df.count()))

        Number of rows :    df1: 10     df2: 10     df: 50

         ---- ------ ------ 
        |user|value1|value2|
         ---- ------ ------ 
        |   1|    59|    81|
        |   1|    59|    69|
        |   1|    59|    61|
        |   1|    59|    76|
        |   1|    59|    21|
        |   1|    71|    81|
        |   1|    71|    69|
        |   1|    71|    61|
        |   1|    71|    76|
        |   1|    71|    21|
        |   1|    61|    81|
        |   1|    61|    69|
        |   1|    61|    61|
        |   1|    61|    76|
        |   1|    61|    21|
        |   1|    22|    81|
        |   1|    22|    69|
        |   1|    22|    61|
        |   1|    22|    76|
        |   1|    22|    21|
         ---- ------ ------ 
 

5 * 5 строк для пользователя 0 5 * 5 строк для пользователя 1 , следовательно, 50

Примечание: использование a self join , за которым следует a filter , обычно означает, что вместо этого вы должны использовать оконные функции.