#scala #apache-spark #protocol-buffers #parquet
#scala #apache-spark #протокол-буферы #parquet
Вопрос:
У меня есть файл в формате Parquet. Я хочу прочитать его и сохранить в формате HDFS или AWS S3 в формате Protobuf, используя spark с Scala. Я ни в чем не уверен. Искал много блогов, но ничего не мог понять, кто-нибудь может помочь?
Комментарии:
1. Как вы планируете их читать? Вы хотите записать их в виде файлов последовательности?
2. На самом деле я хочу попробовать несколько вариантов: 1-й — запись в виде файла последовательности, а 2-й — запись в виде Protobuf. В зависимости от того, что работает быстрее
Ответ №1:
Вы можете использовать ProtoParquetReader, который является ParquetReader с поддержкой ProtoReadSupport.
Что-то вроде:
try (ParquetReader reader = ProtoParquetReader.builder(path).build()
) {
while ((model = reader.read()) != null){
System.out.println("check model " "-- " model);
...
}
} catch (IOException e) {
e.printStackTrace();
}
Комментарии:
1. не могли бы вы немного объяснить это?
Ответ №2:
Для того чтобы считывать данные с паркета, вам необходимо использовать следующий код :
public List<Record> read(Path path) {
List<Record> records = new ArrayList<>();
ParquetReader<Record> reader = AvroParquetReader<Record>builder(path).withConf(new Configuration()).build();
for (Record value = reader.read(); value != null; value = reader.read()) {
records.add(value);
}
return records;
}
Запись в файл из parquet будет выглядеть примерно так. Хотя это не файл protobuf, это может помочь вам начать работу. Имейте в виду, что у вас возникнут проблемы, если в конечном итоге вы будете использовать spark-stream с protobuf версии v2.6 и выше
public void write(List<Record> records, String location) throws IOException {
Path filePath = new Path(location);
try (ParquetWriter<Record> writer = AvroParquetWriter.<GenericData.Record>builder(filePath)
.withSchema(getSchema()) //
.withConf(getConf()) //
.withCompressionCodec(CompressionCodecName.SNAPPY) //
.withWriteMode(Mode.CREATE) //
.build()) {
for (Record record : records) {
writer.write(record);
}
} catch (Exception e) {
e.printStackTrace();
}
}