Как объединить потоки данных Kinesis в один для Kinesis data analytics?

#amazon-web-services #amazon-kinesis #amazon-kinesis-firehose #amazon-kinesis-analytics

#amazon-веб-сервисы #amazon-kinesis #amazon-kinesis-firehose #amazon-kinesis-analytics

Вопрос:

У меня есть несколько потоков данных AWS kinesis / firehose со структурированными данными в формате CSV. Мне нужно выполнить анализ этих данных с помощью kinesis data analytics. Но как я могу объединить несколько потоков в один? Потому что Kinesis data Analytics получает данные только из одного потока. Потоки данных могут существовать в разных регионах.

Проблема: Как объединить потоки данных Kinesis в один для Kinesis data analytics?

Ответ №1:

Я не знаю, есть ли какие-либо готовые продукты AWS, которые вы можете использовать для этого, но это довольно просто, если вы не против написать немного кода.

  1. Создайте поток kinesis, который будет «объединенным потоком» (события обоих ваших исходных потоков будут отправляться сюда.)
  2. Создайте лямбда-выражение, используя выбранный вами язык программирования, и установите триггеры для потоков kinesis, которые вы хотите объединить.
  3. Закодируйте лямбда-выражение для записи всех событий, которые оно получает, в поток, созданный на шаге 1.

Результирующий поток kinesis должен содержать объединенные данные, которые вы ищете, и вы можете использовать их для загрузки в аналитику.

Ответ №2:

Это поздний ответ, но обновить его для полноты

Вы также можете сделать это с помощью Kinesis Data Analytics для Apache flink. https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-it-works.html. Это управляемый сервис Apache Flink от AWS, если вы не возражаете написать немного кода на языке Java / Python.

Вы можете использовать Studio notebook, если вы изучаете потоковые данные, то есть на этапе разработки. https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-notebook.html

Отказ от ответственности: я работаю в команде Amazon Kinesis

Ответ №3:

Недавно я внедрил решение, способное объединять несколько наборов потоковых данных, и столкнулся с той же проблемой, о которой вы говорили в своем вопросе.

Действительно, KDA в приложении принимает только один поток в качестве источника входных данных; таким образом, это ограничение делает стандартизацию схемы данных, поступающих в KDA, необходимой, когда вы имеете дело с несколькими наборами потоков. Чтобы обойти эти проблемы, фрагмент кода python можно использовать внутри lambda для сглаживания и стандартизации любого события путем преобразования всей его полезной нагрузки в строку, закодированную в формате JSON. Затем этот лямбда-код отправляет сглаженные события в поток данных Kinesis. Изображение ниже иллюстрирует этот процесс: введите описание изображения здесь

Обратите внимание, что после этого этапа оба события JSON имеют одинаковую схему и не имеют вложенных полей. Тем не менее, вся информация сохраняется. Кроме того, поле ssn помещается в заголовок для последующего использования в качестве ключа объединения.

Я написал подробное объяснение этого решения здесь: https://medium.com/@guilhermeepassos/joining-and-enriching-multiple-sets-of-streaming-data-with-kinesis-data-analytics-24b4088b5846

Я надеюсь, что это может помочь!!!