Интеграция Flask с Celery 3.x и SQLAlchemy

#python #flask #celery

#python #flask #сельдерей

Вопрос:

По этой теме есть ряд вопросов без ответов.

Проблема заключается в использовании задач celery, которые могут обращаться к базе данных через SQLAlchemy. Без надлежащей интеграции с Flask мы бы получили типичное out of context сообщение об ошибке.

Следовательно, это моя попытка интегрировать Flask и Celery, которая, похоже, работает нормально. В качестве следующего шага я хотел бы вызвать эту задачу при нажатии ‘/’. Однако я получаю сообщение об отказе в подключении (см. Трассировку ниже)

wsgi_fb.py

 def make_celery(the_app):
    the_celery = Celery(the_app.import_name, backend=the_app.config['CELERY_RESULT_BACKEND'], broker=the_app.config['BROKER_URL_CELERY'])
    the_celery.config_from_object(config_celery)
    the_celery.conf.update(the_app.config)
    task_base = the_celery.Task
    class ContextTask(task_base):
        abstract = True
        def __call__(self, *args, **kwargs):
            with the_app.app_context():
                return task_base.__call__(self, *args, **kwargs)
    the_celery.Task = ContextTask
    return the_celery

app = Flask(__name__)
app.config.from_object(config)
celery_app = make_celery(app)
celery_app.config_from_object(config_celery)
celery_app.set_current()
api = Api(app)
db.init_app(app)
api.add_resource(Index, '/')
  

facebook_bot.py

 import tasks

class Index(Resource):
    def get(self):
        tasks.send_tag_batches.delay()
        return 'OK'
  

tasks.py

 from celery import current_app

@current_app.task
def send_tag_batches():
    ...
    return 'ok'
  

Трассировка стека:

  File "/Users/houmie/projects/chasebot/src/facebook/resource/facebook_bot.py", line 22, in get
    tasks.send_tag_batches.delay()
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/celery/app/task.py", line 453, in delay
    return self.apply_async(args, kwargs)
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/celery/app/task.py", line 565, in apply_async
    **dict(self._get_exec_options(), **options)
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/celery/app/base.py", line 354, in send_task
    reply_to=reply_to or self.oid, **options
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/celery/app/amqp.py", line 310, in publish_task
    **kwargs
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/kombu/messaging.py", line 172, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/kombu/connection.py", line 457, in _ensured
    interval_max)
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/kombu/connection.py", line 369, in ensure_connection
    interval_start, interval_step, interval_max, callback)
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/kombu/utils/__init__.py", line 246, in retry_over_time
    return fun(*args, **kwargs)
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/kombu/connection.py", line 237, in connect
    return self.connection
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/kombu/connection.py", line 742, in connection
    self._connection = self._establish_connection()
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/kombu/connection.py", line 697, in _establish_connection
    conn = self.transport.establish_connection()
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/kombu/transport/pyamqp.py", line 116, in establish_connection
    conn = self.Connection(**opts)
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/amqp/connection.py", line 165, in __init__
    self.transport = self.Transport(host, connect_timeout, ssl)
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/amqp/connection.py", line 186, in Transport
    return create_transport(host, connect_timeout, ssl)
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/amqp/transport.py", line 299, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/Users/houmie/.pyenv/versions/3.5.1/envs/venv35/lib/python3.5/site-packages/amqp/transport.py", line 95, in __init__
    raise socket.error(last_err)
OSError: [Errno 61] Connection refused