Aws S3 Select ScanRange работает не так, как ожидалось, и предоставляет результат всего файла

#amazon-s3 #aws-java-sdk #amazon-s3-select

#amazon-s3 #aws-java-sdk #amazon-s3-select

Вопрос:

Я пытаюсь отсканировать и получить фрагмент данных из моего объекта в S3 с помощью S3 select с использованием Java SDK. Я использую scanRange class для предоставления start end диапазона байтов и объекта S3. Объект s3 представляет собой файл parquet размером 9 МБ и имеет несжатый размер 84 МБ.

Получая объект s3 без scanRange опции SelectObjectContentRequest , я получаю файл размером 84 МБ в качестве eexpectd.

Но при использовании scanRange even then я получаю весь файл размером 84 МБ, независимо от start предоставленных мной байтов or end .

Ниже приведен фрагмент кода, который я использую:

 public class S3SelectParquetParser {
  public Object getS3SelectResponse(AmazonS3 s3Client, S3SelectParserRequest s3SelectParserRequest) throws IOException {
    SelectObjectContentRequest selectRequest = gets3SelectRequestObject(s3SelectParserRequest);
    SelectObjectContentResult result = s3Client.selectObjectContent(selectRequest);
    return massageResult(result.getPayload(), s3SelectParserRequest.getOutputFile());
  }

  private SelectObjectContentRequest gets3SelectRequestObject(S3SelectParserRequest s3SelectParserRequest) {
    SelectObjectContentRequest requestObject = new SelectObjectContentRequest();

    InputSerialization inputSerialization = new InputSerialization()
        .withParquet(new ParquetInput())
        .withCompressionType(CompressionType.NONE);

    OutputSerialization outputSerialization = new OutputSerialization()
        .withJson(new JSONOutput());

    RequestProgress requestProgress = new RequestProgress();
    if (s3SelectParserRequest.isShowProgress()) {
      requestProgress.setEnabled(true);
    } else {
      requestProgress.setEnabled(false);
    }

    requestObject.setBucketName(s3SelectParserRequest.getBucketName());
    requestObject.setKey(s3SelectParserRequest.getObjectPath());
    requestObject.setInputSerialization(inputSerialization);
    requestObject.setOutputSerialization(outputSerialization);
    requestObject.setExpressionType(ExpressionType.SQL);
    requestObject.setExpression(s3SelectParserRequest.getQuery());
    requestObject.setRequestProgress(requestProgress);

    if (s3SelectParserRequest.isRangedRequest()) {
      ScanRange scanRange = new ScanRange();
      scanRange.setStart(s3SelectParserRequest.getStartByteRange());
      scanRange.setEnd(s3SelectParserRequest.getEndByteRange());
      requestObject.setScanRange(scanRange);
    }
    return requestObject;
  }


  private Object massageResult(SelectObjectContentEventStream payload, String outputFile) throws IOException {
    log.info("Starting reading the inputStream");
    final AtomicBoolean isResultComplete = new AtomicBoolean(false);
    OutputStream outputStream = null;
    try {
      outputStream = new FileOutputStream(outputFile);
    } catch (Exception e) {
      log.error("Exception occurred", e);
    }
    InputStream resultInputStream = payload.getRecordsInputStream(
        new SelectObjectContentEventVisitor() {
          @Override
          public void visit(SelectObjectContentEvent.StatsEvent event) {
            System.out.println(
                "Received Stats, Bytes Scanned: "   event.getDetails().getBytesScanned()
                      " Bytes Processed: "   event.getDetails().getBytesProcessed());
          }

          /*
           * An End Event informs that the request has finished successfully.
           */
          @Override
          public void visit(SelectObjectContentEvent.EndEvent event) {
            isResultComplete.set(true);
            System.out.println("Received End Event. Result is complete.");
          }

          @Override
          public void visit(ProgressEvent event) {
            log.info("{Progress}: Bytes processed: %s t Bytes received: %s", event.getDetails().getBytesProcessed(),
                event.getDetails().getBytesReturned());
          }

        }
    );
    copy(resultInputStream, outputStream);
    return resultInputStream;
  }
}
 

Где S3SelectParserRequest инициализируется как:

   S3SelectParserRequest s3SelectRequestRanged = S3SelectParserRequest.builder()
        .bucketName(bucketName)
        .objectPath(bucketPath)
        .query(fullQuery)
        .showProgress(false)
        .isRangedRequest(true)
        .startByteRange(0)
        .endByteRange(4098)
        .outputFile("ranged-0-400.json")
        .build();
 

и getS3SelectResponse вызывается как:

 new S3SelectParquetParser().getS3SelectResponse(s3Client, s3SelectRequestRanged);
 

Ответ №1:

Я понял это. S3 select предоставляет данные для всех групп строк, которые содержатся в предоставленном байте scanRange для объекта parquet. Начальные 4 байта содержат заголовок, а пятый байт и далее объекта parquet содержит первую группу строк. Поскольку моя первая группа строк была после 0-го байта (предоставлено startByterange мной), независимо от того, что endByteRange я предоставляю после 4, она вернет первую группу строк (а также другие, если они начинаются в указанном мной диапазоне). Мой файл parquet содержал только одну группу строк, поэтому независимо от того, какой диапазон данных предоставлен, он предоставлял весь файл, то есть первую группу строк.