#amazon-s3 #aws-java-sdk #amazon-s3-select
#amazon-s3 #aws-java-sdk #amazon-s3-select
Вопрос:
Я пытаюсь отсканировать и получить фрагмент данных из моего объекта в S3 с помощью S3 select с использованием Java SDK. Я использую scanRange
class для предоставления start
end
диапазона байтов и объекта S3. Объект s3 представляет собой файл parquet размером 9 МБ и имеет несжатый размер 84 МБ.
Получая объект s3 без scanRange
опции SelectObjectContentRequest
, я получаю файл размером 84 МБ в качестве eexpectd.
Но при использовании scanRange
even then я получаю весь файл размером 84 МБ, независимо от start
предоставленных мной байтов or end
.
Ниже приведен фрагмент кода, который я использую:
public class S3SelectParquetParser {
public Object getS3SelectResponse(AmazonS3 s3Client, S3SelectParserRequest s3SelectParserRequest) throws IOException {
SelectObjectContentRequest selectRequest = gets3SelectRequestObject(s3SelectParserRequest);
SelectObjectContentResult result = s3Client.selectObjectContent(selectRequest);
return massageResult(result.getPayload(), s3SelectParserRequest.getOutputFile());
}
private SelectObjectContentRequest gets3SelectRequestObject(S3SelectParserRequest s3SelectParserRequest) {
SelectObjectContentRequest requestObject = new SelectObjectContentRequest();
InputSerialization inputSerialization = new InputSerialization()
.withParquet(new ParquetInput())
.withCompressionType(CompressionType.NONE);
OutputSerialization outputSerialization = new OutputSerialization()
.withJson(new JSONOutput());
RequestProgress requestProgress = new RequestProgress();
if (s3SelectParserRequest.isShowProgress()) {
requestProgress.setEnabled(true);
} else {
requestProgress.setEnabled(false);
}
requestObject.setBucketName(s3SelectParserRequest.getBucketName());
requestObject.setKey(s3SelectParserRequest.getObjectPath());
requestObject.setInputSerialization(inputSerialization);
requestObject.setOutputSerialization(outputSerialization);
requestObject.setExpressionType(ExpressionType.SQL);
requestObject.setExpression(s3SelectParserRequest.getQuery());
requestObject.setRequestProgress(requestProgress);
if (s3SelectParserRequest.isRangedRequest()) {
ScanRange scanRange = new ScanRange();
scanRange.setStart(s3SelectParserRequest.getStartByteRange());
scanRange.setEnd(s3SelectParserRequest.getEndByteRange());
requestObject.setScanRange(scanRange);
}
return requestObject;
}
private Object massageResult(SelectObjectContentEventStream payload, String outputFile) throws IOException {
log.info("Starting reading the inputStream");
final AtomicBoolean isResultComplete = new AtomicBoolean(false);
OutputStream outputStream = null;
try {
outputStream = new FileOutputStream(outputFile);
} catch (Exception e) {
log.error("Exception occurred", e);
}
InputStream resultInputStream = payload.getRecordsInputStream(
new SelectObjectContentEventVisitor() {
@Override
public void visit(SelectObjectContentEvent.StatsEvent event) {
System.out.println(
"Received Stats, Bytes Scanned: " event.getDetails().getBytesScanned()
" Bytes Processed: " event.getDetails().getBytesProcessed());
}
/*
* An End Event informs that the request has finished successfully.
*/
@Override
public void visit(SelectObjectContentEvent.EndEvent event) {
isResultComplete.set(true);
System.out.println("Received End Event. Result is complete.");
}
@Override
public void visit(ProgressEvent event) {
log.info("{Progress}: Bytes processed: %s t Bytes received: %s", event.getDetails().getBytesProcessed(),
event.getDetails().getBytesReturned());
}
}
);
copy(resultInputStream, outputStream);
return resultInputStream;
}
}
Где S3SelectParserRequest
инициализируется как:
S3SelectParserRequest s3SelectRequestRanged = S3SelectParserRequest.builder()
.bucketName(bucketName)
.objectPath(bucketPath)
.query(fullQuery)
.showProgress(false)
.isRangedRequest(true)
.startByteRange(0)
.endByteRange(4098)
.outputFile("ranged-0-400.json")
.build();
и getS3SelectResponse
вызывается как:
new S3SelectParquetParser().getS3SelectResponse(s3Client, s3SelectRequestRanged);
Ответ №1:
Я понял это. S3 select предоставляет данные для всех групп строк, которые содержатся в предоставленном байте scanRange
для объекта parquet. Начальные 4 байта содержат заголовок, а пятый байт и далее объекта parquet содержит первую группу строк. Поскольку моя первая группа строк была после 0-го байта (предоставлено startByterange
мной), независимо от того, что endByteRange
я предоставляю после 4, она вернет первую группу строк (а также другие, если они начинаются в указанном мной диапазоне). Мой файл parquet содержал только одну группу строк, поэтому независимо от того, какой диапазон данных предоставлен, он предоставлял весь файл, то есть первую группу строк.