#apache-spark #apache-spark-sql #aws-glue
Вопрос:
Следующий сценарий заполняет целевую таблицу данными, полученными из исходной таблицы, с использованием pyspark.sql
и без проблем запускается в AWS Glue:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import * from awsglue.dynamicframe import DynamicFrame ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) users = glueContext.create_dynamic_frame.from_catalog( database="source", table_name="source_users" ) users.toDF().createOrReplaceTempView("users") query_users = """ SELECT U.id , signup_from FROM users AS U """ users_df = spark.sql(query_users) users_dynamicframe = DynamicFrame.fromDF( users_df.repartition(1), glueContext, "users_dynamicframe" ) users_output = glueContext.write_dynamic_frame.from_catalog( frame=users_dynamicframe, database="target", table_name="target_users", transformation_ctx="users_output", ) job.commit()
Теперь я хотел бы выполнить INSERT INTO SELECT ... ON DUPLICATE KEY UPDATE ...
, и я написал следующий сценарий:
source_users = glueContext.create_dynamic_frame.from_catalog( database="source", table_name="source_users" ) target_users = glueContext.create_dynamic_frame.from_catalog( database = "target", table_name = "target_users" ) source_users.toDF().createOrReplaceTempView("source_users") target_users.toDF().createOrReplaceTempView("target_users") query = """ INSERT INTO target_users SELECT U.id , U.user_type FROM source_users on duplicate key update id=target_users.id """ target_output = spark.sql(query) job.commit()
что возвращает следующее
ParseException: "nmismatched input 'on' expecting lt;EOFgt;
Я не уверен, как этого добиться, и причина, по которой я пытаюсь это сделать, заключается в том, чтобы отразить в целевой таблице обновления, происходящие в исходной таблице. Любая помощь в этом направлении была бы очень признательна, спасибо!
Комментарии:
1. spark sql не поддерживает обновление.
2. Спасибо @Lamanus, так как же я могу запустить задание клея, гарантируя, что обновления в исходной таблице будут отражены в целевой?
3. загрузите как таблицу, так и соединение, перезапишите.