PySpark получает связанную строку из значения объекта списка DF

#python #apache-spark #pyspark #apache-spark-sql

#питон #apache-искра #pyspark #apache-spark-sql

Вопрос:

У меня есть фрейм данных, в котором есть столбец ID и связанный столбец массива, который содержит идентификаторы связанных с ним записей.

 ID | NAME | RELATED_IDLIST
--------------------------
123 | mike | [345,456]
345 | alen | [789]
456 | sam  | [789,999]
789 | marc | [111]
555 | dan  | [333]
 

Исходя из вышесказанного, мне нужно построить связь между всеми связанными дочерними идентификаторами вместе и его родительским идентификатором. Результирующий DF должен иметь вид

 ID | NAME | RELATED_IDLIST
 --------------------------
 123 | mike | [345,456,789,999,111]
 345 | alen | [789,111]
 456 | sam  | [789,999,111]
 789 | marc | [111]
 555 | dan  | [333]
 
 

Мне нужна помощь, чтобы разобраться в вышесказанном.

Комментарии:

1. В чем была проблема, когда вы попытались это сделать?

2. Хелоо, я не уверен, с чего начать. например, как получить запись для каждого .. поскольку это связано с циклической зависимостью

3. «не уверен» в общих чертах означает «ленивый». Прямо сейчас ваш вопрос звучит как «код по требованию, за который я не плачу, не считая повторного голосования. Итак, начните с чего-нибудь и посмотрите, что произойдет. Многие хотят помочь вам в получении знаний, но только если вы приложите усилия. Таким образом, купите книгу от VanderPlas, чтобы начать работу с Python Data Science и вернуться к нам с написанным вами кодом, который может вызвать у вас кучу трассировок 😉

4. повторное открытие вопроса, поскольку это интересный вопрос, я хочу увидеть решение, как оно будет реализовано в spark

Ответ №1:

Используя Self joins и Window functions , вы можете решить эту проблему. Я разделил код на 5 шагов. Алгоритм выглядит следующим образом :

  1. Разнесите список массивов, чтобы создать единственную запись (в данных больше нет массивов)
  2. Идентификатор самосоединения и связанные (переименованный столбец RELATED_IDLIST) столбцы
  3. Сведите записи, которые имеют одинаковый a_id, в один массив, а b_id — в другой массив
  4. Объедините два столбца списка массива в один объединенный массив и ранжируйте результирующие записи на основе наибольшего размера каждого результирующего объединенного массива
  5. выберите записи, имеющие ранг 1

вы можете попробовать следующий код:

 # importing necessary functions for later use
from pyspark.sql.functions import explode,col,collect_set,array_union,size
from pyspark.sql.functions import dense_rank,desc

# need set cross join to True if spark version < 3
spark.conf.set("spark.sql.crossJoin.enabled", True)

############### STEP 0 #####################################
# creating the above mentioned dataframe
id_cols = [123,345,456,789,555]
name_cols = ['mike','alen','sam','marc','dan']
related_idlist_cols = [[345,456],[789],[789,999],[111],[333]]

list_of_rows = [(each_0,each_1,each_2) for each_0, each_1, each_2 in zip(id_cols,name_cols,related_idlist_cols)]
cols_name = ['ID','NAME','RELATED_IDLIST']

# this will result in above mentioned dataframe
df = spark.createDataFrame(list_of_rows,cols_name)

############### STEP 1: Explode values  #####################################
# explode function converts arraylist to atomic records
# one record having array size two will result in two records 
#                                         -> 123, mike,129
# 123, mike , explode(['129'.'9029']) -->
#                                         -> 123, mike,9029
df_1 = df.select(col('id'),col('name'),explode(df.RELATED_IDLIST).alias('related'))

############### STEP 2 : Self Join with Data  #####################################
# creating dataframes with different column names, for joining them later
a = df_1.withColumnRenamed('id','a_id').withColumnRenamed('name','a_name').withColumnRenamed('related','a_related')
b = df_1.withColumnRenamed('id','b_id').withColumnRenamed('name','b_name').withColumnRenamed('related','b_related')

# this is an example outer join amp; self join 
df_2 = a.join(b, a.a_related == b.b_id, how='left').orderBy(a.a_id)

############### STEP 3 : create Array Lists #####################################
# using collect_set we can reduce values of a particular kind into one set (we are reducing 'related' records, based on 'id')
df_3  = df_2.select('a_id','a_name',collect_set('a_related').over(Window.partitionBy(df_2.a_id)).
                    alias('a_related_ids'),collect_set('b_related').over(Window.partitionBy(df_2.b_id)).alias('b_related_ids'))

# merging two sets into one column and also calculating resultant the array size
df_4 = df_3.select('a_id','a_name',array_union('a_related_ids','b_related_ids').alias('combined_ids')).withColumn('size',size('combined_ids'))

# ranking the records to pick the ideal records 
df_5 = df_4.select('a_id','a_name','combined_ids',dense_rank().over(Window.partitionBy('a_id').orderBy(desc('size'))).alias('rank'))

############### STEP 4 : Selecting Ideal Records  #####################################
# picking records of rank 1, but this will have still ducplicates so removing them using distinct and ordering them by id
df_6 = df_5.select('a_id','a_name','combined_ids').filter(df_5.rank == 1).distinct().orderBy('a_id')

############### STEP 5 #####################################
display(df_6)