Как я могу запускать несколько SQL-запросов параллельно в AWS Lambda?

#python #amazon-web-services #aws-lambda #multiprocessing #psycopg2

#python #amazon-веб-сервисы #aws-lambda #многопроцессорная обработка #psycopg2

Вопрос:

В функции AWS Lambda (среда выполнения Python 3.8) я создал два процесса. В каждом из этих процессов я пытаюсь параллельно запустить две совершенно разные функции, каждая из которых выполняет SQL-запрос. Каждый из этих SQL-запросов взаимодействует с различными таблицами базы данных PostgreSQL (драйвер psycopg2).

Как вы можете видеть из моего кода, я создаю один объект подключения в самом начале функции AWS Lambda. В каждом из запущенных процессов я использую этот объект подключения. Каждый процесс использует свой собственный курсор. В конце каждого процесса курсор закрывается.

Кроме того, я использую глобальную connection.autocommit = True опцию.

Функция MemorySize AWS Lambda является 10240 .

Когда я попытался выполнить свой код, я получил ошибку со следующим содержимым в обоих процессах:

 [ERROR] psycopg2.OperationalError: SSL error: decryption failed or bad record mac
 

Согласно документации psycopg2:

при использовании модуля, такого как многопроцессорная обработка, или метода разветвленного веб-развертывания, такого как FastCGI, убедитесь, что соединения создаются после разветвления.

Создание объекта с подключением к базе данных в каждом процессе может сильно замедлить работу функции AWS Lambda. Не так ли? Как вы можете видеть, я использую глобальную переменную ( POSTGRESQL_CONNECTION ) для повторного использования ранее созданного подключения к базе данных.

Что мне делать в этой ситуации? Мне нужно параллельно запускать несколько SQL-запросов в AWS Lambda.

Фрагмент кода:

 import logging
import os
from multiprocessing import Process, Pipe
from psycopg2.extras import RealDictCursor
from functools import wraps
from typing import *
import databases  # AWS Lambda Layer

logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)

# The connection to the database will be created the first time the AWS Lambda function is called.
# Any subsequent call to the function will use the same database connection until the container stops.
POSTGRESQL_CONNECTION = None

# Initialize global variables with parameters for settings.
POSTGRESQL_USERNAME = os.environ["POSTGRESQL_USERNAME"]
POSTGRESQL_PASSWORD = os.environ["POSTGRESQL_PASSWORD"]
POSTGRESQL_HOST = os.environ["POSTGRESQL_HOST"]
POSTGRESQL_PORT = os.environ["POSTGRESQL_PORT"]
POSTGRESQL_DB_NAME = os.environ["POSTGRESQL_DB_NAME"]


def reuse_or_recreate_postgresql_connection():
    global POSTGRESQL_CONNECTION
    if not POSTGRESQL_CONNECTION:
        try:
            POSTGRESQL_CONNECTION = databases.create_postgresql_connection(
                POSTGRESQL_USERNAME,
                POSTGRESQL_PASSWORD,
                POSTGRESQL_HOST,
                POSTGRESQL_PORT,
                POSTGRESQL_DB_NAME
            )
        except Exception as error:
            logger.error(error)
            raise Exception("Unable to connect to the PostgreSQL database.")
    return POSTGRESQL_CONNECTION


def execute_parallel_processes(functions: List[Dict[AnyStr, Union[Callable, Dict[AnyStr, Any]]]]) -> Dict[AnyStr, Any]:
    # Create an empty list to keep all parallel processes.
    processes = []

    # Create an empty list of pipes to keep all connections.
    pipes = []

    # Create a process per function.
    for index, function in enumerate(functions):
        # Check whether the input arguments have keys in their dictionaries.
        try:
            target = function["function_object"]
        except KeyError as error:
            logger.error(error)
            raise Exception(error)
        try:
            kwargs = function["function_arguments"]
        except KeyError as error:
            logger.error(error)
            raise Exception(error)

        # Create a pipe for communication.
        parent_pipe, child_pipe = Pipe()
        pipes.append(parent_pipe)
        kwargs["pipe"] = child_pipe

        # Create a process.
        process = Process(target=target, kwargs=kwargs)
        processes.append(process)

    # Start all parallel processes.
    for process in processes:
        process.start()

    # Wait until all parallel processes are finished.
    for process in processes:
        process.join()

    # Get the results of all processes.
    results = {}
    for pipe in pipes:
        results = {**results, **pipe.recv()}

    # Return the results of all processes.
    return results


def psycopg2_cursor(function):
    @wraps(function)
    def wrapper(**kwargs):
        try:
            postgresql_connection = kwargs["postgresql_connection"]
        except KeyError as error:
            logger.error(error)
            raise Exception(error)
        cursor = postgresql_connection.cursor(cursor_factory=RealDictCursor)
        result = function(cursor=cursor, **kwargs)
        cursor.close()
        return result
    return wrapper

@psycopg2_cursor
def get_product_data(**kwargs) -> None:
    # Check whether the input arguments have keys in their dictionaries.
    try:
        cursor = kwargs["cursor"]
    except KeyError as error:
        logger.error(error)
        raise Exception(error)
    try:
        arguments = kwargs["sql_arguments"]
    except KeyError as error:
        logger.error(error)
        raise Exception(error)
    try:
        pipe = kwargs["pipe"]
    except KeyError as error:
        logger.error(error)
        raise Exception(error)

    # Prepare the SQL query.
    statement = "select * from products where product_id = %(product_id)s limit 1;"

    # Execute the SQL query dynamically, in a convenient and safe way.
    try:
        cursor.execute(statement, arguments)
    except Exception as error:
        logger.error(error)
        raise Exception(error)

    # Send data to the pipe and then close it.
    pipe.send({
        "product_data": cursor.fetchone()
    })
    pipe.close()

@psycopg2_cursor
def get_user_data(**kwargs) -> None:
    # Check whether the input arguments have keys in their dictionaries.
    try:
        cursor = kwargs["cursor"]
    except KeyError as error:
        logger.error(error)
        raise Exception(error)
    try:
        arguments = kwargs["sql_arguments"]
    except KeyError as error:
        logger.error(error)
        raise Exception(error)
    try:
        pipe = kwargs["pipe"]
    except KeyError as error:
        logger.error(error)
        raise Exception(error)

    # Prepare the SQL query.
    statement = "select * from users where user_id = %(user_id)s limit 1;"

    # Execute the SQL query dynamically, in a convenient and safe way.
    try:
        cursor.execute(statement, arguments)
    except Exception as error:
        logger.error(error)
        raise Exception(error)

    # Send data to the pipe and then close it.
    pipe.send({
        "user_data": cursor.fetchone()
    })
    pipe.close()


def lambda_handler(event, context):
    postgresql_connection = reuse_or_recreate_postgresql_connection()

    # Run several functions in parallel to get all necessary data from different databases tables.
    results_of_processes = execute_parallel_processes([
        {
            "function_object": get_product_data,
            "function_arguments": {
                "postgresql_connection": postgresql_connection,
                "sql_arguments": {
                    "product_id": event["product_id"]
                }
            }
        },
        {
            "function_object": get_user_data,
            "function_arguments": {
                "postgresql_connection": postgresql_connection,
                "sql_arguments": {
                    "user_id": event["user_id"]
                }
            }
        }
    ])

    product_data = results_of_processes["product_data"]
    user_data = results_of_processes["user_data"]

    return {
        'statusCode': 200,
        'body': json.dumps(results_of_processes)
    }
 

Уровень AWS Lambda называется databases :

 def create_postgresql_connection(db_username, db_password, db_host, db_port, db_name):
    connection = psycopg2.connect(user=db_username, password=db_password, host=db_host, port=db_port, database=db_name)
    connection.autocommit = True
    return connection
 

Комментарии:

1. Основываясь на ошибке, я бы сказал, что у вас проблема с вашим соединением, в частности проблема с SSL : SSL error: decryption failed or bad record mac . Убедитесь, что ваши POSTGRESQL_HOST и POSTGRESQL_PORT т.д. установлены правильно. Это не имеет никакого отношения к «производительности» вашего кода. Это должно легко поместиться в 256 МБ памяти Lambda, вероятно, 128.

2. @Jens Спасибо за ваш ответ. Я проверил значения POSTGRESQL_HOST POSTGRESQL_PORT постоянных переменных. Тогда они доступны и корректны. Как вы можете видеть из моего кода, у меня есть reuse_or_recreate_postgresql_connection функция. Внутри этой функции я могу создать объект подключения, если это необходимо. Я переместил саму функцию подключения в AWS Lambda Layers ( create_postgresql_connection function), потому что я часто использую ее в других функциях AWS Lambda. Не могли бы вы еще раз проверить мой пост, пожалуйста ?! create_postgresql_connection Функция довольно проста. Итак, я в замешательстве.

3. Проверьте это сообщение в блоге. Та же проблема с решением: medium.com/@philamersune /. … Я бы настоятельно рекомендовал вам начать писать код синхронизации и проверить, достаточно ли он быстр для вашего варианта использования. Это похоже на преждевременную оптимизацию, которая на данный момент не нужна. Ваши запросы выглядят очень просто и быстро. Так что просто выполняйте их один за другим. Это сделает ваш код намного, намного, намного проще. В итоге вы просто получаете одного пользователя и один продукт.

4. Я уже сталкивался с этим сообщением. Я не уверен, что это будет работать в контексте AWS Lambda. Поскольку ниже описаны решения для Django. Вы так не думаете? В любом случае, спасибо! Я пришел к многопроцессорной обработке, потому что мой текущий синхронный код начал не соответствовать скорости производительности.

5. В итоге они создали новое соединение для каждого потока. Возможно, начните с этого и проверьте, достаточно ли это быстро. Поэтому не делитесь подключением глобально, а создавайте его каждый раз. В конечном итоге это было решение, используемое в сообщении в блоге.