Как приложение KCL интегрируется в Kinesis Connector для передачи данных на S3

#amazon-web-services #amazon-s3 #amazon-kinesis

#amazon-веб-сервисы #amazon-s3 #amazon-kinesis

Вопрос:

Каждое приложение Kinesis должно включать эти три компонента:

  1. Интерфейс IRecordProcessor

  2. Фабрика для класса, который реализует интерфейс IRecordProcessor

  3. Код, который инициализирует приложение и создает рабочий

Теперь я понимаю, что как только мы настроим производителя для добавления записей в Kinesis stream . Затем приложение KCL может считывать записи из потока Kinesis с помощью приведенной ниже реализации processRecords. тогда у этого метода processRecords должен быть какой-то способ передать его в S3 для окончательного хранения с библиотекой connector.

общедоступные записи процесса аннулирования (записи списка, контрольная точка IRecordProcessorCheckpointer)

Запрос: Как мне вызвать библиотеку connector из processRecords приложения KCL для хранения записей данных на S3?

Я перешел по ссылке, в которой показан пример приложения Kinesis https://github.com/aws/aws-sdk-java/blob/master/src/samples/AmazonKinesisApplication/SampleRecordProcessor.java

в приведенной выше ссылке я вставляю фрагмент метода, как показано ниже.

 private void processRecordsWithRetries(List<Record> records) {
        for (Record record : records) {
            boolean processedSuccessfully = false;
            String data = null;
            for (int i = 0; i < NUM_RETRIES; i  ) {
                try {
                    // For this app, we interpret the payload as UTF-8 chars.
                    data = decoder.decode(record.getData()).toString();
                    LOG.info(record.getSequenceNumber()   ", "   record.getPartitionKey()   ", "   data);
            //
                    // Logic to process record goes here.
                    //
                    processedSuccessfully = true;
                    break;
                } catch (CharacterCodingException e) {
                    LOG.error("Malformed data: "   data, e);
                    break;
                } catch (Throwable t) {
                    LOG.warn("Caught throwable while processing record "   record, t);
                }

                // backoff if we encounter an exception.
                try {
                    Thread.sleep(BACKOFF_TIME_IN_MILLIS);
                } catch (InterruptedException e) {
                    LOG.debug("Interrupted sleep", e);
                }
            }

            if (!processedSuccessfully) {
                LOG.error("Couldn't process record "   record   ". Skipping the record.");
            }
        }
    }
  

В приведенном выше коде, когда мы говорим «Логика обработки записи идет сюда». (пожалуйста, смотрите Код выше)
Здесь мое требование — поместить данные на s3. Я понимаю, что у нас есть библиотека коннекторов, которая делает это, однако я не могу представить, как вызвать библиотеку коннекторов на этом этапе и далее?пожалуйста, предложите

Ответ №1: