#flask #redis #celery
#flask #redis #сельдерей
Вопрос:
Как я могу получить результаты задачи из redis? Мне нужны результаты запроса, которые вернула задача, а не статус задач.
Файлы журнала подтверждают, что задача возвращает результаты. В соответствии с этими документами get()
может использоваться для возврата результатов задачи, и в соответствии с этими документами это должно работать. Что-то подсказывает мне, что я на самом деле не сохраняю результаты в серверной части redis.
Ожидаемое поведение: запускайте задачу каждые 24 часа и сохраняйте результаты запроса к БД в redis. Используйте кэш redis для получения этих результатов при вызовах приложений.
Вот моя функция задачи.
@shared_task(name='get_top_ten_gainers', ignore_result=False)
def get_top_ten_gainers():
from collections import namedtuple
query = (
db_session.execute(
"""WITH p AS (
SELECT CompanyId,
100 * (MAX(CASE WHEN rn = 1 THEN CloseAdjusted END) / MAX(CASE WHEN rn = 2 THEN CloseAdjusted END) - 1) DayGain
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY CompanyId ORDER BY Date DESC) rn
FROM DimCompanyPrice
)
WHERE rn <= 2
GROUP BY CompanyId
ORDER BY DayGain DESC
LIMIT 10
)
SELECT *
FROM p
JOIN Company ON p.CompanyID = Company.ID"""
)
)
logger.info(query)
Gain = namedtuple('Gain', query.keys())
gains = [Gain(*q) for q in query.fetchall()]
payload = [[g.Symbol, g.Security, g.DayGain] for g in gains]
logger.info("`payload` of type {}: {}".format(type(payload),payload))
return payload
Вот моя вспомогательная функция просмотра flask:
def _top_ten_gainers():
t0 = time.perf_counter()
from project.tasks import get_top_ten_gainers
result = get_top_ten_gainers.apply_async()
logger.info("`result` of type {}: {}".format(type(result),result))
logger.info("`result.get()` of type {}: {}".format(type(result.get()),result.get()))
total_payload['gainers'] = result.get()
logger.info('task finished in {}'.format(time.perf_counter() - t0))
Вот вывод сельдерея:
-------------- celery@desktop v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Linux-5.11.0-41-generic-x86_64-with-glibc2.29 2021-12-02 18:07:16
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: default:0x7fb6f8bb5b80 (.default.Loader)
- ** ---------- .> transport: redis://127.0.0.1:6379/0
- ** ---------- .> results: redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[2021-12-02 18:07:17,276: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2021-12-02 18:07:17,287: INFO/MainProcess] mingle: searching for neighbors
[2021-12-02 18:07:18,305: INFO/MainProcess] mingle: all alone
[2021-12-02 18:07:18,320: INFO/MainProcess] celery@desktop ready.
[2021-12-09 15:08:01,235: INFO/MainProcess] Task get_top_ten_gainers[26ce0da8-662c-4690-b95e-9d1e42e79e34] received
[2021-12-09 15:08:05,068: INFO/ForkPoolWorker-15] <sqlalchemy.engine.cursor.CursorResult object at 0x7f40c876e970>
[2021-12-09 15:08:05,087: INFO/ForkPoolWorker-15] `payload` of type <class 'list'>: [['MA', 'Mastercard', 3.856085884230387], ['GM', 'General Motors', 3.194377894904976], ['LH', 'LabCorp', 2.6360513469535274], ['CCI', 'Crown Castle', 2.491537650518838], ['DHI', 'D. R. Horton', 2.451821208757954], ['PEAK', 'Healthpeak Properties', 2.3698069046225845], ['BDX', 'Becton Dickinson', 2.355495473352187], ['DD', 'DuPont', 2.19100399536023], ['HLT', 'Hilton Worldwide', 1.9683928319458088], ['JKHY', 'Jack Henry amp; Associates', 1.7102615694164935]]
[2021-12-09 15:08:05,090: INFO/ForkPoolWorker-15] Task get_top_ten_gainers[26ce0da8-662c-4690-b95e-9d1e42e79e34] succeeded in 3.8531467270004214s: None
Вот мой вывод файла журнала:
[2021-12-09 15:08:01,216][index ][INFO ] `result` of type <class 'celery.result.AsyncResult'>: 26ce0da8-662c-4690-b95e-9d1e42e79e34
[2021-12-09 15:08:05,068][tasks ][INFO ] <sqlalchemy.engine.cursor.CursorResult object at 0x7f40c876e970>
[2021-12-09 15:08:05,087][tasks ][INFO ] `payload` of type <class 'list'>: [['MA', 'Mastercard', 3.856085884230387], ['GM', 'General Motors', 3.194377894904976], ['LH', 'LabCorp', 2.6360513469535274], ['CCI', 'Crown Castle', 2.491537650518838], ['DHI', 'D. R. Horton', 2.451821208757954], ['PEAK', 'Healthpeak Properties', 2.3698069046225845], ['BDX', 'Becton Dickinson', 2.355495473352187], ['DD', 'DuPont', 2.19100399536023], ['HLT', 'Hilton Worldwide', 1.9683928319458088], ['JKHY', 'Jack Henry amp; Associates', 1.7102615694164935]]
[2021-12-09 15:08:05,090][index ][INFO ] `result.get()` of type <class 'NoneType'>: None
Комментарии:
1. Как вы убедились, что то, что возвращает функция, на самом деле неверно? Вы пытались запустить функцию вне среды исключения сельдерея и убедиться, что она не возвращает None?
2. @DejanLekic Я обновил свой вопрос, добавив больше журналов. Результаты запроса регистрируются в файле журнала приложения, и они также регистрируются в журналах сельдерея. Я считаю, что задача возвращает None, когда она успешна, что имеет смысл в этой ситуации, но тогда возникает вопрос: как мне получить результаты запроса, которые задача предположительно хранит в redis
Ответ №1:
Похоже, ваша задача занимает ~ 4 секунды, и вы не ждете ее завершения. Вы можете подождать этого так:
def _top_ten_gainers():
from project.tasks import get_top_ten_gainers
result = get_top_ten_gainers.delay()
while not result.ready():
sleep(1)
gains = result.get()
logger.info("Task results: {}".format(gains))
Комментарии:
1. В соответствии с документами «get» ожидает, пока задача не будет готова, и возвращает результат, поэтому разве мне не нужно вручную проверять, пока не будет результата?
2. Интересно, вы пытались добавлять журналы между строками, чтобы посмотреть, действительно ли он ожидает? вы знаете, что используется по умолчанию
timeout
?3. @sat1017 ты дал ему шанс?
4. Не решает проблему. У меня есть подозрение, что результаты запроса не сохраняются в redis или что я неправильно их получаю.