Spark SQL 2.0: исключение NullPointerException с допустимым запросом PostgreSQL

#postgresql #scala #apache-spark #apache-spark-sql

#postgresql #scala #apache-spark #apache-spark-sql

Вопрос:

У меня есть допустимый запрос PostgreSQL: когда я копирую / вставляю его в PSQL, я получаю желаемый результат.
Но когда я запускаю Spark SQL, это приводит к NullPointerException .

Вот фрагмент кода, вызывающий ошибку:

 extractDataFrame().show()

private def extractDataFrame(): DataFrame = {
  val query =
    """(
      SELECT events.event_facebook_id, events.name, events.tariffrange,
        eventscounts.attending_count, eventscounts.declined_count, eventscounts.interested_count,
        eventscounts.noreply_count,
        artists.facebookid as artist_facebook_id, artists.likes as artistlikes,
        organizers.organizerid, organizers.likes as organizerlikes,
        places.placeid, places.capacity, places.likes as placelikes
      FROM events
        LEFT JOIN eventscounts on eventscounts.event_facebook_id = events.event_facebook_id
        LEFT JOIN eventsartists on eventsartists.event_id = events.event_facebook_id
          LEFT JOIN artists on eventsartists.artistid = artists.facebookid
        LEFT JOIN eventsorganizers on eventsorganizers.event_id = events.event_facebook_id
          LEFT JOIN organizers on eventsorganizers.organizerurl = organizers.facebookurl
        LEFT JOIN eventsplaces on eventsplaces.event_id = events.event_facebook_id
          LEFT JOIN places on eventsplaces.placefacebookurl = places.facebookurl
      ) df"""

  spark.sqlContext.read.jdbc(databaseURL, query, connectionProperties)
}
  

SparkSession определяется следующим образом:

 val databaseURL = "jdbc:postgresql://dbHost:5432/ticketapp" 
val spark = SparkSession
  .builder
  .master("local[*]")
  .appName("tariffPrediction")
  .getOrCreate()

val connectionProperties = new Properties
connectionProperties.put("user", "simon")
connectionProperties.put("password", "root")
  

И вот полная трассировка стека:

 [SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 27, localhost): java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:]
  

Самое удивительное, что если я удалю одно (какое угодно) из LEFT JOIN предложений в SQL-запросе, я не получу никаких ошибок…

Ответ №1:

Вместо этого у меня очень похожие проблемы с источником данных Teradata, и это сводилось к тому, что обнуляемость столбца в DataFrame не соответствовала базовым данным (столбец имел значение nullable= false, но некоторые строки имели нулевые значения в этом конкретном поле). Причиной в моем случае был драйвер Teradata JDBC, не возвращающий правильные метаданные столбца. Мне еще предстоит найти обходной путь для этого.

Чтобы увидеть генерируемый код (в котором генерируется NPE):

  • импортируйте org.apache.spark.sql.execution.отладка._
  • вызов .debugCodegen() для набора данных / фрейма данных

Надеюсь, это поможет.

Ответ №2:

Эта проблема связана с драйвером Teradata JDBC. Эта проблема обсуждается на https://community.teradata.com/t5/Connectivity/Teradata-JDBC-Driver-returns-the-wrong-schema-column-nullability/m-p/76667/highlight/true#M3798.

Основная причина обсуждается на первой странице. И решение находится на третьей странице.

Люди из Teradata сказали, что исправили проблему в драйвере 16.10. * с параметром MAYBENULL, но я все еще вижу неопределенное поведение.

Вот аналогичное обсуждение https://issues.apache.org/jira/browse/SPARK-17195

Ответ №3:

Если кто-то еще все еще ищет решение, вы можете использовать NULLIF для столбца, который вызывает проблемы, вызванные JOIN получением значения null для столбца, который изначально not null был в указанной схеме.

Связанный JIRA: https://issues.apache.org/jira/browse/SPARK-18859