PySpark .join() с разными именами столбцов и не может быть жестко запрограммирован перед выполнением

#apache-spark #pyspark #apache-spark-sql

#apache-spark #pyspark #apache-spark-sql

Вопрос:

Я понимаю, что final = ta.join(tb, on=['ID'], how='left') оба левых и правых имеют столбец ‘ID’ с одинаковым именем.

И я получаю это final = ta.join(tb, ta.leftColName == tb.rightColName, how='left') Имена левого и правого столбцов известны до выполнения, поэтому имена столбцов могут быть жестко запрограммированы.

Но что, если имена левого и правого столбцов предиката on различны и вычисляются / выводятся с помощью переменных конфигурации? Например:

1) leftColName = 'leftKey'

2) rightColName = 'rightKey'

3) final = ta.join(tb, ta.leftColname == tb.rightColname, how='left')

Значения leftColName amp; rightColName неизвестны до того, как строка 3 может быть жестко запрограммирована и выполнена.

Это не работает, потому что я нахожу, что среда выполнения может периодически путаться / теряться в том rightColName , относится ли к ta или к tb

final = ta.join(tb, f.col(leftColName) == f.col(rightColName), 'left')

Похоже, у Scala есть средство для включения этого.

Ответ №1:

Вы ссылаетесь на столбец как ta.leftColname , но — аналогично Pandas — вы также можете ссылаться на него ta["leftColname"] .

Таким образом, вместо жестко заданного имени столбца вы также можете использовать переменную. Например:

 left_key = 'leftColname'
right_key = 'rightColname'
final = ta.join(tb, ta[left_key] == tb[right_key], how='left')
 

Комментарии:

1. Клянусь, я пробовал это. Он выдал индекс, который должен быть int, не может быть ошибкой строки. Но теперь, похоже, он работает. По крайней мере, до тех пор, пока это не повторится снова. Спасибо. Что еще отличается? В ожидании вашего ответа кластер получил перезагрузку.

Ответ №2:

Если выполняется для нескольких столбцов, то работает следующее понимание списка

 df1 = df10_fj.select(*(f.col(x).alias(x   "_df10") for x in df10_fj.columns))
df2 = df4_fj.select(*(f.col(x).alias(x   "_df4") for x in df4_fj.columns))

# df1, on=[df1.DOB_df10 == df2.DOB_df4, df1.soundex_lnm_df10 == df2.soundex_lnm_df4]

df = df2.join(df1, on=[df1[f"{c}_df10"] == df2[f"{c}_df4"] for c in join_cols])```