#postgresql #scala #apache-spark #apache-spark-sql
#postgresql #scala #apache-spark #apache-spark-sql
Вопрос:
У меня есть допустимый запрос PostgreSQL: когда я копирую / вставляю его в PSQL, я получаю желаемый результат.
Но когда я запускаю Spark SQL, это приводит к NullPointerException
.
Вот фрагмент кода, вызывающий ошибку:
extractDataFrame().show()
private def extractDataFrame(): DataFrame = {
val query =
"""(
SELECT events.event_facebook_id, events.name, events.tariffrange,
eventscounts.attending_count, eventscounts.declined_count, eventscounts.interested_count,
eventscounts.noreply_count,
artists.facebookid as artist_facebook_id, artists.likes as artistlikes,
organizers.organizerid, organizers.likes as organizerlikes,
places.placeid, places.capacity, places.likes as placelikes
FROM events
LEFT JOIN eventscounts on eventscounts.event_facebook_id = events.event_facebook_id
LEFT JOIN eventsartists on eventsartists.event_id = events.event_facebook_id
LEFT JOIN artists on eventsartists.artistid = artists.facebookid
LEFT JOIN eventsorganizers on eventsorganizers.event_id = events.event_facebook_id
LEFT JOIN organizers on eventsorganizers.organizerurl = organizers.facebookurl
LEFT JOIN eventsplaces on eventsplaces.event_id = events.event_facebook_id
LEFT JOIN places on eventsplaces.placefacebookurl = places.facebookurl
) df"""
spark.sqlContext.read.jdbc(databaseURL, query, connectionProperties)
}
SparkSession определяется следующим образом:
val databaseURL = "jdbc:postgresql://dbHost:5432/ticketapp"
val spark = SparkSession
.builder
.master("local[*]")
.appName("tariffPrediction")
.getOrCreate()
val connectionProperties = new Properties
connectionProperties.put("user", "simon")
connectionProperties.put("password", "root")
И вот полная трассировка стека:
[SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 27, localhost): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:]
Самое удивительное, что если я удалю одно (какое угодно) из LEFT JOIN
предложений в SQL-запросе, я не получу никаких ошибок…
Ответ №1:
Вместо этого у меня очень похожие проблемы с источником данных Teradata, и это сводилось к тому, что обнуляемость столбца в DataFrame не соответствовала базовым данным (столбец имел значение nullable= false, но некоторые строки имели нулевые значения в этом конкретном поле). Причиной в моем случае был драйвер Teradata JDBC, не возвращающий правильные метаданные столбца. Мне еще предстоит найти обходной путь для этого.
Чтобы увидеть генерируемый код (в котором генерируется NPE):
- импортируйте org.apache.spark.sql.execution.отладка._
- вызов .debugCodegen() для набора данных / фрейма данных
Надеюсь, это поможет.
Ответ №2:
Эта проблема связана с драйвером Teradata JDBC. Эта проблема обсуждается на https://community.teradata.com/t5/Connectivity/Teradata-JDBC-Driver-returns-the-wrong-schema-column-nullability/m-p/76667/highlight/true#M3798.
Основная причина обсуждается на первой странице. И решение находится на третьей странице.
Люди из Teradata сказали, что исправили проблему в драйвере 16.10. * с параметром MAYBENULL, но я все еще вижу неопределенное поведение.
Вот аналогичное обсуждение https://issues.apache.org/jira/browse/SPARK-17195
Ответ №3:
Если кто-то еще все еще ищет решение, вы можете использовать NULLIF
для столбца, который вызывает проблемы, вызванные JOIN
получением значения null для столбца, который изначально not null
был в указанной схеме.
Связанный JIRA: https://issues.apache.org/jira/browse/SPARK-18859