Как поместить данные с сервера в поток Kinesis

#java #amazon-web-services #rabbitmq #amazon-kinesis

#java #amazon-веб-сервисы #rabbitmq #amazon-kinesis

Вопрос:

Я новичок в Kinesis. Прочитав документацию, которую я нашел, я могу создать поток Kinesis для получения данных от производителя. Затем использование KCL считывает эти данные из потока для дальнейшей обработки. Я понимаю, как написать приложение KCL, внедрив IRecordProcessor.

Однако самый первый этап, как поместить данные в поток Kinesis, мне все еще не ясен. Есть ли у нас какой-нибудь AWS API, который нуждается в реализации для достижения этой цели?

Сценарии: У меня есть сервер, который постоянно получает данные из разных источников в папках. Каждая папка содержит текстовый файл, строки которого содержат необходимые атрибуты для дальнейшей аналитической работы. я должен отправить все эти данные в поток Kinesis.

Мне нужно что-то закодировать, как показано ниже, чтобы метод putData класса использовался для вывода в потоке Kinesis

 public class Put {

    AmazonKinesisClient kinesisClient;

    Put()
    {
        String accessKey = "My Access Key here" ;
        String secretKey = "My Secret Key here" ;
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
       kinesisClient = new AmazonKinesisClient(credentials);
       kinesisClient.setEndpoint("kinesis.us-east-1.amazonaws.com", "kinesis", "us-east-1");
        System.out.println("starting the Put Application");
    }

    public void putData(String fileContent,String session) throws Exception
    {
         final String myStreamName = "ClickStream";

            PutRecordRequest putRecordRequest = new PutRecordRequest();
            putRecordRequest.setStreamName(myStreamName);
            String putData = fileContent;
            putRecordRequest.setData(ByteBuffer.wrap(putData.getBytes()));
            putRecordRequest.setPartitionKey("session" session);
            PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
            System.out.println("Successfully putrecord, partition key : "   putRecordRequest.getPartitionKey()
                      ", ShardID : "   putRecordResult.getShardId());
            System.out.println(fileContent);
            System.out.println("Sequence Number: " putRecordResult.getSequenceNumber());

            System.out.println("Data has been PUT successfully");


    }
}
  

Однако чтение файла из исходной папки с сервера, а затем какой дизайн я должен использовать для вызова putData для получения записи в потоке Kinesis. Нужен ли мне бесконечный цикл и чтение всех файлов, а затем выполнение этого или какой-либо фреймворк, который лучше сделает это с учетом отказоустойчивости, единой точки отказа для всех. Любая помощь будет принята с благодарностью.

Вкратце: мне нужен лучший метод для передачи регулярно генерируемых данных в поток Kinesis. данные генерируются с регулярным интервалом на сервер. Спасибо

Ответ №1:

Если вы храните некоторые файлы, попробуйте Fluentd. http://www.fluentd.org /

У Amazon Kinesis есть довольно хороший плагин для этого. https://github.com/awslabs/aws-fluent-plugin-kinesis

Ответ №2:

Похоже, вы уже используете… http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html

Конкретный метод, который вы хотите, заключается в следующем.

Вам нужно имя потока, запись и ключ потока. http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/PutRecordResult.html

Но, кажется, у вас все это есть?

Затем вам понадобится программа, которая всегда будет следить за вашим файлом журнала сервера, и когда когда-либо появится новая строка, она будет нажимать это.

Но ваши данные будут храниться только в течение 24 часов. Затем вам понадобится рабочая программа для использования данных и размещения их в каком-либо другом ресурсе AWS.

Комментарии:

1. Да, я переношу данные в S3 из Kinesis. Я искал какое-нибудь готовое решение, чтобы продолжать читать файлы из папки с моего сервера каждый день и помещать все эти данные в Kinesis stream. Ну, на моем сервере у меня есть несколько папок на разные даты, и каждый день содержит много файлов с информацией журнала. я хочу перенести это в поток Kinesis. На этом уровне я думаю, что могу написать простую программу с бесконечным циклом с некоторой задержкой потока, чтобы продолжать считывать события и переходить к Kinesis, если какого-то уже проверенного решения нет. Спасибо

2. Могу ли я использовать RabbitMQ для передачи данных в поток Kinesis. ?

3. Amazon не предоставляет какой-либо готовой программы для отправки. Вы должны создать его самостоятельно. Не знаком с RabbitMQ

4. С Java7 интересная функция WatchService API отлично подходит для решения моих проблем с файлами. Теперь я могу читать файлы как при создании изменении и т. Д. Однако не могли бы вы порекомендовать мне несколько хороших советов по выбору ключа раздела. ?

5. Подумайте о колоде карт, подходящим ключом раздела будет цвет, но лучшим будет масть. Это зависит от ваших данных, но я использовал такие вещи, как EventType, userId, Country StateofRecord. Вы хотите попытаться равномерно распределить свои данные между читателями.

Ответ №3:

Вы можете использовать Amazon kinesis agent для мониторинга набора файлов, и они могут передавать данные в kinesis.

http://docs.aws.amazon.com/streams/latest/dev/writing-with-agents.html

Ответ №4:

Если вы пытаетесь использовать файлы журналов, попробуйте Fluentd. Fluentd может непрерывно отслеживать файлы журналов и выполнять буферизацию данных, шифрование, сжатие и повторные попытки.

Плагин Kinesis от Fluentd разработан самой компанией Amazon Web Services.