Inserting data from a DataFrame into an RDD

#scala #apache-spark

Question:

I currently have a DataFrame holding the results of a SQL query. The DataFrame contains the columns patientID, date, and code.

 val res1 = sqlContext.sql("select encounter.Member_ID AS patientID, encounter.Encounter_DateTime AS date, diag.code from encounter join diag on encounter.Encounter_ID = diag.Encounter_ID")
  

I am trying to take this DataFrame and turn it into an RDD[Diagnostic], where Diagnostic is a case class of the form:

 case class Diagnostic(patientID:String, date: Date, code: String)
  

Is this possible? My current attempt throws a scala.MatchError from the line below.

 val diagnostic: RDD[Diagnostic] = res1.map {
  case Row(patientID:String, date:java.util.Date, code:String) => Diagnostic(patientID=patientID, date=date, code=code)
}
  

Schema:

 root
 |-- patientID: string (nullable = true)
 |-- date: string (nullable = true)
 |-- code: string (nullable = true)
  

The error message from res1.as[Diagnostic]:

 Main.scala:170: overloaded method value as with alternatives:
[error]   (alias: Symbol)org.apache.spark.sql.DataFrame <and>
[error]   (alias: String)org.apache.spark.sql.DataFrame
[error]  does not take type parameters
[error]     val testlol: RDD[Diagnostic] = res1.as[Diagnostic]
[error]                                         ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Oct 9, 2016 3:16:38 PM
  

The full error message:

 [Stage 4:=======================================>                   (2 + 1) / 3]16/10/09 14:23:32 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 8)
scala.MatchError: [000961291-01,2005-06-21T19:45:00Z,584.9] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
        at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168)
        at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
        at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/10/09 14:23:32 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 8, localhost): scala.MatchError: [000961291-01,2005-06-21T19:45:00Z,584.9] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
        at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168)
        at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
        at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

16/10/09 14:23:32 ERROR TaskSetManager: Task 0 in stage 6.0 failed 1 times; aborting job
[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 8, localhost): scala.MatchError: [000961291-01,2005-06-21T19:45:00Z,584.9] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
[error]         at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168)
[error]         at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168)
[error]         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
[error]         at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
[error]         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
[error]         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
[error]         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
[error]         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
[error]         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
[error]         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
[error]         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
[error]         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
[error]         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
[error]         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
[error]         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
[error]         at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
[error]         at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
[error]         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
[error]         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
[error]         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
[error]         at org.apache.spark.scheduler.Task.run(Task.scala:64)
[error]         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
[error]         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[error]         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[error]         at java.lang.Thread.run(Thread.java:745)
[error]
[error] Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 8, localhost): scala.MatchError: [000961291-01,2005-06-21T19:45:00Z,584.9] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
        at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168)
        at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
        at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[trace] Stack trace suppressed: run last compile:run for the full output.
16/10/09 14:23:32 ERROR ContextCleaner: Error in cleaning thread
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:146)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:144)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:144)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:143)
        at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65)
16/10/09 14:23:32 ERROR Utils: Uncaught exception in thread SparkListenerBus
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:996)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
        at java.util.concurrent.Semaphore.acquire(Semaphore.java:317)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:62)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
java.lang.RuntimeException: Nonzero exit code: 1
        at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 13 s, completed Oct 9, 2016 2:23:32 PM
  

Comments:

1. Can you show the whole error message? It should show the actual types, which probably do not match the ones you expect.

2. Added the full error message above.

Answer #1:

java.util.Date is not a data type that can be stored in a DataFrame. Judging by the schema and the MatchError output, date is a timestamp stored as a String. If that is the case, the case class should be defined as:

 case class Diagnostic(patientID: String, date: java.sql.Timestamp, code: String)
  

and you should replace the pattern:

 case Row(patientID: String, date: java.util.Date, code: String)
  

with:

 case Row(patientID: String, date: java.sql.Timestamp, code: String)
  

and cast date to a timestamp:

 res1.select($"patientID", $"date".cast("timestamp"), $"code")
  
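
As a quick sanity check, printSchema on the casted projection should now report date as timestamp rather than string (a hedged expectation on my part; the exact generated column name can vary across Spark versions):

 res1.select($"patientID", $"date".cast("timestamp"), $"code").printSchema()
  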

Finally, you should use the rdd method before mapping, for forward compatibility:

 res1.select($"patientID", $"date".cast("timestamp"), $"code").rdd.map {
  ...
}
  
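
Putting the pieces together, here is a minimal end-to-end sketch, assuming the res1 DataFrame and sqlContext from the question (the implicits import enables the $"..." column syntax):

 import java.sql.Timestamp
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import sqlContext.implicits._

 val diagnostic: RDD[Diagnostic] = res1
   .select($"patientID", $"date".cast("timestamp"), $"code")
   .rdd
   .map {
     // After the cast, the date field arrives as java.sql.Timestamp.
     // Note: this pattern assumes non-null fields; since the schema marks
     // the columns nullable, null values would still raise a MatchError.
     case Row(patientID: String, date: Timestamp, code: String) =>
       Diagnostic(patientID = patientID, date = date, code = code)
   }
  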

In general, though, I would recommend using the as method:

 res1.as[Diagnostic].rdd
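
Note that as[Diagnostic] returns a Dataset and requires Spark 1.6 or later with an implicit Encoder in scope; the "does not take type parameters" compile error above suggests an older Spark version, where as only accepts an alias. On Spark 1.6+, a hedged sketch would be (the alias call keeps the column name stable so it matches the case class field):

 import sqlContext.implicits._ // provides the implicit Encoder for the case class

 val diagnostic = res1
   .select($"patientID", $"date".cast("timestamp").alias("date"), $"code")
   .as[Diagnostic]
   .rdd
  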