Объединение сообщений SQS, которые поступают с интервалом в миллисекунды друг от друга

#node.js #amazon-web-services #aws-lambda #amazon-sqs #amazon-sns

#node.js #amazon-веб-сервисы #aws-lambda #amazon-sqs #amazon-sns

Вопрос:

Я столкнулся с ситуацией, которую я не совсем уверен, как решить. По сути, моя система получает данные из стороннего источника через API gateway, публикует эти данные в разделе SNS, который запускает лямбда-функцию. На основе параметров сообщения лямбда-функция отправляет сообщение в одну из трех разных очередей SQS. Эти очереди запускают одну из трех лямбда-функций, которые выполняют одно из трех возможных действий — создают, обновляют или удаляют элементы в этом порядке в другой сторонней системе через свои конечные точки API. Обычный процесс состоял бы в том, чтобы сначала создать объект в целевой системе, а затем каждое последующее действие должно заключаться в обновлении / удалении этого объекта. Проблема в том, что иногда я получаю данные для одного и того же объекта из источника в течение миллисекунд, поэтому моя система не может создать объект в пункте назначения из-за того, что их API требует для этого не менее 300-400 мс. Поэтому, когда моя система пытается обновить объект, он еще не существует, поэтому моя система создает его. Но поскольку у меня есть действие create в процессе выполнения, оно создает дублирующуюся запись в моем пункте назначения.

Итак, мой вопрос в том, как лучше всего консолидировать сообщения для одного и того же объекта, которые поступают менее чем за секунду друг от друга?

Мои мысли на данный момент: я подумываю об использовании redis для объединения сообщений, предназначенных для одного и того же объекта, прежде чем отправлять их в раздел SNS, но я надеялся, что будет более прямой подход, поскольку я не хочу вводить еще один уровень логики. Любая помощь будет высоко оценена. Спасибо.

Ответ №1:

Лучшим вариантом было бы использовать очередь Amazon SQS FIFO, где для каждого сообщения используется идентификатор группы сообщений, который устанавливается на уникальный идентификатор создаваемого элемента.

В очереди FIFO SQS гарантирует, что сообщения обрабатываются по порядку, и разрешает получать только одно сообщение для каждого идентификатора группы сообщений одновременно. Таким образом, любые последующие сообщения для одного и того же идентификатора группы сообщений будут ждать, пока существующее сообщение не будет полностью обработано.

Если это неприемлемо, то AWS Lambda теперь поддерживает пакетные окна продолжительностью до 5 минут для функций с Amazon SQS в качестве источника событий:

AWS Lambda теперь позволяет клиентам, использующим Amazon Simple Queue Service (Amazon SQS) в качестве источника событий, определять вызываемый период ожидания MaximumBatchingWindowInSeconds , чтобы сообщения накапливались в их очереди SQS перед вызовом лямбда-функции. В дополнение к размеру пакета, это второй вариант отправки записей в пакетах, чтобы уменьшить количество вызовов Lambda. Этот вариант идеально подходит для рабочих нагрузок, которые не зависят от времени, и могут подождать для оптимизации затрат.

Ранее лямбда-функции, опрашивающие из очереди SQS, отправляли сообщения пакетами до 10 перед вызовом функции. Теперь клиенты также могут определить временное окно, которое Lambda должна ожидать для опроса сообщений из своей очереди SQS, прежде чем вызывать свою функцию. Lambda будет ждать до 300 секунд для опроса сообщений из очереди SQS. Когда определено окно пакетной обработки, Lambda также позволит клиентам определять размер пакета до 10 000 сообщений.

Для начала, при создании новой лямбда-функции или обновлении существующей функции с использованием SQS в качестве источника событий, клиенты могут установить MaximumBatchingWindowInSeconds для поля любое значение в диапазоне от 0 до 300 секунд в консоли управления AWS, AWS CLI, AWS SAM или AWS SDK для Lambda. Эта функция доступна во всех регионах AWS, где доступны AWS Lambda и Amazon SQS, и не требует дополнительной платы за использование.

Комментарии:

1. Спасибо за ответ, Джон, однако я не уверен, как я мог бы применить это, поскольку проблема возникает между темой SNS и очередями SQS. Если я получу событие create, а через 30 мс событие update, лямбда-выражение отправит событие create в очередь create, а затем проверит, существует ли объект, он поймет, что это не так, и отправит его также в очередь create. Итак, в итоге я получаю два сообщения в очереди создания

2. Ммм. Является ли задержка критической? Не могли бы вы ограничить параллелизм центральной лямбда-функции так, чтобы она выполнялась только по одному разу, чтобы избежать дублирования операций?

3. Это так, мне нужно обрабатывать около 5 тыс. операций в минуту, и только у небольшой части есть проблема, описанная выше.

Ответ №2:

функция lambda отправляет сообщение в одну из трех разных очередей SQS

Поэтому, когда моя система пытается обновить объект, он еще не существует, поэтому моя система создает его. Но поскольку в процессе выполнения у меня есть действие create, оно создает дублирующуюся запись в моем пункте назначения

Используя множественную очередь, вы создали себе гонку потоков, и теперь пытаетесь ее исправить.

На основе предоставленной информации и контекста — как уже было сказано — более подходящей может быть одна очередь fifo с идентификатором контекста (вам действительно нужны 3 очереди?) Если задержка критична, то потоковая передача также может быть решением.

Как вы описали свою проблему, я думаю, вам не нужно объединять сообщения (действительно, вы могли бы использовать Redis, AWS Kinesis Analytics, DynamoDB ..), А не создавать проблему в первую очередь

Опции

  • наличие единой очереди fifo
  • наличие идемпотентной и потокобезопасной серверной службы, способной обрабатывать одновременные обновления (транзакции, атомарные обновления, ..)

Кроме того, если вы можете создавать «повторяющиеся» записи, это означает, что уникальные индексы не применяются. Они существуют именно по этой причине.

Вы не указали серверную службу (СУБД, DynamoDB, MongoDB, другие?), У каждого есть возможность как-то решить проблему.

Комментарии:

1. Это не вариант, используя одну очередь FIFO. Мое приложение, по сути, является связующим звеном между двумя сторонними платформами. Первый предоставляет данные в заранее определенном формате, я преобразую данные, принимаю некоторые решения на основе данных, а затем отправляю их в целевое стороннее приложение через REST API. Одиночная очередь FIFO — это не вариант, поскольку это частично устранит мою проблему, но замедлит мою пропускную способность, которая должна составлять примерно 5 тыс. событий в минуту.