Запуск RDD из файла последовательности и десериализация

#apache-spark #deserialization #apache-spark-sql #spark-dataframe

#apache-spark #десериализация #apache-spark-sql

Вопрос:

Я использую Spark 1.5.0 и хочу прочитать файл sqeuence — ключ — это имя файла (ext), а значение — это список объектов Java типа Myclass

Вот мой код для выполнения этого без Spark.

 val ois = new ObjectInputStream(new FileInputStream("/path/to/seqfile"))
val data = ois.readObject.asInstanceOf[java.util.List[MyClass]]
val scalalist = data.asScala
  

Я хочу использовать Spark для выполнения того же самого, однако я не уверен, когда сериализованные данные доступны в string, как мне создать RDD, где 2-й элемент кортежа приводится к списку объектов MyClass.

 val seq_rdd = sc.sequenceFile("/path/to/seqfile", classOf[Text], classOf[BytesWritable])
val seq_formatted_rdd = seq_rdd.map { case (text, bytes) => (text.toString, bytes.copyBytes) }
val my_rdd = seq_formatted_rdd.map { case (text, ser_bytes) => (text, new ByteArrayInputStream(ser_bytes)) }
  

Я получаю следующее исключение, потому что ByteArrayInputStream не реализует Serializable:

  • объект не сериализуемый (класс: java.io.ByteArrayInputStream, значение: java.io.ByteArrayInputStream@73d5a077 )

После этого я хочу сделать следующее:

 val my_rdd1 = my_rdd.map { case (text, bytestream) => (text, new ObjectInputStream(bytestream).readObject.asInstanceOf[java.util.List[MyClass]])}
  

Комментарии:

1. Вам нужно прочитать только один файл? Если это так, вы можете просто прочитать его, как вы это сделали, а затем использовать SparkContext#parallelize для создания RDD. Во всех этих отображениях нет смысла, если только вам не нужно прочитать несколько файлов.

2. Последовательность содержит много (несколько тысяч) сериализованных файлов в виде значений. Каждое из этих значений имеет тип java.util. Список [MyClass].