#python #apache-spark #dataframe #pyspark #typeerror
#python #apache-spark #фрейм данных #pyspark #ошибка типа
Вопрос:
я получаю приведенную ниже ошибку при выполнении упомянутого оператора ‘join’. я использую настройку pyspark. Любые изменения, требуемые в инструкции join или коде.
Ошибка типа: объект ‘DataFrame’ нельзя вызвать
df11 = spark.read.option("header","true").option("delimiter", ",").csv("s3://mybucket/file1.csv")
df22 = spark.read.option("header","true").option("delimiter", ",").csv("s3://mybucket/file2.csv")
df11.createOrReplaceTempView("table1")
df22.createOrReplaceTempView("table2")
df1 = spark.sql( "select * from table1" )
df2 = spark.sql( "select * from table2" )
df_d = df1.join(df2, df1.NO == df2.NO, 'left').filter(F.isnull(df2.NO)).select(df1.NO,df1.NAME,df1.LAT,df1.LONG, F.lit('DELETE').alias('FLAG'))
Спасибо
Ответ №1:
используйте имена столбцов в виде строки, подобной этой, это должно сработать
df_d = df1.join(df2, df1['NO'] == df2['NO'], 'left').filter(F.isnull(df2['NO'])).select(df1['NO'],df1['NAME'],df1['LAT'],df1['LONG'], F.lit('DELETE').alias('FLAG'))
Ответ №2:
Это поможет добавить новый столбец из списка существующих столбцов
for col_name in partition_key_list:
print(col_name)
#df_final_recs_i_u_n = df_final_recs_i_u_n.withColumn(f"{col_name}_partition_by", df_final_recs_i_u_n.date_tgt)
df_final_recs_i_u_n=df_final_recs_i_u_n.withColumn(f"{col_name}_partition_by",df_final_recs_i_u_n[f"{col_name}_tgt"])
Ответ №3:
После создания временного представления вы можете использовать Spark SQL для создания окончательного фрейма данных. Пожалуйста, проверьте скриншот ниже:
Соответствующий SQL:
spark.sql("select table1.NO, table1.NAME, table1.LAT, table1.LONG, 'DELETE' as FLAG from table1 left join table2 on table1.NO = table2.NO where table2.NO is null").show()