SPARK: как динамически генерировать путь к файлу s3 с использованием date diff

#scala #apache-spark #amazon-s3 #filesystems #jodatime

#scala #apache-spark #amazon-s3 #файловые системы #jodatime

Вопрос:

Я пытаюсь получить список файлов между начальной и конечной датой и прочитать файлы из этих папок :

Например, моя файловая структура выглядит следующим образом: имя_заголовка / год / месяц / день / файлы

    s3://testBucket/2016/10/16/part00000
 

Все эти файлы являются json. Проблема в том, что мне нужно загрузить все пути между начальной датой и конечной датой :

С днем начала (16.10.2016) и датой окончания (16.09.2016) я хотел бы прочитать с 16.09.2016 (включительно) …. до …. 16.10.2016 (включительно)

     import org.joda.time.Days
    import org.joda.time.DurationFieldType
    import org.joda.time.LocalDate
    import org.joda.time.format.DateTimeFormat
    import org.joda.time.format.DateTimeFormatter

    val s3Bucket: String = "S3://myTestBucket/"

    val startTimestamp: String = "2016-09-16T00:00:00Z"
    val endTimestamp: String = "2016-10-16T00:00:00Z"

    val dtf: DateTimeFormatter = DateTimeFormat.forPattern( "yyyy-MMM-dd" )
    val startDate: LocalDate = dtf.parseLocalDate( startTimestamp )

    val endDate: LocalDate = dtf.parseLocalDate( endTimestamp )


    val days: Int = Days.daysBetween( startDate, endDate ).getDays

    System.out.print( days )

    val dates = new ListBuffer[String]()
    var i: Int = 0
    while (i < days) {
      {
        val d: LocalDate = startDate.withFieldAdded( DurationFieldType.days, i )
        val tempDate: String = s3Bucket   d.getYear   "/"   d.getMonthOfYear   "/"   d.getDayOfMonth   "/"   "*"
        dates  = tempDate
      }
      {
        i  = 1; 
      }
    }
    val dateList = dates.toList
    val files = dateList.mkString(", ")
    sqlContext.read.json(files)
 

Правильно ли это сделать ? Есть ли какой-либо другой эффективный способ сделать это?

основываясь на приведенном решении, я получаю эту ошибку :

 org.apache.spark.SparkException: Job 2 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581)
    at org.apache.spark.SparkContext$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1731)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1730)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:147)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$anonfun$collect$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at org.apache.spark.sql.sources.HadoopFsRelation$.listLeafFilesInParallel(interfaces.scala:904)
    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:445)
    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477)
    at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$fileStatusCache$lzycompute(interfaces.scala:489)
    at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$fileStatusCache(interfaces.scala:487)
    at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494)
    at org.apache.spark.sql.execution.datasources.json.JSONRelation$anonfun$4.apply(JSONRelation.scala:110)
    at org.apache.spark.sql.execution.datasources.json.JSONRelation$anonfun$4.apply(JSONRelation.scala:109)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.execution.datasources.json.JSONRelation.dataSchema$lzycompute(JSONRelation.scala:109)
    at org.apache.spark.sql.execution.datasources.json.JSONRelation.dataSchema(JSONRelation.scala:108)
    at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)
    at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
    at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:37)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:136)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:263)
    at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:58)
    at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:63)
    at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:65)
    at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:67)
    at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:69)
    at $iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:71)
    at $iwC$iwC$iwC$iwC$iwC.<init>(<console>:73)
    at $iwC$iwC$iwC$iwC.<init>(<console>:75)
    at $iwC$iwC$iwC.<init>(<console>:77)
    at $iwC$iwC.<init>(<console>:79)
    at $iwC.<init>(<console>:81)
    at <init>(<console>:83)
    at .<init>(<console>:87)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:664)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:629)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:622)
    at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
    at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
 

Ответ №1:

Я не думаю, что это может быть намного эффективнее, но это определенно не идиоматично (с использованием while и var ) и может быть сделано короче и лаконичнее:

 val s3Bucket: String = "S3://myTestBucket/"
val startDate: LocalDate = new LocalDate(2016, 9, 16)
val endDate: LocalDate = new LocalDate(2016, 10, 16)

val days: Int = Days.daysBetween(startDate, endDate).getDays

val pathDTF = DateTimeFormat.forPattern("yyyy/MM/dd")

val files: Seq[String] = (0 to days)
  .map(startDate.plusDays)
  .map(d => s"$s3Bucket${pathDTF.print(d)}/*")

val result = sqlContext.read.json(files: _*) 
 

РЕДАКТИРОВАТЬ: спасибо @Newbie за замечание — действительно, нельзя передать список файлов read.json(...) , поэтому последняя строка должна быть:

 val result = sqlContext.read.json(sc.textFile(files.mkString(",")))
 

Комментарии:

1. Нужно ли мне добавлять сюда явную проверку файла, или spark позаботится об этом в случае, если в ключе s3 нет ни одного файла, который я хотел бы перейти к следующей папке.

2. Мой контекст spark закрывается. Это из-за проверки файла?

3. Новичок, я не уверен, является ли это исключение основной причиной или симптомом. Есть ли другая трассировка стека?

4. @rado 16/10/16 23:27:57 INFO YarnAllocator: Завершенный контейнер container_1476653670979_0003_02_000077 на хосте: ip-172-31-15-9 .ec2.internal (состояние: ЗАВЕРШЕНО, статус выхода: 1) 16/10/16 23:27:57 ПРЕДУПРЕДИТЬ YarnAllocator: Контейнер, помеченный как сбойный: container_1476653670979_0003_02_000077 на хосте: ip-172-31-15-9.ec2.internal. Статус выхода: 1. Диагностика: Исключение из запуска контейнера. Идентификатор контейнера: container_1476653670979_0003_02_000077 Код выхода: 1 Это то, что я вижу в журналах контейнеров

5. @Tzach: кажется, что разделение каталогов запятыми работает только с sc.textFile(), а не с SQLContext.read.json()