Reading a sequence file written by Spark

#scala #hadoop #cascading #sequencefile #scalding

Question:

I have a bunch of sequence files that I want to read with Scalding, and I'm running into some problems. This is my code:

 class ReadSequenceFileApp(args: Args) extends ConfiguredJob(args) {
   SequenceFile(args("in"), ('_, 'wbytes))
     .read
     .mapTo[BytesWritable, Array[Byte]]('wbytes -> 'bytes)(_.copyBytes())
     .write(TextLine(args("out")))
 }
  

I get different stack traces when running it locally and on HDFS.

My local stack trace:

 14/07/02 12:46:27 INFO mapred.FileInputFormat: Total input paths to process : 1
14/07/02 12:46:27 INFO hadoop.TupleSerialization: using default comparator: com.twitter.scalding.IntegralComparator
14/07/02 12:46:27 ERROR stream.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.tuple.TupleException: unable to read from input identifier: 'unknown'
        at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:127)
        at cascading.flow.stream.SourceStage.map(SourceStage.java:76)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast to cascading.tuple.Tuple
        at cascading.scheme.hadoop.SequenceFile.source(SequenceFile.java:87)
        at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:140)
        at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:120)
        ... 7 more
14/07/02 12:46:27 ERROR stream.SourceStage: caught throwable
cascading.tuple.TupleException: unable to read from input identifier: 'unknown'
        at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:127)
        at cascading.flow.stream.SourceStage.map(SourceStage.java:76)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast to cascading.tuple.Tuple
        at cascading.scheme.hadoop.SequenceFile.source(SequenceFile.java:87)
        at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:140)
        at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:120)
        ... 7 more
14/07/02 12:46:27 INFO flow.Flow: [com.adb.infra.scal...] stopping all jobs
14/07/02 12:46:27 INFO flow.FlowStep: [com.adb.infra.scal...] stopping: local
14/07/02 12:46:27 INFO flow.Flow: [com.adb.infra.scal...] stopped all jobs
Exception in thread "main" cascading.flow.FlowException: local step failed
        at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:208)
        at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:145)
        at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:120)
        at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:42)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Caused by: cascading.tuple.TupleException: unable to read from input identifier: 'unknown'
        at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:127)
        at cascading.flow.stream.SourceStage.map(SourceStage.java:76)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
        ... 4 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast to cascading.tuple.Tuple
        at cascading.scheme.hadoop.SequenceFile.source(SequenceFile.java:87)
        at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:140)
        at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:120)
  

And when I run it on HDFS:

 Exception in thread "main" java.lang.Throwable: GUESS: Data is missing from the path you provied.
If you know what exactly caused this error, please consider contributing to GitHub via following link.
https://github.com/twitter/scalding/wiki/Common-Exceptions-and-possible-reasons#comtwitterscaldinginvalidsourceexception
        at com.twitter.scalding.Tool$.main(Tool.scala:147)
        at com.twitter.scalding.Tool.main(Tool.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Caused by: com.twitter.scalding.InvalidSourceException: [SequenceFile(/home/p/dataFromHdfs/logs,'_', 'bytes')] Data is missing from one or more paths in: List(/home/p/dataFromHdfs/logs)
        at com.twitter.scalding.FileSource.validateTaps(FileSource.scala:102)
        at com.twitter.scalding.Job$$anonfun$validateSources$1.apply(Job.scala:158)
        at com.twitter.scalding.Job$$anonfun$validateSources$1.apply(Job.scala:153)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at com.twitter.scalding.Job.validateSources(Job.scala:153)
        at com.twitter.scalding.Job.buildFlow(Job.scala:91)
        at com.twitter.scalding.Job.run(Job.scala:126)
        at com.twitter.scalding.Tool.start$1(Tool.scala:109)
        at com.twitter.scalding.Tool.run(Tool.scala:125)
        at com.twitter.scalding.Tool.run(Tool.scala:72)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at com.twitter.scalding.Tool$.main(Tool.scala:133)
  

I've heard that there can be problems reading sequence files that were created outside of Cascading with Scalding. I don't fully understand why, and I'm not sure how to solve it.
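For context (an editor's hedged sketch, not part of the original post): Cascading's `SequenceFile` scheme expects the sequence file's values to be serialized `cascading.tuple.Tuple` objects, whereas files written by Spark or plain Hadoop typically contain raw Writable key/value pairs — here apparently a `NullWritable` key, which explains the `ClassCastException` in the local trace. Scalding's `WritableSequenceFile` source reads raw Writable pairs instead; assuming `ConfiguredJob` is the asker's own base class, something along these lines might work:

```scala
import com.twitter.scalding._
import org.apache.hadoop.io.{BytesWritable, NullWritable}

class ReadRawSequenceFileApp(args: Args) extends ConfiguredJob(args) {
  // WritableSequenceFile deserializes plain Hadoop (key, value) Writable
  // pairs, unlike SequenceFile, which expects Cascading Tuples as values.
  WritableSequenceFile[NullWritable, BytesWritable](args("in"), ('key, 'wbytes))
    .read
    .mapTo[BytesWritable, Array[Byte]]('wbytes -> 'bytes)(_.copyBytes())
    .write(TextLine(args("out")))
}
```

The key and value type parameters must match what the writer actually used; if Spark wrote different Writable types, the parameters need to change accordingly.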

I would appreciate an explanation and a solution for this.

Comments:

1. You declare the bytes as 'bytes, but then try to use them as 'wbytes. That will cause problems, although I don't think it will fix everything 🙁

2. I made some edits when posting the code here, and that was a typo…