Выполните ВСТАВКУ В … ВЫБЕРИТЕ в AWS GLUE

#apache-spark #apache-spark-sql #aws-glue

Вопрос:

Следующий сценарий заполняет целевую таблицу данными, полученными из исходной таблицы, с использованием pyspark.sql и без проблем запускается в AWS Glue:

 import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import * from awsglue.dynamicframe import DynamicFrame  ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ["JOB_NAME"])  sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args)  users = glueContext.create_dynamic_frame.from_catalog(  database="source", table_name="source_users" )  users.toDF().createOrReplaceTempView("users") query_users = """ SELECT U.id  , signup_from FROM users AS U """  users_df = spark.sql(query_users) users_dynamicframe = DynamicFrame.fromDF(  users_df.repartition(1), glueContext, "users_dynamicframe" ) users_output = glueContext.write_dynamic_frame.from_catalog(  frame=users_dynamicframe,  database="target",  table_name="target_users",  transformation_ctx="users_output", )  job.commit()  

Теперь я хотел бы выполнить INSERT INTO SELECT ... ON DUPLICATE KEY UPDATE ... , и я написал следующий сценарий:

 source_users = glueContext.create_dynamic_frame.from_catalog(  database="source", table_name="source_users" )  target_users = glueContext.create_dynamic_frame.from_catalog(  database = "target", table_name = "target_users" )  source_users.toDF().createOrReplaceTempView("source_users") target_users.toDF().createOrReplaceTempView("target_users")   query = """ INSERT INTO target_users SELECT U.id  , U.user_type FROM source_users  on duplicate key update id=target_users.id """  target_output = spark.sql(query) job.commit()  

что возвращает следующее

ParseException: "nmismatched input 'on' expecting lt;EOFgt;

Я не уверен, как этого добиться, и причина, по которой я пытаюсь это сделать, заключается в том, чтобы отразить в целевой таблице обновления, происходящие в исходной таблице. Любая помощь в этом направлении была бы очень признательна, спасибо!

Комментарии:

1. spark sql не поддерживает обновление.

2. Спасибо @Lamanus, так как же я могу запустить задание клея, гарантируя, что обновления в исходной таблице будут отражены в целевой?

3. загрузите как таблицу, так и соединение, перезапишите.