Ошибка при выполнении внутреннего соединения в Spark 2.0.1 DataFrame

#scala #apache-spark #spark-dataframe

#scala #apache-spark #apache-spark-sql

Вопрос:

Кто-нибудь еще сталкивался с этой проблемой и имеет идеи о том, как ее обойти?

Я пытался обновить свой код, чтобы использовать Spark 2.0.1 и Scala 2.11. В Spark 1.6.0 с Scala 2.10 все работало нормально. У меня есть прямое внутреннее соединение dataframe с dataframe, которое возвращает ошибку. Данные поступают из AWS RDS Aurora. Обратите внимание, что приведенный ниже фрейм данных foo на самом деле состоит из 92 столбцов, а не из двух, которые я показал. Проблема по-прежнему сохраняется, даже если имеется только два столбца.

Соответствующая информация:

DataFrame 1 со схемой

 foo.show()

 -------------------- ------ 
|      Transaction ID|   BIN|
 -------------------- ------ 
|               bbBW0|134769|
|               CyX50|173622|
 -------------------- ------ 

println(foo.printSchema())

root
|-- Transaction ID: string (nullable = true)
|-- BIN: string (nullable = true)
  

DataFrame 2 со схемой

 bar.show()

 -------------------- ----------------- ------------------- 
|              TranId|       Amount_USD|     Currency_Alpha|
 -------------------- ----------------- ------------------- 
|               bbBW0|            10.99|                USD|
|               CyX50|           438.53|                USD|
 -------------------- ----------------- ------------------- 

println(bar.printSchema())

root
|-- TranId: string (nullable = true)
|-- Amount_USD: string (nullable = true)
|-- Currency_Alpha: string (nullable = true)
  

Объединение фреймов данных с explain

 val asdf = foo.join(bar, foo("Transaction ID") === bar("TranId"))
println(foo.join(bar, foo("Transaction ID") === bar("TranId")).explain())

== Physical Plan ==
*BroadcastHashJoin [Transaction ID#0], [TranId#202], Inner, BuildRight
:- *Scan JDBCRelation((SELECT

        ...
        I REMOVED A BUNCH OF LINES FROM THIS PRINT OUT
        ...

      ) as x) [Transaction ID#0,BIN#8] PushedFilters: [IsNotNull(Transaction ID)], ReadSchema: struct<Transaction ID:string,BIN:string>
 - BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
    - *Filter isnotnull(TranId#202)
       - InMemoryTableScan [TranId#202, Amount_USD#203, Currency_Alpha#204], [isnotnull(TranId#202)]
         :   - InMemoryRelation [TranId#202, Amount_USD#203, Currency_Alpha#204], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         :     :   - Scan ExistingRDD[TranId#202,Amount_USD#203,Currency_Alpha#204]
  

Теперь ошибка, которую я получаю, заключается в следующем:

 16/10/18 11:36:50 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 6)
java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'ID IS NOT NULL)' at line 54
  

Полный стек можно увидеть здесь (http://pastebin.com/C9bg2HFt )

Нигде в моем коде или в моем запросе jdbc, извлекающем данные из базы данных, у меня нет ID IS NOT NULL) . Я потратил кучу времени на поиск в Google и нашел коммит для Spark, который добавляет фильтры null в план запроса для соединений. Вот фиксация (https://git1-us-west.apache.org/repos/asf?p=spark.git ;a=фиксация;h=ef770031)

Ответ №1:

Любопытно, пробовали ли вы следующее;

 val dfRenamed = bar.withColumnRenamed("TranId", " Transaction ID")
val newDF = foo.join(dfRenamed, Seq("Transaction ID"), "inner")