Базы данных com.microsoft.sqlserver.jdbc.Исключение SQLServerException: Неправильный синтаксис

#python #apache-spark #pyspark #databricks

Вопрос:

Когда я пытаюсь выполнить следующую функцию, я получаю ошибку в Apache Spark / Databricks:

 com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near amp;#39;)amp;#39;.
 

Код таков:

 Ancestor = ancestor.getAncestorPath(conn,log,processId, entityStageId)
dataPath = Ancestor["ancestorPath"]

#Unpack the config specifications
import json
if len(Ancestor["dfConfig"]) > 0:
  dfConfig = json.loads(Ancestor["dfConfig"])

#Display the Ancestor path for debugging
print(f"Ancestor:{Ancestor}")
 

Похоже, проблема в подключении к SQLDB.

Фактическая функция заключается в следующем:

 def getAncestorPath(connectionInst: conn.connect, log: logs.Logging,  processId, entityStageId = "-1"):
  log.writeToLogs(processId,logs.LogType.Info, logs.EventType.GetAncestorPath, logs.LogMessage.GetAncestorPath)
  pathQuery = f"SELECT * FROM Config.GetAncestorSlicePath WHERE (ProcessID = {processId}) OR (ProcessID IS NULL AND EntityStageID = {entityStageId})"
  
  AncestorPath = ""
  dfConfig = ""
  
  entityPath = connectionInst.readFromDb(processId,pathQuery)
  
  ancestorPathToList = entityPath.limit(1).rdd.collect()
  
  if len(ancestorPathToList) is 0:
    log.writeToLogs(processId,logs.LogType.Error, logs.EventType.NoAncestorPath, logs.LogMessage.NoAncestorPath, errorType = logs.ErrorType.NoAncestorPath)
    log.writeToLogs(processId,logs.LogType.Error, logs.EventType.FailGetAncestorPath, logs.LogMessage.FailGetAncestorPath, errorType = logs.ErrorType.FailGetAncestorPath)
    raise ValueError(logs.LogMessage.NoAncestorPath.value)
  
  for row in ancestorPathToList:
    AncestorPath = f"/mnt/lake/{row.LakePath}"
    dfConfig = row.Config
  
  log.writeToLogs(processId,logs.LogType.Info, logs.EventType.SuccessGetAncestorPath, logs.LogMessage.SuccessGetAncestorPath)
  return {"ancestorPath":AncestorPath,"dfConfig":dfConfig}
 

Трассировка ошибок выглядит следующим образом:

 Py4JJavaError                             Traceback (most recent call last)
/databricks/python/lib/python3.8/site-packages/hydr8/utils/connection.py in readFromDb(self, processId, query)
     22     try:
---> 23         jdbcDF = (self.connectionProperties.spark.read
     24         .format("jdbc")

/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    209         else:
--> 210             return self._df(self._jreader.load())
    211 

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    109         try:
--> 110             return f(*a, **kw)
    111         except py4j.protocol.Py4JJavaError as e:

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.n".
 

Как уже упоминалось, проблема возникает при попытке подключения к базе данных SQL. Функция, используемая для подключения к базе данных, выглядит следующим образом:

подключение для чтения jdbc к базе данных SQL

 class connect:  
  def __init__(self, conprops: conn.ConnectionProperties, logs):
        self.connectionProperties = conprops
        self.log = logs

  def readFromDb(self,processId, query):
    try:
        jdbcDF = (self.connectionProperties.spark.read
        .format("jdbc")
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .option("url", f"jdbc:sqlserver://{self.connectionProperties.dbConnectionProperties.DBServer}.database.windows.net;database={self.connectionProperties.dbConnectionProperties.DBDatabase}")
        .option("user", self.connectionProperties.dbConnectionProperties.DBUser)
        .option("query", query)
        .option("password", self.connectionProperties.dbConnectionProperties.DBPword)
        .load()    
          )
        return jdbcDF
    except Exception as e:
        self.log.writeToLogs(self,processId,logging.LogType.Error, logging.EventType.FailReadFromDb, logging.LogMessage.FailReadFromDb, errorType = logging.ErrorType.FailReadFromDb)
        raise Exception(f"{logging.LogMessage.FailReadFromDb.value} ERROR: {e}")
    except:
        self.log.writeToLogs(self,processId,logging.LogType.Error, logging.EventType.FailReadFromDb, logging.LogMessage.FailReadFromDb, errorType = logging.ErrorType.FailReadFromDb)
        raise Exception(f"{logging.LogMessage.FailReadFromDb.value}")
 

Любые мысли очень приветствуются..

Комментарии:

1. что это за тип ProcessID и EntityStageID ?

2. Идентификатор процесса и EntityStageID являются целыми числами. Ты это имеешь в виду?

3. Я могу ошибаться, но проблема, похоже, в коде jdbcDF = (connectionProperties.spark.read

4. Привет @AlexOtt, я добавил полный маршрут стека сюда: gist.github.com/cpatte7372/f9a820e82c5e57befa919430b1b9af45

5. У меня такое чувство, что это что-то вопиюще простое в том, как я пытаюсь подключиться к SQLDB, но я не могу понять, в чем дело