AWS Glue Context does not support a SQL query

#python #apache-spark #aws-glue

Question:

Use case: pass a query with a where clause in AWS Glue.

 source_df = glueContext.read.format("jdbc") \
   .option("url","jdbc:oracle:thin://@xxxxx:1521/ORCL") \
   .option("user","user") \
   .option("password","password") \
   .option("dbtable","(Select * from test) as test") \
   .option("driver","oracle.jdbc.driver.OracleDriver") \
   .load()
 

I get the following error:

 Traceback (most recent call last):
   File "/tmp/comments_jdbc", line 17, in <module>
     source_df = spark.read.format("jdbc").option("url","jdbc:oracle:thin://xxxxx:1521/ORCL").option("user","user").option("password","password").option("dbtable","(Select * from test) as test").option("driver","oracle.jdbc.driver.OracleDriver").load()
   File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
     return self._df(self._jreader.load())
   File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
     answer, self.gateway_client, self.target_id, self.name)
   File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
     return f(*a, **kw)
   File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
     format(target_id, ".", name), value)
 py4j.protocol.Py4JJavaError: An error occurred while calling o77.load. : java.sql.SQLSyntaxErrorException: ORA-00933: SQL command not properly ended
     at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:447)
     at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)
     at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:951)
     at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:513)
     at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:227)
     at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:531)
     at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:208)
     at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:886)
     at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1175)

Answer #1:

I don't see any where clause in your dbtable option, but I tried running that query directly against Oracle and it gave me the same error you got. It looks like Oracle does not accept the as keyword for table aliases; it only allows it for column aliases.

If you still want to use an alias, try the query below from your Glue job, or just run the select query without the parentheses.

 select test.* from (select * from test) test
 

Or just run the query as is with some where clause in it:

 select * from test where <some_col=value>
 
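
Putting this together with the code from the question, a minimal sketch of what the fixed read might look like (the connection details are the placeholders from the question, and some_col / 'value' are made-up filter placeholders; the only real change is that the subquery alias is written without the as keyword):

 fixed_df = glueContext.read.format("jdbc") \
   .option("url","jdbc:oracle:thin://@xxxxx:1521/ORCL") \
   .option("user","user") \
   .option("password","password") \
   .option("dbtable","(select * from test where some_col = 'value') test") \
   .option("driver","oracle.jdbc.driver.OracleDriver") \
   .load()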

Answer #2:

These are 5 different code snippets I tried in order to compare performance. Only 2 of them actually filtered the data at the server level when checked with the profiler. At this point it seems that, without creating a custom connector or buying one on the Marketplace, the only way to make this work is to use glueContext.read.

You can convert dynamic frames to and from data frames (see the example):

 rds_dynamicframe = DynamicFrame.fromDF(rds_dataframe, glueContext, "nested")
 
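
A minimal round-trip sketch for reference (the rds_* names are placeholders carried over from the line above, not part of the original job):

 from awsglue.dynamicframe import DynamicFrame

 # Spark DataFrame -> Glue DynamicFrame
 rds_dynamicframe = DynamicFrame.fromDF(rds_dataframe, glueContext, "nested")

 # Glue DynamicFrame -> Spark DataFrame, e.g. to use DataFrame-only APIs
 rds_dataframe_roundtrip = rds_dynamicframe.toDF()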

You should also verify this while running SQL Server Profiler with all events from OLEDB, Stored Procedures, TSQL and Transactions:

 import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

# list parameter with 2 leading hyphens --param_server_url 
args = getResolvedOptions(sys.argv,['JOB_NAME'])
print("JOB_NAME: ", args['JOB_NAME'])

job_server_url="SERVER URL"
job_db_name="DB NAME"
job_db_user="DB USER"
job_db_password="DB PASSWORD"
job_table_name="TABLE NAME"

job_glue_db_name="GLUE DATA CATALOG DATABASE NAME"
job_glue_conn_name="GLUE DATA CATALOG CONNECTION NAME"
job_glue_table_name="GLUE DATA CATALOG TABLE NAME"

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
region = "us-east-1"

#### aws glue data catalog table info (from ) ####
# Name  job_glue_table_name
# Database  job_glue_db_name
# Classification    sqlserver
# Location  job_db_name.dbo.job_table_name
# Connection    job_glue_conn_name

#### GlueContext Class ####
# https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html

#### DynamicFrame Class ####
# https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html

#### Connection Api ####
# https://docs.aws.amazon.com/glue/latest/webapi/API_Connection.html

#### Using connectors and connections with AWS Glue Studio ####
# Link : https://docs.aws.amazon.com/glue/latest/ug/connectors-chapter.html
# Use AWS Secrets Manager for storing credentials
# Filtering the source data with row predicates and column projections 

#### Connection options for type custom.jdbc or marketplace.jdbc ####
# Link : https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-connect.html#aws-glue-programming-etl-connect-jdbc
# className – String, required, driver class name.
# connectionName – String, required, name of the connection that is associated with the connector.
# secretId or user/password – String, required, used to retrieve credentials for the URL.
# dbTable or query – String, required, the table or SQL query to get the data from. You can specify either dbTable or query, but not both.
# filterPredicate – String, optional, extra condition clause to filter data from source. For example:

# using \ for new line with more commands
# query="recordid<=5", -- filtering !
print("0001 - df_read_query")
df_read_query = glueContext.read \
    .format("jdbc") \
    .option("url","jdbc:sqlserver://" + job_server_url + ":1433;databaseName=" + job_db_name + ";") \
    .option("query","select recordid from " + job_table_name + " where recordid <= 5") \
    .option("user",job_db_user) \
    .option("password",job_db_password) \
    .load()
print("df_read_query count: ", df_read_query.count())
df_read_query.show(10)
df_read_query.printSchema()

# query="recordid<=5", -- not filtering
print("0002 - df_from_catalog_query")
df_from_catalog_query = glueContext.create_dynamic_frame.from_catalog(
    database = job_glue_db_name, 
    table_name = job_glue_table_name, 
    additional_options={
        "query":"select recordid from " job_table_name " where recordid <= 5;",
    },
    transformation_ctx = "df_from_catalog_query", 
)
print("df_from_catalog_query count: ", df_from_catalog_query.count())
df_from_catalog_query.show(10)

# push_down_predicate="recordid<=5", -- not filtering
print("0003 - df_from_catalog_push_down_predicate")
df_from_catalog_push_down_predicate = glueContext.create_dynamic_frame.from_catalog(
    database = job_glue_db_name, 
    table_name = job_db_name + '_dbo_' + job_table_name, 
    push_down_predicate = "recordid<=5",
    transformation_ctx = "df_from_catalog_push_down_predicate",
)
print("df_from_catalog_push_down_predicate count: ", df_from_catalog_push_down_predicate.count())
df_from_catalog_push_down_predicate.show(10)

# filterPredicate="recordid<=5", -- not filtering
print("0004 - df_from_options_sqlserver")
df_from_options_sqlserver = glueContext.create_dynamic_frame.from_options(
    connection_type = "sqlserver", 
    connection_options = {
        "url":"jdbc:sqlserver://" job_server_url ":1433;databaseName=" job_db_name ";",
        "username":job_db_user,
        "password":job_db_password,
        "location":job_db_name ".dbo." job_table_name,
        "filterPredicate":"recordid<=5",
    }, 
    transformation_ctx = "df_from_options_sqlserver",
)
print("df_from_options_sqlserver count: ", df_from_options_sqlserver.count())
df_from_options_sqlserver.show(10)

# dbtable="recordid<=5", -- filtering !
print("0005 - df_read_dbtable")
df_read_dbtable = glueContext.read \
    .format("jdbc") \
    .option("url","jdbc:sqlserver://" + job_server_url + ":1433;databaseName=" + job_db_name + ";") \
    .option("user",job_db_user) \
    .option("password",job_db_password) \
    .option("dbtable","(select recordid from " + job_table_name + " where recordid<=5) as t1") \
    .option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()
print("df_read_dbtable count: ", df_read_dbtable.count())
df_read_dbtable.show(10)

job.commit()
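
For completeness, a hedged sketch of the custom.jdbc / marketplace.jdbc route mentioned at the top of this answer. The connector class, connection name and secret name below are assumptions (they depend on the connector you create in AWS Glue Studio or subscribe to on the Marketplace); the option names themselves (className, connectionName, secretId, query, filterPredicate) are the ones listed in the comments above:

 # sketch only: assumes a JDBC connector and a Glue connection named
 # "my-sqlserver-connection" have already been set up in AWS Glue Studio
 df_from_connector = glueContext.create_dynamic_frame.from_options(
     connection_type = "marketplace.jdbc",   # or "custom.jdbc" for your own connector
     connection_options = {
         "className": "com.microsoft.sqlserver.jdbc.SQLServerDriver",   # assumption
         "connectionName": "my-sqlserver-connection",                   # assumption
         "secretId": "my-db-credentials",                               # or user/password
         "query": "select recordid from " + job_table_name + " where recordid <= 5",
         # alternatively: "dbTable": job_table_name, "filterPredicate": "recordid<=5",
     },
     transformation_ctx = "df_from_connector",
 )
 print("df_from_connector count: ", df_from_connector.count())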