#python #apache-spark #pyspark #apache-spark-sql
#питон #apache-искра #pyspark #apache-spark-sql
Вопрос:
У меня есть фрейм данных, в котором есть столбец ID и связанный столбец массива, который содержит идентификаторы связанных с ним записей.
ID | NAME | RELATED_IDLIST
--------------------------
123 | mike | [345,456]
345 | alen | [789]
456 | sam | [789,999]
789 | marc | [111]
555 | dan | [333]
Исходя из вышесказанного, мне нужно построить связь между всеми связанными дочерними идентификаторами вместе и его родительским идентификатором. Результирующий DF должен иметь вид
ID | NAME | RELATED_IDLIST
--------------------------
123 | mike | [345,456,789,999,111]
345 | alen | [789,111]
456 | sam | [789,999,111]
789 | marc | [111]
555 | dan | [333]
Мне нужна помощь, чтобы разобраться в вышесказанном.
Комментарии:
1. В чем была проблема, когда вы попытались это сделать?
2. Хелоо, я не уверен, с чего начать. например, как получить запись для каждого .. поскольку это связано с циклической зависимостью
3. «не уверен» в общих чертах означает «ленивый». Прямо сейчас ваш вопрос звучит как «код по требованию, за который я не плачу, не считая повторного голосования. Итак, начните с чего-нибудь и посмотрите, что произойдет. Многие хотят помочь вам в получении знаний, но только если вы приложите усилия. Таким образом, купите книгу от VanderPlas, чтобы начать работу с Python Data Science и вернуться к нам с написанным вами кодом, который может вызвать у вас кучу трассировок 😉
4. повторное открытие вопроса, поскольку это интересный вопрос, я хочу увидеть решение, как оно будет реализовано в
spark
Ответ №1:
Используя Self joins
и Window functions
, вы можете решить эту проблему. Я разделил код на 5 шагов. Алгоритм выглядит следующим образом :
- Разнесите список массивов, чтобы создать единственную запись (в данных больше нет массивов)
- Идентификатор самосоединения и связанные (переименованный столбец RELATED_IDLIST) столбцы
- Сведите записи, которые имеют одинаковый a_id, в один массив, а b_id — в другой массив
- Объедините два столбца списка массива в один объединенный массив и ранжируйте результирующие записи на основе наибольшего размера каждого результирующего объединенного массива
- выберите записи, имеющие ранг 1
вы можете попробовать следующий код:
# importing necessary functions for later use
from pyspark.sql.functions import explode,col,collect_set,array_union,size
from pyspark.sql.functions import dense_rank,desc
# need set cross join to True if spark version < 3
spark.conf.set("spark.sql.crossJoin.enabled", True)
############### STEP 0 #####################################
# creating the above mentioned dataframe
id_cols = [123,345,456,789,555]
name_cols = ['mike','alen','sam','marc','dan']
related_idlist_cols = [[345,456],[789],[789,999],[111],[333]]
list_of_rows = [(each_0,each_1,each_2) for each_0, each_1, each_2 in zip(id_cols,name_cols,related_idlist_cols)]
cols_name = ['ID','NAME','RELATED_IDLIST']
# this will result in above mentioned dataframe
df = spark.createDataFrame(list_of_rows,cols_name)
############### STEP 1: Explode values #####################################
# explode function converts arraylist to atomic records
# one record having array size two will result in two records
# -> 123, mike,129
# 123, mike , explode(['129'.'9029']) -->
# -> 123, mike,9029
df_1 = df.select(col('id'),col('name'),explode(df.RELATED_IDLIST).alias('related'))
############### STEP 2 : Self Join with Data #####################################
# creating dataframes with different column names, for joining them later
a = df_1.withColumnRenamed('id','a_id').withColumnRenamed('name','a_name').withColumnRenamed('related','a_related')
b = df_1.withColumnRenamed('id','b_id').withColumnRenamed('name','b_name').withColumnRenamed('related','b_related')
# this is an example outer join amp; self join
df_2 = a.join(b, a.a_related == b.b_id, how='left').orderBy(a.a_id)
############### STEP 3 : create Array Lists #####################################
# using collect_set we can reduce values of a particular kind into one set (we are reducing 'related' records, based on 'id')
df_3 = df_2.select('a_id','a_name',collect_set('a_related').over(Window.partitionBy(df_2.a_id)).
alias('a_related_ids'),collect_set('b_related').over(Window.partitionBy(df_2.b_id)).alias('b_related_ids'))
# merging two sets into one column and also calculating resultant the array size
df_4 = df_3.select('a_id','a_name',array_union('a_related_ids','b_related_ids').alias('combined_ids')).withColumn('size',size('combined_ids'))
# ranking the records to pick the ideal records
df_5 = df_4.select('a_id','a_name','combined_ids',dense_rank().over(Window.partitionBy('a_id').orderBy(desc('size'))).alias('rank'))
############### STEP 4 : Selecting Ideal Records #####################################
# picking records of rank 1, but this will have still ducplicates so removing them using distinct and ordering them by id
df_6 = df_5.select('a_id','a_name','combined_ids').filter(df_5.rank == 1).distinct().orderBy('a_id')
############### STEP 5 #####################################
display(df_6)