#apache-spark #pyspark #apache-spark-sql
#apache-spark #pyspark #apache-spark-sql
Вопрос:
У меня есть список имен столбцов, который меняется каждый раз. Имена столбцов хранятся в виде списка. Итак, мне нужно передать имена столбцов из списка (в приведенном ниже примере его id и programid) для сравнения исходного и целевого фрейма данных. В приведенном ниже примере я хочу проверить, есть ли src_id == id
и src_programid == programid
.
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import col, when
srccolumns = ['src_id','src_programid']
tgtcolumns = ['id','programid']
joinSrcTgtAction = joinSrcTgt.withColumn(
'action',
when(
(
(col(src_id) == col(id)) amp;
(col(src_programid) == col(programid)) amp;
(joinSrcTgt.src_checksum != joinSrcTgt.checksum)
),
'upsert'
)
)
Ответ №1:
Предполагая, что списки имеют одинаковый размер и гарантируют порядок имен столбцов, чтобы мы могли сопоставить пару по индексу, вы могли бы сделать что-то вроде следующего:
src_columns = ['src_id','src_programid']
tgt_columns = ['id','programid']
condition = True
for i in range(0, len(src_columns)):
condition amp;= (join_src_tgt[src_columns[i]] == join_src_tgt[dest_columns[i]])
join_src_tgt_action = join_src_tgt.withColumn(
'action',
when(condition amp; (join_src_tgt.src_checksum != join_src_tgt.checksum), 'upsert')
)
Комментарии:
1. @Shivhar если это сработало, убедитесь, что вы приняли ответ (маленькая галочка слева от ответа) 🙂
2. Ну, может быть, вы хотите подробно описать, что не работает? вы получаете какую-либо ошибку?