Параметризует условие соединения в pyspark

#apache-spark #pyspark #apache-spark-sql

#apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть список имен столбцов, который меняется каждый раз. Имена столбцов хранятся в виде списка. Итак, мне нужно передать имена столбцов из списка (в приведенном ниже примере его id и programid) для сравнения исходного и целевого фрейма данных. В приведенном ниже примере я хочу проверить, есть ли src_id == id и src_programid == programid .

 from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import col, when

srccolumns = ['src_id','src_programid']
tgtcolumns = ['id','programid']

joinSrcTgtAction =  joinSrcTgt.withColumn(
    'action', 
    when(
        (
            (col(src_id) == col(id)) amp; 
            (col(src_programid) == col(programid)) amp; 
            (joinSrcTgt.src_checksum != joinSrcTgt.checksum)
        ),
        'upsert'
    )
)
  

Ответ №1:

Предполагая, что списки имеют одинаковый размер и гарантируют порядок имен столбцов, чтобы мы могли сопоставить пару по индексу, вы могли бы сделать что-то вроде следующего:

 src_columns = ['src_id','src_programid']
tgt_columns = ['id','programid']

condition = True
for i in range(0, len(src_columns)):
    condition amp;= (join_src_tgt[src_columns[i]] == join_src_tgt[dest_columns[i]])

join_src_tgt_action =  join_src_tgt.withColumn(
    'action', 
    when(condition amp; (join_src_tgt.src_checksum != join_src_tgt.checksum), 'upsert')
)
  

Комментарии:

1. @Shivhar если это сработало, убедитесь, что вы приняли ответ (маленькая галочка слева от ответа) 🙂

2. Ну, может быть, вы хотите подробно описать, что не работает? вы получаете какую-либо ошибку?