Настройка AWS Lambda для параллельных вычислений для потоков Dynamodb

#amazon-web-services #aws-lambda #amazon-dynamodb

#amazon-веб-сервисы #aws-lambda #amazon-dynamodb

Вопрос:

У меня есть flask информация EC2 об python 3.6 AWS Lambda архитектуре. Когда приходит ответ на flask , в dynamoDB добавляется новый элемент, который запускает Lambda некоторый процесс с новым добавленным элементом. По какой-то странной причине он не обрабатывает триггеры параллельно, запуская новую лямбда-функцию для каждого триггера, а обрабатывает их один за другим.

Я попытался установить concurrency ограничение на максимальное значение, но это не сработало.

Мне нужно получить результат как можно быстрее и не управлять каким-либо процессом масштабирования самостоятельно. Таким образом, триггеры должны обрабатываться параллельно, а не по одному, как сейчас.

Комментарии:

1. Как ваши данные распределяются в dynamodb? Похоже, проблема с разделением.

Ответ №1:

  1. Если вы разрабатываете лямбда-функцию с помощью Python, параллелизм по умолчанию отсутствует. Lambda поддерживает Python 2.7 и Python 3.6, оба из которых имеют модули многопроцессорной обработки и многопоточности.
  2. С другой стороны, вы можете использовать многопроцессорную обработку.Конвейер вместо многопроцессорной обработки.Очередь для выполнения того, что вам нужно, без получения каких-либо ошибок во время выполнения функции Lambda.

Пожалуйста, обратитесь к ссылке ниже для получения более подробной информации об исходном коде для параллельного выполнения:

https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/

Также вы можете обратиться к приведенному ниже коду:

 import time
import multiprocessing

region_maps = {
        "eu-west-1": {
            "dynamodb":"dynamodb.eu-west-1.amazonaws.com"
        },
        "us-east-1": {
            "dynamodb":"dynamodb.us-east-1.amazonaws.com"
        },
        "us-east-2": {
            "dynamodb": "dynamodb.us-east-2.amazonaws.com"
        }
    }

def multiprocessing_func(region):
    time.sleep(1)
    endpoint = region_maps[region]['dynamodb']
    print('endpoint for {} is {}'.format(region, endpoint))

def lambda_handler(event, context):
    starttime = time.time()
    processes = []
    regions = ['us-east-1', 'us-east-2', 'eu-west-1']
    for region in regions:
        p = multiprocessing.Process(target=multiprocessing_func, args=(region,))
        processes.append(p)
        p.start()

    for process in processes:
        process.join()

    output = 'That took {} seconds'.format(time.time() - starttime)
    print(output)
    return output
  

Надеюсь, это поможет.

Комментарии:

1. Спасибо за ваш ответ! Итак, решение заключается в запуске нескольких процессов на одном lambda? Тогда как мне добавить все новые триггеры в режиме реального времени для многопроцессорной обработки?

2. @DenysMelnychenko Да, вы можете запускать несколько процессов на одном lambda. Наилучший возможный способ добавить триггер для обработки данных в реальном времени — это использовать сервис AWS Kinesis. Функция lambda будет вызвана при обнаружении новых записей в потоке kinesis.

Ответ №2:

Количество параллельных лямбд определяется количеством сегментов, в которые вы записываете данные в dynamodb.

Amazon DynamoDB, AWS Lambda опрашивает ваш поток и вызывает вашу лямбда-функцию. Когда ваша лямбда-функция регулируется, Lambda пытается обработать ограниченный пакет записей до истечения срока действия данных. Для Amazon Kinesis этот период времени может составлять до семи дней. Регулируемый запрос обрабатывается как блокировка для каждого сегмента, и Lambda не считывает никаких новых записей из сегмента, пока срок действия регулируемого пакета записей не истечет или он не завершится успешно. Если в потоке более одного сегмента, Lambda продолжает вызывать нерегулируемые сегменты, пока один из них не завершится.

Источник

Это сделано для контроля того, the events are processed in order что они были выполнены на dynamodb. Но количество сегментов напрямую не контролируется вами.

Теперь лучшее, что вы можете сделать, это,

  1. установите большее Batch size значение в функции lambda. Делая это, вы получите несколько событий в одном и том же lambda. Вы можете использовать параллелизм в функции lambda для обработки их всех вместе. но это будет иметь очевидные недостатки, например, что, если вы не сможете обработать их все до истечения времени ожидания lambda. вам нужно будет убедиться, что код является потокобезопасным.

Ответ №3:

Вероятно, запись в DynamoDB блокирует параллелизм в этом случае.

Альтернативная архитектура для быстрой и очень масштабируемой обработки элементов: добавляйте элементы в корзину S3 в виде файлов. Затем триггер в корзине S3 запустит Lambda. Новый файл — новая лямбда, таким образом, только лямбда-параллелизм ограничит количество лямбд, которые у вас есть параллельно.