#amazon-web-services #amazon-s3 #amazon-kinesis
#amazon-веб-сервисы #amazon-s3 #amazon-kinesis
Вопрос:
Каждое приложение Kinesis должно включать эти три компонента:
-
Интерфейс IRecordProcessor
-
Фабрика для класса, который реализует интерфейс IRecordProcessor
-
Код, который инициализирует приложение и создает рабочий
Теперь я понимаю, что как только мы настроим производителя для добавления записей в Kinesis stream . Затем приложение KCL может считывать записи из потока Kinesis с помощью приведенной ниже реализации processRecords. тогда у этого метода processRecords должен быть какой-то способ передать его в S3 для окончательного хранения с библиотекой connector.
общедоступные записи процесса аннулирования (записи списка, контрольная точка IRecordProcessorCheckpointer)
Запрос: Как мне вызвать библиотеку connector из processRecords приложения KCL для хранения записей данных на S3?
Я перешел по ссылке, в которой показан пример приложения Kinesis https://github.com/aws/aws-sdk-java/blob/master/src/samples/AmazonKinesisApplication/SampleRecordProcessor.java
в приведенной выше ссылке я вставляю фрагмент метода, как показано ниже.
private void processRecordsWithRetries(List<Record> records) {
for (Record record : records) {
boolean processedSuccessfully = false;
String data = null;
for (int i = 0; i < NUM_RETRIES; i ) {
try {
// For this app, we interpret the payload as UTF-8 chars.
data = decoder.decode(record.getData()).toString();
LOG.info(record.getSequenceNumber() ", " record.getPartitionKey() ", " data);
//
// Logic to process record goes here.
//
processedSuccessfully = true;
break;
} catch (CharacterCodingException e) {
LOG.error("Malformed data: " data, e);
break;
} catch (Throwable t) {
LOG.warn("Caught throwable while processing record " record, t);
}
// backoff if we encounter an exception.
try {
Thread.sleep(BACKOFF_TIME_IN_MILLIS);
} catch (InterruptedException e) {
LOG.debug("Interrupted sleep", e);
}
}
if (!processedSuccessfully) {
LOG.error("Couldn't process record " record ". Skipping the record.");
}
}
}
В приведенном выше коде, когда мы говорим «Логика обработки записи идет сюда». (пожалуйста, смотрите Код выше)
Здесь мое требование — поместить данные на s3. Я понимаю, что у нас есть библиотека коннекторов, которая делает это, однако я не могу представить, как вызвать библиотеку коннекторов на этом этапе и далее?пожалуйста, предложите
Ответ №1:
Вы должны попробовать библиотеку kinesis connector, в ней есть образец того, что вам нужно: https //github.com/awslabs/amazon-kinesis-connectors.