Запись паркета в буфер или поток байтов

#java #bufferedreader #parquet

#java #bufferedreader #паркет

Вопрос:

У меня есть Java-приложение, которое преобразует сообщения json в формат parquet. Есть ли какой-либо parquet writer, который записывает в буфер или поток байтов в java? Большинство примеров, которые я видел, записываются в файлы.

Ответ №1:

TLDR; вам нужно будет реализовать OutputFile , например, что-то вроде:

 import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

import java.io.BufferedOutputStream;
import java.io.IOException;

public class ParquetBufferedWriter implements OutputFile {

    private final BufferedOutputStream out;

    public ParquetBufferedWriter(BufferedOutputStream out) {
        this.out = out;
    }

    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException {
        return createPositionOutputstream();
    }

    private PositionOutputStream createPositionOutputstream() {
        return new PositionOutputStream() {
            @Override
            public long getPos() throws IOException {
                return 0;
            }

            @Override
            public void write(int b) throws IOException {
                out.write(b);
            }
        };
    }

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        return createPositionOutputstream();
    }

    @Override
    public boolean supportsBlockSize() {
        return false;
    }

    @Override
    public long defaultBlockSize() {
        return 0;
    }

}
 

И ваш writer будет чем-то вроде:

     ParquetBufferedWriter out = new ParquetBufferedWriter();
        try (ParquetWriter<Record> writer = AvroParquetWriter.
                <Record>builder(out)
                .withRowGroupSize(DEFAULT_BLOCK_SIZE)
                .withPageSize(DEFAULT_PAGE_SIZE)
                .withSchema(SCHEMA)
                .build()) {

            for (Record record : records) {
                writer.write(record);
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
 

Комментарии:

1. позиция (getPos()) не должна быть 0, иначе вывод прерывается. Правильный ответ: @breadcrumb42 с подсчетом байтов для позиции. Другим решением может быть использование: CountingOutpustream и установка позиции на основе позиции OutputStream.

Ответ №2:

Мне просто также нужно было записать в поток, поэтому я выполнил пример, приведенный наймджоном. Для меня отлично работает следующее.

 class ParquetBufferedWriter implements OutputFile {
    
    private final BufferedOutputStream out;

    public ParquetBufferedWriter(BufferedOutputStream out) {
        this.out = out;
    }

    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException {
        return createPositionOutputstream();
    }

    private PositionOutputStream createPositionOutputstream() {
        return new PositionOutputStream() {
            
            int pos = 0;

            @Override
            public long getPos() throws IOException {
                return pos;
            }

            @Override
            public void flush() throws IOException {
                out.flush();
            };

            @Override
            public void close() throws IOException {
                out.close();
            };

            @Override
            public void write(int b) throws IOException {
                out.write(b);
                pos  ;
            }

            @Override
            public void write(byte[] b, int off, int len) throws IOException {
                out.write(b, off, len);
                pos  = len;
            }
        };
    }

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        return createPositionOutputstream();
    }

    @Override
    public boolean supportsBlockSize() {
        return false;
    }

    @Override
    public long defaultBlockSize() {
        return 0;
    }
}
 

Ответ №3:

Вам нужно записать данные во временный файл, а затем скрыть данные из файла во входной поток или буфер примерно так, сначала прочитайте данные временного файла

 final InputStream targetStream = new DataInputStream(new FileInputStream(tmp1.getAbsoluteFile()));

StringWriter writer = new StringWriter();
String encoding = StandardCharsets.UTF_8.name();
IOUtils.copy(targetStream, writer, encoding);
System.out.println(writer);