#python #apache-spark #pyspark #databricks
Вопрос:
Когда я пытаюсь выполнить следующую функцию, я получаю ошибку в Apache Spark / Databricks:
com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near amp;#39;)amp;#39;.
Код таков:
Ancestor = ancestor.getAncestorPath(conn,log,processId, entityStageId)
dataPath = Ancestor["ancestorPath"]
#Unpack the config specifications
import json
if len(Ancestor["dfConfig"]) > 0:
dfConfig = json.loads(Ancestor["dfConfig"])
#Display the Ancestor path for debugging
print(f"Ancestor:{Ancestor}")
Похоже, проблема в подключении к SQLDB.
Фактическая функция заключается в следующем:
def getAncestorPath(connectionInst: conn.connect, log: logs.Logging, processId, entityStageId = "-1"):
log.writeToLogs(processId,logs.LogType.Info, logs.EventType.GetAncestorPath, logs.LogMessage.GetAncestorPath)
pathQuery = f"SELECT * FROM Config.GetAncestorSlicePath WHERE (ProcessID = {processId}) OR (ProcessID IS NULL AND EntityStageID = {entityStageId})"
AncestorPath = ""
dfConfig = ""
entityPath = connectionInst.readFromDb(processId,pathQuery)
ancestorPathToList = entityPath.limit(1).rdd.collect()
if len(ancestorPathToList) is 0:
log.writeToLogs(processId,logs.LogType.Error, logs.EventType.NoAncestorPath, logs.LogMessage.NoAncestorPath, errorType = logs.ErrorType.NoAncestorPath)
log.writeToLogs(processId,logs.LogType.Error, logs.EventType.FailGetAncestorPath, logs.LogMessage.FailGetAncestorPath, errorType = logs.ErrorType.FailGetAncestorPath)
raise ValueError(logs.LogMessage.NoAncestorPath.value)
for row in ancestorPathToList:
AncestorPath = f"/mnt/lake/{row.LakePath}"
dfConfig = row.Config
log.writeToLogs(processId,logs.LogType.Info, logs.EventType.SuccessGetAncestorPath, logs.LogMessage.SuccessGetAncestorPath)
return {"ancestorPath":AncestorPath,"dfConfig":dfConfig}
Трассировка ошибок выглядит следующим образом:
Py4JJavaError Traceback (most recent call last)
/databricks/python/lib/python3.8/site-packages/hydr8/utils/connection.py in readFromDb(self, processId, query)
22 try:
---> 23 jdbcDF = (self.connectionProperties.spark.read
24 .format("jdbc")
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
209 else:
--> 210 return self._df(self._jreader.load())
211
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
109 try:
--> 110 return f(*a, **kw)
111 except py4j.protocol.Py4JJavaError as e:
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.n".
Как уже упоминалось, проблема возникает при попытке подключения к базе данных SQL. Функция, используемая для подключения к базе данных, выглядит следующим образом:
подключение для чтения jdbc к базе данных SQL
class connect:
def __init__(self, conprops: conn.ConnectionProperties, logs):
self.connectionProperties = conprops
self.log = logs
def readFromDb(self,processId, query):
try:
jdbcDF = (self.connectionProperties.spark.read
.format("jdbc")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", f"jdbc:sqlserver://{self.connectionProperties.dbConnectionProperties.DBServer}.database.windows.net;database={self.connectionProperties.dbConnectionProperties.DBDatabase}")
.option("user", self.connectionProperties.dbConnectionProperties.DBUser)
.option("query", query)
.option("password", self.connectionProperties.dbConnectionProperties.DBPword)
.load()
)
return jdbcDF
except Exception as e:
self.log.writeToLogs(self,processId,logging.LogType.Error, logging.EventType.FailReadFromDb, logging.LogMessage.FailReadFromDb, errorType = logging.ErrorType.FailReadFromDb)
raise Exception(f"{logging.LogMessage.FailReadFromDb.value} ERROR: {e}")
except:
self.log.writeToLogs(self,processId,logging.LogType.Error, logging.EventType.FailReadFromDb, logging.LogMessage.FailReadFromDb, errorType = logging.ErrorType.FailReadFromDb)
raise Exception(f"{logging.LogMessage.FailReadFromDb.value}")
Любые мысли очень приветствуются..
Комментарии:
1. что это за тип
ProcessID
иEntityStageID
?2. Идентификатор процесса и EntityStageID являются целыми числами. Ты это имеешь в виду?
3. Я могу ошибаться, но проблема, похоже, в коде
jdbcDF = (connectionProperties.spark.read
4. Привет @AlexOtt, я добавил полный маршрут стека сюда: gist.github.com/cpatte7372/f9a820e82c5e57befa919430b1b9af45
5. У меня такое чувство, что это что-то вопиюще простое в том, как я пытаюсь подключиться к SQLDB, но я не могу понять, в чем дело