(pyspark) как создать фреймы данных, которые не имеют одинакового идентификатора пользователя взаимно

#dataframe #apache-spark #join #pyspark

#фрейм данных #apache-spark #Присоединиться #pyspark

Вопрос:

Я пытался собрать 2 user_id dataframes в pyspark те, которые не имеют одинакового идентификатора пользователя взаимно.

введите описание изображения здесь

Итак, я ввел несколько кодов, которые вы можете увидеть ниже

 import  pyspark.sql.functions as f

query = "select * from tb_original"
df_original = spark.sql(query)
df_original = df_original.select("user_id").distinct()

df_a = df_original.sort(f.rand()).limit(10000)
df_a.count()
# df_a: 10000

df_b = df_original.join(df_a,on="user_id",how="left_anti").sort(f.rand()).limit(10000)

df_b.count()
# df_b: 10000

df_a.join(df_b,on="user_id",how="left_anti").count()
# df_a - df_b = 9998
# What?????
 

В результате df_a and df_b у вас одинаковые 2 идентификатора пользователя … иногда 1 или 0.

Похоже, проблем с кодами нет. Однако, возможно, это происходит из-за ленивого действия механизма spark…

Мне нужно решить эту проблему для сбора 2 user_id dataframes данных, которые не имеют одинакового идентификатора пользователя взаимно.

Комментарии:

1. df_original.join(df_a,on="user_id",how="left_anti") не должно иметь результатов, потому что все идентификаторы пользователей в df_a должны присутствовать в df_original. Я не знаю, почему вы получили 10000. Если вы хотите собрать два фрейма данных, используйте df_a.union(df_b).distinct()

2. @mck 10000 — это просто пример. Мне нужно 2 фрейма данных, потому что они используются разными группами пользователей, такими как A / B-тест, и users in df_a amp; df_b они не должны дублироваться. Поэтому ваш совет не подходит для моего случая.

3. @rupert если ваше распределение похоже на 60: 40, тогда используйте номер группы равным 5 и отфильтруйте 0,1,2 в df_a и оставьте 3,4 в df_b. Мой ответ предполагает, что вы пытаетесь разделить поровну в двух столбцах

Ответ №1:

Поскольку вы хотите сгенерировать два разных набора пользователей из заданного пула пользователей без перекрытия, вы можете использовать этот простой трюк: =

 from pyspark.sql.functions import monotonically_increasing_id
import  pyspark.sql.functions as f

#"Creation of Original DF"
query = "select * from tb_original"
df_original = spark.sql(query)
df_original = df_original.select("user_id").distinct()

df_original =df.withColumn("UNIQUE_ID", monotonically_increasing_id())
number_groups_needed=2  ## you can adjust the number of group you need for your use case
dfa=df_original.filter(df_original.UNIQUE_ID % number_groups_needed ==0) 
dfb=df_original.filter(df_original.UNIQUE_ID % number_groups_needed ==1)

##dfa and dfb will not have any overlap for user_id 
 

Ps- если ваш идентификатор пользователя сам по себе является целым числом, вам не нужно создавать новый столбец UNIQUE_ID, вы можете использовать его напрямую.

Ответ №2:

Я выбираю randomSplit функцию, поддерживаемую pyspark.

 df_a,df_b = df_original.randomSplit([0.6,0.4])

df_a = df_a.limit(10000)
df_a.count()
# 10000

df_b = df_b.limit(10000)
df_b.count()
# 10000

df_a.join(df_b,on="user_id",how="left_anti").count()
# 10000
 

никогда больше не конфликтуйте между df_a и df_b!