#java #amazon-web-services #rabbitmq #amazon-kinesis
#java #amazon-веб-сервисы #rabbitmq #amazon-kinesis
Вопрос:
Я новичок в Kinesis. Прочитав документацию, которую я нашел, я могу создать поток Kinesis для получения данных от производителя. Затем использование KCL считывает эти данные из потока для дальнейшей обработки. Я понимаю, как написать приложение KCL, внедрив IRecordProcessor.
Однако самый первый этап, как поместить данные в поток Kinesis, мне все еще не ясен. Есть ли у нас какой-нибудь AWS API, который нуждается в реализации для достижения этой цели?
Сценарии: У меня есть сервер, который постоянно получает данные из разных источников в папках. Каждая папка содержит текстовый файл, строки которого содержат необходимые атрибуты для дальнейшей аналитической работы. я должен отправить все эти данные в поток Kinesis.
Мне нужно что-то закодировать, как показано ниже, чтобы метод putData класса использовался для вывода в потоке Kinesis
public class Put {
AmazonKinesisClient kinesisClient;
Put()
{
String accessKey = "My Access Key here" ;
String secretKey = "My Secret Key here" ;
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
kinesisClient = new AmazonKinesisClient(credentials);
kinesisClient.setEndpoint("kinesis.us-east-1.amazonaws.com", "kinesis", "us-east-1");
System.out.println("starting the Put Application");
}
public void putData(String fileContent,String session) throws Exception
{
final String myStreamName = "ClickStream";
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(myStreamName);
String putData = fileContent;
putRecordRequest.setData(ByteBuffer.wrap(putData.getBytes()));
putRecordRequest.setPartitionKey("session" session);
PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
System.out.println("Successfully putrecord, partition key : " putRecordRequest.getPartitionKey()
", ShardID : " putRecordResult.getShardId());
System.out.println(fileContent);
System.out.println("Sequence Number: " putRecordResult.getSequenceNumber());
System.out.println("Data has been PUT successfully");
}
}
Однако чтение файла из исходной папки с сервера, а затем какой дизайн я должен использовать для вызова putData для получения записи в потоке Kinesis. Нужен ли мне бесконечный цикл и чтение всех файлов, а затем выполнение этого или какой-либо фреймворк, который лучше сделает это с учетом отказоустойчивости, единой точки отказа для всех. Любая помощь будет принята с благодарностью.
Вкратце: мне нужен лучший метод для передачи регулярно генерируемых данных в поток Kinesis. данные генерируются с регулярным интервалом на сервер. Спасибо
Ответ №1:
Если вы храните некоторые файлы, попробуйте Fluentd. http://www.fluentd.org /
У Amazon Kinesis есть довольно хороший плагин для этого. https://github.com/awslabs/aws-fluent-plugin-kinesis
Ответ №2:
Похоже, вы уже используете… http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html
Конкретный метод, который вы хотите, заключается в следующем.
Вам нужно имя потока, запись и ключ потока. http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/PutRecordResult.html
Но, кажется, у вас все это есть?
Затем вам понадобится программа, которая всегда будет следить за вашим файлом журнала сервера, и когда когда-либо появится новая строка, она будет нажимать это.
Но ваши данные будут храниться только в течение 24 часов. Затем вам понадобится рабочая программа для использования данных и размещения их в каком-либо другом ресурсе AWS.
Комментарии:
1. Да, я переношу данные в S3 из Kinesis. Я искал какое-нибудь готовое решение, чтобы продолжать читать файлы из папки с моего сервера каждый день и помещать все эти данные в Kinesis stream. Ну, на моем сервере у меня есть несколько папок на разные даты, и каждый день содержит много файлов с информацией журнала. я хочу перенести это в поток Kinesis. На этом уровне я думаю, что могу написать простую программу с бесконечным циклом с некоторой задержкой потока, чтобы продолжать считывать события и переходить к Kinesis, если какого-то уже проверенного решения нет. Спасибо
2. Могу ли я использовать RabbitMQ для передачи данных в поток Kinesis. ?
3. Amazon не предоставляет какой-либо готовой программы для отправки. Вы должны создать его самостоятельно. Не знаком с RabbitMQ
4. С Java7 интересная функция WatchService API отлично подходит для решения моих проблем с файлами. Теперь я могу читать файлы как при создании изменении и т. Д. Однако не могли бы вы порекомендовать мне несколько хороших советов по выбору ключа раздела. ?
5. Подумайте о колоде карт, подходящим ключом раздела будет цвет, но лучшим будет масть. Это зависит от ваших данных, но я использовал такие вещи, как EventType, userId, Country StateofRecord. Вы хотите попытаться равномерно распределить свои данные между читателями.
Ответ №3:
Вы можете использовать Amazon kinesis agent для мониторинга набора файлов, и они могут передавать данные в kinesis.
http://docs.aws.amazon.com/streams/latest/dev/writing-with-agents.html
Ответ №4:
Если вы пытаетесь использовать файлы журналов, попробуйте Fluentd. Fluentd может непрерывно отслеживать файлы журналов и выполнять буферизацию данных, шифрование, сжатие и повторные попытки.
Плагин Kinesis от Fluentd разработан самой компанией Amazon Web Services.