#python #postgresql #apache-spark #pyspark
#python #postgresql #apache-spark #pyspark
Вопрос:
Для начала я видел несколько сообщений об этом, но мне не повезло ни с одним из исправлений.
В настоящее время у меня есть следующий код:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
if __name__ == '__main__':
scSpark = SparkSession.builder.appName("postgres")
.config("spark.driver.extraClassPath", "C:/Spark/spark-3.0.1-bin-hadoop2.7/jars/postgresql-42.2.18.jar")
.getOrCreate()
data_file = './data.csv'
sdfData = scSpark.read.csv(data_file, header=True, sep=',').cache()
sdfData.registerTempTable('sales')
scSpark = SparkSession.builder.appName("postgres")
.config("spark.driver.extraClassPath", "C:/Spark/spark-3.0.1-bin-hadoop2.7/jars/postgresql-42.2.18.jar")
.getOrCreate()
output = scSpark.sql('SELECT * from sales')
output.write.format('jdbc').options(url='jdbc:postgresql://localhost:5432/spark',driver='com.mysql.cj.jdbc.Driver',dbtable='city_info',user='postgres',password='password').mode('append').save()
При запуске этого кода я получаю следующую ошибку:
Traceback (most recent call last):
File "main.py", line 20, in <module>
output.write.format('jdbc').options(url='jdbc:postgresql://localhost:5432/spark',driver='com.mysql.cj.jdbc.Driver',dbtable='city_info',user='postgres',password='password').mode('append').save()
File "C:UsersjacktAppDataLocalProgramsPythonPython38-32libsite-packagespysparksqlreadwriter.py", line 825, in save
self._jwrite.save()
File "C:UsersjacktAppDataLocalProgramsPythonPython38-32libsite-packagespy4jjava_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "C:UsersjacktAppDataLocalProgramsPythonPython38-32libsite-packagespysparksqlutils.py", line 128, in deco
return f(*a, **kw)
File "C:UsersjacktAppDataLocalProgramsPythonPython38-32libsite-packagespy4jprotocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o49.save.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:99)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:99)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:99)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:194)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:198)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Я знаю, что ошибка есть java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
. Однако я вручную добавляю его в путь к классу, когда начинаю сеанс, и я убедился, что файл имеет полные разрешения, предоставленные группе пользователей.
Ответ №1:
Измените driver
параметр при сохранении фрейма данных в базу данных. Вам необходимо использовать org.postgresql.Driver
для Postgresql:
output.write.format('jdbc').options(
url='jdbc:postgresql://localhost:5432/spark',
driver='org.postgresql.Driver',
dbtable='city_info',
user='postgres',
password='password'
).mode('append').save()
Ответ №2:
Неправильный драйвер JDBC
com.mysql.cj.jdbc.Driver
для MySQL, а не для Postgres. Это два разных конкурирующих продукта для серверов баз данных.
Для доступа к Postgres у вас есть выбор поставщиков драйверов JDBC:
- Драйвер PostgreSQL JDBC из postgresql.org
- PGJDBC-NG из impossibl для JDBC 4.2 и более поздних версий.
- Коммерческие поставщики, такие как программное обеспечение OpenLink