#dataframe #apache-spark #join #pyspark
#фрейм данных #apache-spark #Присоединиться #pyspark
Вопрос:
Я пытался собрать 2 user_id dataframes
в pyspark те, которые не имеют одинакового идентификатора пользователя взаимно.
Итак, я ввел несколько кодов, которые вы можете увидеть ниже
import pyspark.sql.functions as f
query = "select * from tb_original"
df_original = spark.sql(query)
df_original = df_original.select("user_id").distinct()
df_a = df_original.sort(f.rand()).limit(10000)
df_a.count()
# df_a: 10000
df_b = df_original.join(df_a,on="user_id",how="left_anti").sort(f.rand()).limit(10000)
df_b.count()
# df_b: 10000
df_a.join(df_b,on="user_id",how="left_anti").count()
# df_a - df_b = 9998
# What?????
В результате df_a and df_b
у вас одинаковые 2 идентификатора пользователя … иногда 1 или 0.
Похоже, проблем с кодами нет. Однако, возможно, это происходит из-за ленивого действия механизма spark…
Мне нужно решить эту проблему для сбора 2 user_id dataframes
данных, которые не имеют одинакового идентификатора пользователя взаимно.
Комментарии:
1.
df_original.join(df_a,on="user_id",how="left_anti")
не должно иметь результатов, потому что все идентификаторы пользователей в df_a должны присутствовать в df_original. Я не знаю, почему вы получили 10000. Если вы хотите собрать два фрейма данных, используйтеdf_a.union(df_b).distinct()
2. @mck 10000 — это просто пример. Мне нужно 2 фрейма данных, потому что они используются разными группами пользователей, такими как A / B-тест, и
users in df_a amp; df_b
они не должны дублироваться. Поэтому ваш совет не подходит для моего случая.3. @rupert если ваше распределение похоже на 60: 40, тогда используйте номер группы равным 5 и отфильтруйте 0,1,2 в df_a и оставьте 3,4 в df_b. Мой ответ предполагает, что вы пытаетесь разделить поровну в двух столбцах
Ответ №1:
Поскольку вы хотите сгенерировать два разных набора пользователей из заданного пула пользователей без перекрытия, вы можете использовать этот простой трюк: =
from pyspark.sql.functions import monotonically_increasing_id
import pyspark.sql.functions as f
#"Creation of Original DF"
query = "select * from tb_original"
df_original = spark.sql(query)
df_original = df_original.select("user_id").distinct()
df_original =df.withColumn("UNIQUE_ID", monotonically_increasing_id())
number_groups_needed=2 ## you can adjust the number of group you need for your use case
dfa=df_original.filter(df_original.UNIQUE_ID % number_groups_needed ==0)
dfb=df_original.filter(df_original.UNIQUE_ID % number_groups_needed ==1)
##dfa and dfb will not have any overlap for user_id
Ps- если ваш идентификатор пользователя сам по себе является целым числом, вам не нужно создавать новый столбец UNIQUE_ID, вы можете использовать его напрямую.
Ответ №2:
Я выбираю randomSplit
функцию, поддерживаемую pyspark.
df_a,df_b = df_original.randomSplit([0.6,0.4])
df_a = df_a.limit(10000)
df_a.count()
# 10000
df_b = df_b.limit(10000)
df_b.count()
# 10000
df_a.join(df_b,on="user_id",how="left_anti").count()
# 10000
никогда больше не конфликтуйте между df_a и df_b!