#django #python-2.7 #celery #celerybeat
#django #python-2.7 #celery #celerybeat
Вопрос:
Я использую celerybeat для отправки периодических задач в очередь RabbitMQ. Некоторое время всё работает должным образом, а затем отправка задачи celery.backend_cleanup завершается с ошибкой SSL.
[2016-10-24 04:00:02,309: DEBUG/MainProcess] Channel open
[2016-10-24 04:00:02,443: ERROR/MainProcess] Message Error: Couldn't apply scheduled task celery.backend_cleanup: [Errno 1] _ssl.c:1309: error:1409F07F:SSL routines:SSL3_WRITE_PENDING:bad write retry
[' File "/local/mnt/apps/ipcat/venvs/django16/bin/celery", line 9, in <module>\n load_entry_point('celery-ipcat==3.1.23', 'console_scripts', 'celery')()\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/__main__.py", line 30, in main\n main()\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/celery.py", line 81, in main\n cmd.execute_from_commandline(argv)\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/celery.py", line 793, in execute_from_commandline\n super(CeleryCommand, self).execute_from_commandline(argv)))\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/base.py", line 311, in execute_from_commandline\n return self.handle_argv(self.prog_name, argv[1:])\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/celery.py", line 785, in handle_argv\n return self.execute(command, argv)\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/celery.py", line 717, in execute\n ).run_from_argv(self.prog_name, argv[1:], command=argv[0])\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/base.py", line 315, in run_from_argv\n sys.argv if argv is None else argv, command)\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/base.py", line 377, in handle_argv\n return self(*args, **options)\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/base.py", line 274, in __call__\n ret = self.run(*args, **kwargs)\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/beat.py", line 79, in run\n return beat().run()\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/apps/beat.py", line 83, in run\n self.start_scheduler()\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/apps/beat.py", line 112, in start_scheduler\n beat.start()\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/beat.py", line 479, in start\n interval = self.scheduler.tick()\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/beat.py", line 221, in tick\n next_time_to_run = self.maybe_due(entry, self.publisher)\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/beat.py", line 207, in maybe_due\n exc, traceback.format_stack(), exc_info=True)\n']
Traceback (most recent call last):
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/beat.py", line 204, in maybe_due
result = self.apply_async(entry, publisher=publisher)
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/beat.py", line 260, in apply_async
entry, exc=exc)), sys.exc_info()[2])
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/beat.py", line 252, in apply_async
**entry.options)
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/app/task.py", line 565, in apply_async
**dict(self._get_exec_options(), **options)
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/app/base.py", line 354, in send_task
reply_to=reply_to or self.oid, **options
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/app/amqp.py", line 310, in publish_task
**kwargs
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/messaging.py", line 172, in publish
routing_key, mandatory, immediate, exchange, declare)
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/connection.py", line 436, in _ensured
return fun(*args, **kwargs)
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/messaging.py", line 184, in _publish
[maybe_declare(entity) for entity in declare]
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/messaging.py", line 111, in maybe_declare
return maybe_declare(entity, self.channel, retry, **retry_policy)
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/common.py", line 113, in maybe_declare
return _maybe_declare(entity, declared, ident, channel)
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/common.py", line 120, in _maybe_declare
entity.declare()
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/entity.py", line 521, in declare
self.exchange.declare(nowait)
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
nowait=nowait, passive=passive,
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/amqp/channel.py", line 615, in exchange_declare
self._send_method((40, 10), args)
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
self.channel_id, method_sig, args, content,
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
write_frame(1, channel, payload)
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
frame_type, channel, size, payload, 0xce,
File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/amqp/transport.py", line 254, in _write
n = write(s)
File "/usr/lib64/python2.7/ssl.py", line 172, in write
return self._sslobj.write(data)
SchedulingError: Couldn't apply scheduled task celery.backend_cleanup: [Errno 1] _ssl.c:1309: error:1409F07F:SSL routines:SSL3_WRITE_PENDING:bad write retry
[2016-10-24 04:00:02,743: DEBUG/MainProcess] beat: Waking up in 5.00 minutes.
После этого сбоя все последующие отправки задач завершаются с ошибкой следующего вида:
[2016-10-24 12:30:03,026: DEBUG/MainProcess] beat: Synchronizing schedule...
[2016-10-24 12:30:03,253: ERROR/MainProcess] Message Error: Couldn't apply scheduled task aggregate_hw_errata_sign_off_metrics_schedule: 'NoneType' object has no attribute 'do_handshake'
Я мог бы отключить задачу celery.backend_cleanup, установив CELERY_TASK_RESULT_EXPIRES = None, но боюсь, что это может привести к сбою в других настроенных задачах.
Приветствуется любая помощь или руководство.
Ответ №1:
Думаю, я нашёл проблему. Я сам создавал соединение и передавал его экземплярам Exchange и Queue. Я изменил код так, чтобы Queue и Exchange сами создавали соединение с брокером. После этого изменения проблем пока не было.
Спасибо
Старый код:
from kombu import Connection, Queue, Exchange
# Original (problematic) settings code, kept exactly as posted: a broker
# connection is created and opened at module level and then handed to the
# Exchange and Queue declarations below.
# BROKER_CONNECTION_TIMEOUT and BROKER_USE_SSL are bare names here (not read
# from `settings` like the other values) -- presumably defined elsewhere in
# this module; TODO confirm.
conn = Connection(
hostname=settings.BROKER_HOST,
port=settings.BROKER_PORT,
userid=settings.BROKER_USER,
password=settings.BROKER_PASSWORD,
virtual_host=settings.BROKER_VHOST,
connect_timeout=BROKER_CONNECTION_TIMEOUT,
ssl=BROKER_USE_SSL,
transport='pyamqp')
# Connect eagerly and open a single channel on this module-level connection.
conn.connect()
channel = conn.channel()
# The exchange is tied to that pre-opened channel.
exchange = Exchange('my.exchange', type='direct', passive=True, channel=channel)
CELERY_QUEUES = (
# NOTE(review): `channel=` is given the Connection object (`conn`), not the
# `channel` created above -- inconsistent with the Exchange line. Per the
# surrounding answer, sharing this hand-made connection with the queue and
# exchange is what was removed to make the SSL errors go away.
Queue('my.tasks', exchange=exchange, routing_key='my.metrics', channel=conn, passive=True),
)
Новый код (исправление):
from kombu import Queue, Exchange
# Declarative routing setup: neither the exchange nor the queue is handed a
# channel or connection, so this snippet itself opens no broker connection.
exchange = Exchange(
    'my.exchange',
    type='direct',
    passive=True,
)
CELERY_QUEUES = (
    Queue(
        'my.tasks',
        exchange=exchange,
        routing_key='my.metrics',
        passive=True,
    ),
)