#python #amazon-web-services #aws-lambda #multiprocessing #psycopg2
#python #amazon-веб-сервисы #aws-lambda #многопроцессорная обработка #psycopg2
Вопрос:
В функции AWS Lambda (среда выполнения Python 3.8) я создал два процесса. В каждом из этих процессов я пытаюсь параллельно запустить две совершенно разные функции, каждая из которых выполняет SQL-запрос. Каждый из этих SQL-запросов взаимодействует с различными таблицами базы данных PostgreSQL (драйвер psycopg2).
Как вы можете видеть из моего кода, я создаю один объект подключения в самом начале функции AWS Lambda. В каждом из запущенных процессов я использую этот объект подключения. Каждый процесс использует свой собственный курсор. В конце каждого процесса курсор закрывается.
Кроме того, я использую глобальную connection.autocommit = True
опцию.
Функция MemorySize
AWS Lambda является 10240
.
Когда я попытался выполнить свой код, я получил ошибку со следующим содержимым в обоих процессах:
[ERROR] psycopg2.OperationalError: SSL error: decryption failed or bad record mac
Согласно документации psycopg2:
при использовании модуля, такого как многопроцессорная обработка, или метода разветвленного веб-развертывания, такого как FastCGI, убедитесь, что соединения создаются после разветвления.
Создание объекта с подключением к базе данных в каждом процессе может сильно замедлить работу функции AWS Lambda. Не так ли? Как вы можете видеть, я использую глобальную переменную ( POSTGRESQL_CONNECTION
) для повторного использования ранее созданного подключения к базе данных.
Что мне делать в этой ситуации? Мне нужно параллельно запускать несколько SQL-запросов в AWS Lambda.
Фрагмент кода:
import logging
import os
from multiprocessing import Process, Pipe
from psycopg2.extras import RealDictCursor
from functools import wraps
from typing import *
import databases # AWS Lambda Layer
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)
# The connection to the database will be created the first time the AWS Lambda function is called.
# Any subsequent call to the function will use the same database connection until the container stops.
POSTGRESQL_CONNECTION = None
# Initialize global variables with parameters for settings.
POSTGRESQL_USERNAME = os.environ["POSTGRESQL_USERNAME"]
POSTGRESQL_PASSWORD = os.environ["POSTGRESQL_PASSWORD"]
POSTGRESQL_HOST = os.environ["POSTGRESQL_HOST"]
POSTGRESQL_PORT = os.environ["POSTGRESQL_PORT"]
POSTGRESQL_DB_NAME = os.environ["POSTGRESQL_DB_NAME"]
def reuse_or_recreate_postgresql_connection():
global POSTGRESQL_CONNECTION
if not POSTGRESQL_CONNECTION:
try:
POSTGRESQL_CONNECTION = databases.create_postgresql_connection(
POSTGRESQL_USERNAME,
POSTGRESQL_PASSWORD,
POSTGRESQL_HOST,
POSTGRESQL_PORT,
POSTGRESQL_DB_NAME
)
except Exception as error:
logger.error(error)
raise Exception("Unable to connect to the PostgreSQL database.")
return POSTGRESQL_CONNECTION
def execute_parallel_processes(functions: List[Dict[AnyStr, Union[Callable, Dict[AnyStr, Any]]]]) -> Dict[AnyStr, Any]:
# Create an empty list to keep all parallel processes.
processes = []
# Create an empty list of pipes to keep all connections.
pipes = []
# Create a process per function.
for index, function in enumerate(functions):
# Check whether the input arguments have keys in their dictionaries.
try:
target = function["function_object"]
except KeyError as error:
logger.error(error)
raise Exception(error)
try:
kwargs = function["function_arguments"]
except KeyError as error:
logger.error(error)
raise Exception(error)
# Create a pipe for communication.
parent_pipe, child_pipe = Pipe()
pipes.append(parent_pipe)
kwargs["pipe"] = child_pipe
# Create a process.
process = Process(target=target, kwargs=kwargs)
processes.append(process)
# Start all parallel processes.
for process in processes:
process.start()
# Wait until all parallel processes are finished.
for process in processes:
process.join()
# Get the results of all processes.
results = {}
for pipe in pipes:
results = {**results, **pipe.recv()}
# Return the results of all processes.
return results
def psycopg2_cursor(function):
@wraps(function)
def wrapper(**kwargs):
try:
postgresql_connection = kwargs["postgresql_connection"]
except KeyError as error:
logger.error(error)
raise Exception(error)
cursor = postgresql_connection.cursor(cursor_factory=RealDictCursor)
result = function(cursor=cursor, **kwargs)
cursor.close()
return result
return wrapper
@psycopg2_cursor
def get_product_data(**kwargs) -> None:
# Check whether the input arguments have keys in their dictionaries.
try:
cursor = kwargs["cursor"]
except KeyError as error:
logger.error(error)
raise Exception(error)
try:
arguments = kwargs["sql_arguments"]
except KeyError as error:
logger.error(error)
raise Exception(error)
try:
pipe = kwargs["pipe"]
except KeyError as error:
logger.error(error)
raise Exception(error)
# Prepare the SQL query.
statement = "select * from products where product_id = %(product_id)s limit 1;"
# Execute the SQL query dynamically, in a convenient and safe way.
try:
cursor.execute(statement, arguments)
except Exception as error:
logger.error(error)
raise Exception(error)
# Send data to the pipe and then close it.
pipe.send({
"product_data": cursor.fetchone()
})
pipe.close()
@psycopg2_cursor
def get_user_data(**kwargs) -> None:
# Check whether the input arguments have keys in their dictionaries.
try:
cursor = kwargs["cursor"]
except KeyError as error:
logger.error(error)
raise Exception(error)
try:
arguments = kwargs["sql_arguments"]
except KeyError as error:
logger.error(error)
raise Exception(error)
try:
pipe = kwargs["pipe"]
except KeyError as error:
logger.error(error)
raise Exception(error)
# Prepare the SQL query.
statement = "select * from users where user_id = %(user_id)s limit 1;"
# Execute the SQL query dynamically, in a convenient and safe way.
try:
cursor.execute(statement, arguments)
except Exception as error:
logger.error(error)
raise Exception(error)
# Send data to the pipe and then close it.
pipe.send({
"user_data": cursor.fetchone()
})
pipe.close()
def lambda_handler(event, context):
postgresql_connection = reuse_or_recreate_postgresql_connection()
# Run several functions in parallel to get all necessary data from different databases tables.
results_of_processes = execute_parallel_processes([
{
"function_object": get_product_data,
"function_arguments": {
"postgresql_connection": postgresql_connection,
"sql_arguments": {
"product_id": event["product_id"]
}
}
},
{
"function_object": get_user_data,
"function_arguments": {
"postgresql_connection": postgresql_connection,
"sql_arguments": {
"user_id": event["user_id"]
}
}
}
])
product_data = results_of_processes["product_data"]
user_data = results_of_processes["user_data"]
return {
'statusCode': 200,
'body': json.dumps(results_of_processes)
}
Уровень AWS Lambda называется databases
:
def create_postgresql_connection(db_username, db_password, db_host, db_port, db_name):
connection = psycopg2.connect(user=db_username, password=db_password, host=db_host, port=db_port, database=db_name)
connection.autocommit = True
return connection
Комментарии:
1. Основываясь на ошибке, я бы сказал, что у вас проблема с вашим соединением, в частности проблема с SSL :
SSL error: decryption failed or bad record mac
. Убедитесь, что вашиPOSTGRESQL_HOST
иPOSTGRESQL_PORT
т.д. установлены правильно. Это не имеет никакого отношения к «производительности» вашего кода. Это должно легко поместиться в 256 МБ памяти Lambda, вероятно, 128.2. @Jens Спасибо за ваш ответ. Я проверил значения
POSTGRESQL_HOST
POSTGRESQL_PORT
постоянных переменных. Тогда они доступны и корректны. Как вы можете видеть из моего кода, у меня естьreuse_or_recreate_postgresql_connection
функция. Внутри этой функции я могу создать объект подключения, если это необходимо. Я переместил саму функцию подключения вAWS Lambda Layers
(create_postgresql_connection
function), потому что я часто использую ее в других функциях AWS Lambda. Не могли бы вы еще раз проверить мой пост, пожалуйста ?!create_postgresql_connection
Функция довольно проста. Итак, я в замешательстве.3. Проверьте это сообщение в блоге. Та же проблема с решением: medium.com/@philamersune /. … Я бы настоятельно рекомендовал вам начать писать код синхронизации и проверить, достаточно ли он быстр для вашего варианта использования. Это похоже на преждевременную оптимизацию, которая на данный момент не нужна. Ваши запросы выглядят очень просто и быстро. Так что просто выполняйте их один за другим. Это сделает ваш код намного, намного, намного проще. В итоге вы просто получаете одного пользователя и один продукт.
4. Я уже сталкивался с этим сообщением. Я не уверен, что это будет работать в контексте AWS Lambda. Поскольку ниже описаны решения для Django. Вы так не думаете? В любом случае, спасибо! Я пришел к многопроцессорной обработке, потому что мой текущий синхронный код начал не соответствовать скорости производительности.
5. В итоге они создали новое соединение для каждого потока. Возможно, начните с этого и проверьте, достаточно ли это быстро. Поэтому не делитесь подключением глобально, а создавайте его каждый раз. В конечном итоге это было решение, используемое в сообщении в блоге.