#asynchronous #sqlalchemy
#асинхронный #sqlalchemy
Вопрос:
Я использую AsyncSession от SQLAlchemy 1.4.0b1 для обновления базы данных Postgres с помощью asyncpg 0.21.0. Приведенный ниже код предназначен для обновления объектов и добавления новых объектов в ответ на различные входящие сообщения потока Redis
Сопрограмма save_revised (обновление) работает нормально, как и сопрограмма session.add.части td_move. Однако часть обновления td_move в нижней части функции (начиная с if this_train_id and msg.get('from') in finals[crossing]
) работает только с перерывами: я получаю некоторые обновления БД, но только ~ 1/3 или около того сообщений журнала, указывающих, что требуется обновление.
Кто-нибудь может подсказать, в чем может быть проблема (проблемы)?
async def main():
logger.info(f"db_updater starting {datetime.now().strftime('%H:%M:%S')}")
engine = create_async_engine(os.getenv('ASYNC_DB_URL'), future=True)
async with AsyncSession(engine) as session:
crossings, headcodes, lean_params, finals, active_trains, train_ids, berthtimes, hc_types = await get_db_data(logger) # noqa: E501
pool = await aioredis.create_redis_pool(('redis', 6379), db=0, password=os.getenv('REDIS_PW'), encoding='utf-8')
last_id = '
Ответ №1:
Для дальнейшего использования проблема заключалась в выполнении (синхронизации) вызова журнала. Я удалил это (и добавлю вызов асинхронного регистратора), измененный код теперь работает нормально
while True:
all_msgs = await pool.xread(['del_hc_s', 'xing_revised', 'all_td', 'add_hc_s'], latest_ids=[last_id, last_id, last_id, last_id]) # noqa: E501
for stream_name, msg_id, msg in all_msgs:
message = dict(msg)
crossing = message.get('crossing')
if stream_name == 'all_td':
await td_move(message, train_ids, active_trains, finals, lean_params, session)
elif stream_name == 'xing_revised':
await save_revised(message, lean_params[crossing], session)
async def save_revised(msg, params, session):
train_id = msg.get('train_id')
# today_class is a SQLA model class from declarative_base()
today_class = params['today_class']
rev_time = datetime.fromtimestamp(
int(msg.get('revised')))
stmt = update(today_class).where(today_class.train_id == train_id).
values(xing_revised=rev_time).
execution_options(synchronize_session="fetch")
await session.execute(stmt)
if msg.get('revised_ten') != 'X':
stmt2 = update(today_class).where(today_class.train_id == train_id).
values(xing_revised_ten=rev_time).
execution_options(synchronize_session="fetch")
await session.execute(stmt2)
await session.commit()
async def td_move(msg, train_ids, active_trains, finals, params, session):
crossing = msg.get('crossing')
descr = msg.get('descr')
if crossing:
this_train_id = [s for s in train_ids[crossing] if descr in s]
if this_train_id:
this_train_id = this_train_id[0]
else:
return
if this_train_id and active_trains[crossing].get(this_train_id) and (
is_within_minutes(30, active_trains[crossing].get(this_train_id))):
# Td_Ca_Cc is a SQLA model class from declarative_base()
td = Td_Ca_Cc(
msg_type=msg.get('msg_type'),
descr=msg.get('descr'),
traintype=active_trains[crossing].get(
this_train_id).get('train_type'),
from_berth=msg.get('from'),
to_berth=msg.get('to'),
tdtime=dt_from_timestamp(msg.get('time')),
seconds=0,
area_id=msg.get('area_id'),
updated=datetime.now(),
crossing=crossing
)
session.add(td)
if this_train_id and msg.get('from') in finals[crossing]:
today_class = params[crossing]['today_class']
stmt = update(today_class).where(today_class.train_id == this_train_id).
values(xing_actual=datetime.now(), cancel_time='XXX').
execution_options(synchronize_session="fetch")
await session.execute(stmt)
logger.info(f"{crossing} {msg.get('descr')} passed {datetime.now().strftime('%H:%M:%S')}")
await session.commit()
if __name__ == '__main__':
asyncio.run(main())
Ответ №1:
Для дальнейшего использования проблема заключалась в выполнении (синхронизации) вызова журнала. Я удалил это (и добавлю вызов асинхронного регистратора), измененный код теперь работает нормально