#scala #apache-spark #for-loop
#scala #apache-spark #цикл for
Вопрос:
У меня есть 2 фрейма данных с именами df1
и df2
, где они оба имеют одинаковые имена столбцов. Я хочу запустить цикл for по уникальным датам, начиная с df1
и применить тот же фильтр даты к df2
. Я создал список уникальных дат, а затем попытался выполнить итерацию по нему. Однако то, что у меня есть, — это выдача ошибок.
Вот что у меня есть:
val unique_weeks = df1.select(df1("date")).distinct
for( week <- unique_weeks) {
val df1_filtered = df1.filter($"date" === week)
val df2_filtered = df2.filter($"date" === week)
/// will run a join here and more code
}
Я думаю, <-
что эта часть может быть неправильной — но не уверен, как я могу фильтровать фреймы данных, используя другой метод.
Вот ошибка:
[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 3.0 failed 1 times, most recent failure: Lost task 35.0 in stage 3.0 (TID 399, localhost, executor driver): java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
[error] at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
[error] at org.apache.spark.sql.functions$.lit(functions.scala:101)
[error] at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
[error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
[error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
[error] at scala.collection.Iterator$class.foreach(Iterator.scala:893)
[error] at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[error] at org.apache.spark.scheduler.Task.run(Task.scala:99)
[error] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
[error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error] at java.lang.Thread.run(Thread.java:748)
[error]
[error] Driver stacktrace:
[error] org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 3.0 failed 1 times, most recent failure: Lost task 35.0 in stage 3.0 (TID 399, localhost, executor driver): java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
[error] at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
[error] at org.apache.spark.sql.functions$.lit(functions.scala:101)
[error] at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
[error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
[error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
[error] at scala.collection.Iterator$class.foreach(Iterator.scala:893)
[error] at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[error] at org.apache.spark.scheduler.Task.run(Task.scala:99)
[error] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
[error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error] at java.lang.Thread.run(Thread.java:748)
[error]
[error] Driver stacktrace:
[error] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
[error] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[error] at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[error] at scala.Option.foreach(Option.scala:257)
[error] at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
[error] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
[error] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
[error] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
[error] at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[error] at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
[error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
[error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
[error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
[error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)
[error] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[error] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[error] at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
[error] at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)
[error] at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2286)
[error] at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2286)
[error] at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2286)
[error] at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
[error] at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
[error] at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2285)
[error] at spark_pkg.SparkMain$.main(SparkMain.scala:878)
[error] at spark_pkg.SparkMain.main(SparkMain.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
[error] at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
[error] at org.apache.spark.sql.functions$.lit(functions.scala:101)
[error] at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
[error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
[error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
[error] at scala.collection.Iterator$class.foreach(Iterator.scala:893)
[error] at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[error] at org.apache.spark.scheduler.Task.run(Task.scala:99)
[error] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
[error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error] at java.lang.Thread.run(Thread.java:748)
[error] stack trace is suppressed; run 'last Compile / bgRun' for the full output
[error] Nonzero exit code: 1
[error] (Compile / run) Nonzero exit code: 1
[error] Total time: 137 s (02:17), completed Aug 20, 2020 1:16:02 PM
Комментарии:
1. Какие исключения генерируются?
2. я не запускал полную программу с момента ее создания, но моя IDE выдает мне красную строку под
<-
3. Немного вводит в заблуждение утверждение, что он выдает ошибки, которые можно разумно интерпретировать как возникающие во время выполнения.
4. В любом случае, что, по словам IDE, означает ошибка под красной строкой?
5. @LeviRamsey я обновил вопрос, чтобы включить ошибку во время выполнения
Ответ №1:
Фрейм данных не является итератором, и, следовательно, вы не можете запускать цикл for над ним. Вы можете запустить что-то вроде этого, но я не думаю, что это сделает то, чего вы надеетесь достичь, основываясь на вашем другом коде.
unique_weeks.foreachPartition{ weeks : Iterator[YourData] =>
for( week <- weeks) {
}
}
Ваш вопрос предполагает, что ваша ментальная модель того, что такое фрейм данных и как работает Spark, не совсем завершена. Думайте о фрейме данных скорее как о List[List[YourData]]
, за исключением того, что каждый внутренний фрейм данных List[YourData]
расположен на независимой части компьютера и не обязательно может знать или взаимодействовать с любым из других List
, пока вы не соберете их обратно в драйвер.
Комментарии:
1. Scaladoc в Spark 3.0 указывает, что фреймы данных имеют
foreach
метод.