Scala: как получить содержимое экземпляра PortableDataStream из RDD

#scala #apache-spark #rdd

#scala #apache-spark #rdd

Вопрос:

Поскольку я хочу извлечь данные из двоичных файлов, я читаю файлы, используя val dataRDD = sc.binaryRecord("Path") я получаю результат в виде org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)]

Я хочу извлечь содержимое моих файлов, которое находится в виде PortableDataStream

Для этого я пытался: val data = dataRDD.map(x => x._2.open()).collect() но я получаю следующую ошибку:

исключение java.io.NotSerializableException:org.apache.hadoop.hdfs.client.HdfsDataInputStream

Если у вас есть идея, как я могу решить свою проблему, пожалуйста, ПОМОГИТЕ!

Заранее большое спасибо.

Ответ №1:

На самом деле, PortableDataStream является сериализуемым. Вот для чего это предназначено. Тем не менее, open() возвращает простой DataInputStream ( HdfsDataInputStream в вашем случае, потому что ваш файл находится в HDFS), который не сериализуем, отсюда и ошибка, которую вы получаете.

Фактически, когда вы открываете PortableDataStream , вам просто нужно сразу прочитать данные. В scala вы можете использовать scala.io.Source.fromInputStream :

 val data : RDD[Array[String]] = sc
    .binaryFiles("path/.../")
    .map{ case (fileName, pds) => {
        scala.io.Source.fromInputStream(pds.open())
            .getLines().toArray
    }}
  

Этот код предполагает, что данные являются текстовыми. Если это не так, вы можете адаптировать его для чтения любого вида двоичных данных. Вот пример создания последовательности байтов, которую вы могли бы обрабатывать так, как хотите.

 val rdd : RDD[Seq[Byte]] = sc.binaryFiles("...")
    .map{ case (file, pds) => {
        val dis = pds.open()
        val bytes = Array.ofDim[Byte](1024)
        val all = scala.collection.mutable.ArrayBuffer[Byte]()
        while( dis.read(bytes) != -1) {
            all   = bytes
        }
        all.toSeq
    }}
  

Смотрите javadoc DataInputStream для получения дополнительных возможностей. Например, он обладает readLong , readDouble (и так далее) методами.

Комментарии:

1. Спасибо за ваш ответ. Я попробовал ваше решение, там отсутствует ‘}’, а также я должен отредактировать его .toSeq() на .toSeq , иначе это выдает меня error: not enough arguments for method apply: (idx: Int)String in trait SeqLike . Хотя решение дает ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job

2. Извините, я не протестировал весь код. Я исправил свой ответ.

3. На самом деле это не работает с toSeq , вероятно, потому, что он просто инкапсулирует итератор на основе InputStream. toArray напротив, фактически извлекает данные, чтобы поместить их в массив. Я изменил его в ответе. Спасибо за ваш отзыв 😉

4. Спасибо @Oli за ваш ответ. Проблема все еще существует java.nio.charset.MalformedInputException: Input length = 1

5. ХОРОШО, это другое. Какие данные вы читаете? Это был просто пример для строк. Если вы читаете двоичные данные, это не сработает, и вам придется адаптировать код…

Ответ №2:

 val bf    = sc.binaryFiles("...")
val bytes = bf.map{ case(file, pds) => {
    val dis = pds.open()
    val len = dis.available();
    val buf = Array.ofDim[Byte](len)
    pds.open().readFully(buf)
    buf
}}
bytes: org.apache.spark.rdd.RDD[Array[Byte]] = MapPartitionsRDD[21] at map at <console>:26

scala> bytes.take(1)(0).size
res15: Int = 5879609  // this happened to be the size of my first binary file