#java #bufferedreader #parquet
#java #bufferedreader #паркет
Вопрос:
У меня есть Java-приложение, которое преобразует сообщения json в формат parquet. Есть ли какой-либо parquet writer, который записывает в буфер или поток байтов в java? Большинство примеров, которые я видел, записываются в файлы.
Ответ №1:
TLDR; вам нужно будет реализовать OutputFile
, например, что-то вроде:
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
public class ParquetBufferedWriter implements OutputFile {
private final BufferedOutputStream out;
public ParquetBufferedWriter(BufferedOutputStream out) {
this.out = out;
}
@Override
public PositionOutputStream create(long blockSizeHint) throws IOException {
return createPositionOutputstream();
}
private PositionOutputStream createPositionOutputstream() {
return new PositionOutputStream() {
@Override
public long getPos() throws IOException {
return 0;
}
@Override
public void write(int b) throws IOException {
out.write(b);
}
};
}
@Override
public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
return createPositionOutputstream();
}
@Override
public boolean supportsBlockSize() {
return false;
}
@Override
public long defaultBlockSize() {
return 0;
}
}
И ваш writer будет чем-то вроде:
ParquetBufferedWriter out = new ParquetBufferedWriter();
try (ParquetWriter<Record> writer = AvroParquetWriter.
<Record>builder(out)
.withRowGroupSize(DEFAULT_BLOCK_SIZE)
.withPageSize(DEFAULT_PAGE_SIZE)
.withSchema(SCHEMA)
.build()) {
for (Record record : records) {
writer.write(record);
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
Комментарии:
1. позиция (getPos()) не должна быть 0, иначе вывод прерывается. Правильный ответ: @breadcrumb42 с подсчетом байтов для позиции. Другим решением может быть использование: CountingOutpustream и установка позиции на основе позиции OutputStream.
Ответ №2:
Мне просто также нужно было записать в поток, поэтому я выполнил пример, приведенный наймджоном. Для меня отлично работает следующее.
class ParquetBufferedWriter implements OutputFile {
private final BufferedOutputStream out;
public ParquetBufferedWriter(BufferedOutputStream out) {
this.out = out;
}
@Override
public PositionOutputStream create(long blockSizeHint) throws IOException {
return createPositionOutputstream();
}
private PositionOutputStream createPositionOutputstream() {
return new PositionOutputStream() {
int pos = 0;
@Override
public long getPos() throws IOException {
return pos;
}
@Override
public void flush() throws IOException {
out.flush();
};
@Override
public void close() throws IOException {
out.close();
};
@Override
public void write(int b) throws IOException {
out.write(b);
pos ;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
pos = len;
}
};
}
@Override
public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
return createPositionOutputstream();
}
@Override
public boolean supportsBlockSize() {
return false;
}
@Override
public long defaultBlockSize() {
return 0;
}
}
Ответ №3:
Вам нужно записать данные во временный файл, а затем скрыть данные из файла во входной поток или буфер примерно так, сначала прочитайте данные временного файла
final InputStream targetStream = new DataInputStream(new FileInputStream(tmp1.getAbsoluteFile()));
StringWriter writer = new StringWriter();
String encoding = StandardCharsets.UTF_8.name();
IOUtils.copy(targetStream, writer, encoding);
System.out.println(writer);