#amazon-web-services #amazon-kinesis #amazon-kinesis-firehose #amazon-kinesis-analytics
#amazon-веб-сервисы #amazon-kinesis #amazon-kinesis-firehose #amazon-kinesis-analytics
Вопрос:
У меня есть несколько потоков данных AWS kinesis / firehose со структурированными данными в формате CSV. Мне нужно выполнить анализ этих данных с помощью kinesis data analytics. Но как я могу объединить несколько потоков в один? Потому что Kinesis data Analytics получает данные только из одного потока. Потоки данных могут существовать в разных регионах.
Проблема: Как объединить потоки данных Kinesis в один для Kinesis data analytics?
Ответ №1:
Я не знаю, есть ли какие-либо готовые продукты AWS, которые вы можете использовать для этого, но это довольно просто, если вы не против написать немного кода.
- Создайте поток kinesis, который будет «объединенным потоком» (события обоих ваших исходных потоков будут отправляться сюда.)
- Создайте лямбда-выражение, используя выбранный вами язык программирования, и установите триггеры для потоков kinesis, которые вы хотите объединить.
- Закодируйте лямбда-выражение для записи всех событий, которые оно получает, в поток, созданный на шаге 1.
Результирующий поток kinesis должен содержать объединенные данные, которые вы ищете, и вы можете использовать их для загрузки в аналитику.
Ответ №2:
Это поздний ответ, но обновить его для полноты
Вы также можете сделать это с помощью Kinesis Data Analytics для Apache flink. https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-it-works.html. Это управляемый сервис Apache Flink от AWS, если вы не возражаете написать немного кода на языке Java / Python.
Вы можете использовать Studio notebook, если вы изучаете потоковые данные, то есть на этапе разработки. https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-notebook.html
Отказ от ответственности: я работаю в команде Amazon Kinesis
Ответ №3:
Недавно я внедрил решение, способное объединять несколько наборов потоковых данных, и столкнулся с той же проблемой, о которой вы говорили в своем вопросе.
Действительно, KDA в приложении принимает только один поток в качестве источника входных данных; таким образом, это ограничение делает стандартизацию схемы данных, поступающих в KDA, необходимой, когда вы имеете дело с несколькими наборами потоков. Чтобы обойти эти проблемы, фрагмент кода python можно использовать внутри lambda для сглаживания и стандартизации любого события путем преобразования всей его полезной нагрузки в строку, закодированную в формате JSON. Затем этот лямбда-код отправляет сглаженные события в поток данных Kinesis. Изображение ниже иллюстрирует этот процесс:
Обратите внимание, что после этого этапа оба события JSON имеют одинаковую схему и не имеют вложенных полей. Тем не менее, вся информация сохраняется. Кроме того, поле ssn помещается в заголовок для последующего использования в качестве ключа объединения.
Я написал подробное объяснение этого решения здесь: https://medium.com/@guilhermeepassos/joining-and-enriching-multiple-sets-of-streaming-data-with-kinesis-data-analytics-24b4088b5846
Я надеюсь, что это может помочь!!!