Ошибка Spark JDBC при подключении к PostgreSQL

#python #postgresql #apache-spark #pyspark

#python #postgresql #apache-spark #pyspark

Вопрос:

Для начала я видел несколько сообщений об этом, но мне не повезло ни с одним из исправлений.

В настоящее время у меня есть следующий код:

 from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

if __name__ == '__main__':
    scSpark = SparkSession.builder.appName("postgres") 
        .config("spark.driver.extraClassPath", "C:/Spark/spark-3.0.1-bin-hadoop2.7/jars/postgresql-42.2.18.jar") 
        .getOrCreate()
    data_file = './data.csv'
    sdfData = scSpark.read.csv(data_file, header=True, sep=',').cache()
    sdfData.registerTempTable('sales')

    scSpark = SparkSession.builder.appName("postgres") 
        .config("spark.driver.extraClassPath", "C:/Spark/spark-3.0.1-bin-hadoop2.7/jars/postgresql-42.2.18.jar") 
        .getOrCreate()

    output = scSpark.sql('SELECT * from sales')
    output.write.format('jdbc').options(url='jdbc:postgresql://localhost:5432/spark',driver='com.mysql.cj.jdbc.Driver',dbtable='city_info',user='postgres',password='password').mode('append').save()
 

При запуске этого кода я получаю следующую ошибку:

 Traceback (most recent call last):
  File "main.py", line 20, in <module>
    output.write.format('jdbc').options(url='jdbc:postgresql://localhost:5432/spark',driver='com.mysql.cj.jdbc.Driver',dbtable='city_info',user='postgres',password='password').mode('append').save()
  File "C:UsersjacktAppDataLocalProgramsPythonPython38-32libsite-packagespysparksqlreadwriter.py", line 825, in save
    self._jwrite.save()
  File "C:UsersjacktAppDataLocalProgramsPythonPython38-32libsite-packagespy4jjava_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "C:UsersjacktAppDataLocalProgramsPythonPython38-32libsite-packagespysparksqlutils.py", line 128, in deco
    return f(*a, **kw)
  File "C:UsersjacktAppDataLocalProgramsPythonPython38-32libsite-packagespy4jprotocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o49.save.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:99)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:99)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:99)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:194)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:198)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
 

Я знаю, что ошибка есть java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver . Однако я вручную добавляю его в путь к классу, когда начинаю сеанс, и я убедился, что файл имеет полные разрешения, предоставленные группе пользователей.

Ответ №1:

Измените driver параметр при сохранении фрейма данных в базу данных. Вам необходимо использовать org.postgresql.Driver для Postgresql:

 output.write.format('jdbc').options(
   url='jdbc:postgresql://localhost:5432/spark',
   driver='org.postgresql.Driver',
   dbtable='city_info',
   user='postgres',
   password='password'
).mode('append').save()
 

Ответ №2:

Неправильный драйвер JDBC

com.mysql.cj.jdbc.Driver для MySQL, а не для Postgres. Это два разных конкурирующих продукта для серверов баз данных.

Для доступа к Postgres у вас есть выбор поставщиков драйверов JDBC:

  • Драйвер PostgreSQL JDBC из postgresql.org
  • PGJDBC-NG из impossibl для JDBC 4.2 и более поздних версий.
  • Коммерческие поставщики, такие как программное обеспечение OpenLink