#apache-spark #pyspark #pyspark-sql
#apache-spark #pyspark #pyspark-sql
Вопрос:
Перекрестное соединение может быть выполнено следующим образом:
df1 = pd.DataFrame({'subgroup':['A','B','C','D']})
df2 = pd.DataFrame({'dates':pd.date_range(date_today, date_today timedelta(3), freq='D')})
sdf1 = spark.createDataFrame(df1)
sdf2 = spark.createDataFrame(df2)
sdf1.crossJoin(sdf2).toPandas()
В этом примере есть два фрейма данных, каждый из которых содержит 4 строки, в итоге я получаю 16 строк.
Однако для моей проблемы я хотел бы выполнить перекрестное соединение для каждого пользователя, а пользователь — это еще один столбец в двух фреймах данных, например:
df1 = pd.DataFrame({'user':[1,1,1,1,2,2,2,2],'subgroup':['A','B','C','D','A','B','D','E']})
df2 = pd.DataFrame({'user':[1,1,1,1,2,2,2,2],'dates':np.hstack([np.array(pd.date_range(date_today, date_today timedelta(3), freq='D')),np.array(pd.date_range(date_today timedelta(1), date_today timedelta(4), freq='D'))])})
Результатом применения перекрестного соединения для каждого пользователя должен быть фрейм данных с 32 строками. Возможно ли это в pyspark и как это можно сделать?
Комментарии:
1. Отфильтруйте каждый фрейм данных на два, по одному для каждого пользователя, перекрестно соедините соответствующие вложенные фреймы данных (чтобы получить 2 фрейма данных, каждый из которых содержит 16 строк), затем объедините два перекрестных фрейма данных
2. @sramalingam24, я должен был упомянуть, что это должно быть обобщено для n пользователей (т. Е. Более 2)
3. Затем вам нужно будет написать функцию, которая выполняет это по диапазону идентификаторов пользователей, предпочтительно в режиме map (filter amp; join) — reduce (объединение)
4. Выполнение этого для диапазона идентификаторов пользователей действительно было бы решением, если бы это можно было сделать способом сокращения карты. Как этого можно достичь?
5. Я бы очень помог, если бы вы могли свести свою проблему к минимальному экземпляру и предоставить образцы данных и образцы выходных данных, чтобы мы могли точно знать, чего вы хотите достичь.
Ответ №1:
Перекрестное соединение — это соединение, которое генерирует умножение строк, потому что ключ объединения не идентифицирует строки однозначно (в нашем случае ключ объединения тривиален или ключа объединения вообще нет)
Давайте начнем с примеров фреймов данных:
import pyspark.sql.functions as psf
import pyspark.sql.types as pst
df1 = spark.createDataFrame(
[[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())],
schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value1']]))
df2 = spark.createDataFrame(
[[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())],
schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value2']]))
---- ------
|user|value1|
---- ------
| 0| 76|
| 1| 59|
| 0| 14|
| 1| 71|
| 0| 66|
| 1| 61|
| 0| 2|
| 1| 22|
| 0| 16|
| 1| 83|
---- ------
---- ------
|user|value2|
---- ------
| 0| 65|
| 1| 81|
| 0| 60|
| 1| 69|
| 0| 21|
| 1| 61|
| 0| 98|
| 1| 76|
| 0| 40|
| 1| 21|
---- ------
Давайте попробуем соединить фреймы данных в постоянном столбце, чтобы увидеть эквивалентность между перекрестным соединением и обычным соединением в постоянном (тривиальном) столбце:
df = df1.withColumn('key', psf.lit(1))
.join(df2.withColumn('key', psf.lit(1)), on=['key'])
Мы получаем ошибку от spark> 2, потому что он понимает, что мы пытаемся выполнить перекрестное соединение (декартово произведение)
Py4JJavaError: произошла ошибка при вызове o1865.showString. : org.apache.spark.sql.AnalysisException: обнаружено неявное декартово произведение для ВНУТРЕННЕГО соединения между логическими планами LogicalRDD [user#1538, value1# 1539], false и LogicalRDD [user#1542, value2 # 1543], условие ложного соединения отсутствует или тривиально. Либо: используйте синтаксис ПЕРЕКРЕСТНОГО СОЕДИНЕНИЯ, чтобы разрешить декартовы произведения между этими отношениями, либо: включите неявные декартовы произведения, установив переменную конфигурации spark.sql.crossJoin.enabled=true;
Если ваш ключ объединения ( user
здесь ) не является столбцом, который однозначно идентифицирует строки, вы также получите умножение строк, но внутри каждой user
группы:
df = df1.join(df2, on='user')
print("Number of rows : tdf1: {} tdf2: {} tdf: {}".format(df1.count(), df2.count(), df.count()))
Number of rows : df1: 10 df2: 10 df: 50
---- ------ ------
|user|value1|value2|
---- ------ ------
| 1| 59| 81|
| 1| 59| 69|
| 1| 59| 61|
| 1| 59| 76|
| 1| 59| 21|
| 1| 71| 81|
| 1| 71| 69|
| 1| 71| 61|
| 1| 71| 76|
| 1| 71| 21|
| 1| 61| 81|
| 1| 61| 69|
| 1| 61| 61|
| 1| 61| 76|
| 1| 61| 21|
| 1| 22| 81|
| 1| 22| 69|
| 1| 22| 61|
| 1| 22| 76|
| 1| 22| 21|
---- ------ ------
5 * 5 строк для пользователя 0
5 * 5 строк для пользователя 1
, следовательно, 50
Примечание: использование a self join
, за которым следует a filter
, обычно означает, что вместо этого вы должны использовать оконные функции.