Как преобразовать файл Parquet в Protobuf и сохранить его в формате HDFS / AWS S3

#scala #apache-spark #protocol-buffers #parquet

#scala #apache-spark #протокол-буферы #parquet

Вопрос:

У меня есть файл в формате Parquet. Я хочу прочитать его и сохранить в формате HDFS или AWS S3 в формате Protobuf, используя spark с Scala. Я ни в чем не уверен. Искал много блогов, но ничего не мог понять, кто-нибудь может помочь?

Комментарии:

1. Как вы планируете их читать? Вы хотите записать их в виде файлов последовательности?

2. На самом деле я хочу попробовать несколько вариантов: 1-й — запись в виде файла последовательности, а 2-й — запись в виде Protobuf. В зависимости от того, что работает быстрее

Ответ №1:

Вы можете использовать ProtoParquetReader, который является ParquetReader с поддержкой ProtoReadSupport.

Что-то вроде:

        try (ParquetReader reader = ProtoParquetReader.builder(path).build()
        ) {
            while ((model = reader.read()) != null){
                System.out.println("check model "   "-- "   model);
...
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
 

Комментарии:

1. не могли бы вы немного объяснить это?

Ответ №2:

Для того чтобы считывать данные с паркета, вам необходимо использовать следующий код :

 public List<Record> read(Path path) {
     List<Record> records = new ArrayList<>();
     ParquetReader<Record> reader = AvroParquetReader<Record>builder(path).withConf(new Configuration()).build();
            for (Record value = reader.read(); value != null; value = reader.read()) {
                records.add(value);
            }
            return records;
}
 

Запись в файл из parquet будет выглядеть примерно так. Хотя это не файл protobuf, это может помочь вам начать работу. Имейте в виду, что у вас возникнут проблемы, если в конечном итоге вы будете использовать spark-stream с protobuf версии v2.6 и выше

 public void write(List<Record> records, String location) throws IOException {
        Path filePath = new Path(location);

        try (ParquetWriter<Record> writer = AvroParquetWriter.<GenericData.Record>builder(filePath)
            .withSchema(getSchema()) //
            .withConf(getConf()) //
            .withCompressionCodec(CompressionCodecName.SNAPPY) //
            .withWriteMode(Mode.CREATE) //
            .build()) {
            for (Record record : records) {
                writer.write(record);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }