Отправка сообщения из задачи Celery в каналы

#django #celery #django-channels #celerybeat

#django #сельдерей #django-каналы #celerybeat

Вопрос:

Django 2.1.1, Django Channels 2.1.3, Celery 4.2.1

Я настроил задачу в Celery, и в конце задачи мне нужно отправить сообщение websocket клиенту (ам). Однако сообщение websocket никогда не отправляется. Ошибок не возникает, оно просто не отправляется.

Я настроил уровень канала, используя Redis в качестве серверной части. Выполнение этого из обычного представления Django работает нормально. Но при запуске в задаче Celery оно отправляет сообщение каналам, и я вижу, что каналы действительно выполняют код, показанный в моем consumers.py приведенный ниже код, но клиент никогда не получает сообщение websocket.

tasks.py

 def import_job(self):
    # (do task calculations, store in data dict)
    message = {'type': 'send_my_data',
               'data': json.dumps(thecalcs) }
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)('core-data', message)
  

consumers.py

 class AsyncDataConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.channel_group_name = 'core-data'

        # Join the group
        await self.channel_layer.group_add(
            self.channel_group_name,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        # Leave the group
        await self.channel_layer.group_discard(
            self.channel_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data=None, bytes_data=None):
        pass

    # Receive message from the group
    async def send_my_data(self, event):
        text = event['data']
        # Send message to WebSocket
        await self.send(text_data=text)
  

settings.py

 CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}
  

Поскольку нет исключения / ошибки, я совершенно не понимаю, в какой части этого процесса происходит сбой.

  1. Сельдерей запускает задачу? ДА
  2. Задача запускается и отправляет сообщение на уровень канала? ДА
  3. Потребитель получает сообщение от группы и выполняет send() ? ДА
  4. Клиент получает сообщение websocket? НЕТ

Это проблема между каналами и Redis? Это проблема между каналами и клиентом?

Комментарии:

1. Не совсем уверен, но я предполагаю, что, поскольку процесс, поддерживающий соединение с websocket, отличается от процесса, в котором вы запускаете задачу (celery worker), у него нет подключения для отправки сообщения

2. Неважно, я читал о случаях отправки сообщений канала из задач celery, поэтому я был неправ

3. Пожалуйста, покажите свой полный consumers.py или завершенный класс Consumer. И добавили ли вы канал в группу в своем ws_connect ? Могли ли вы отправлять и получать сообщения, если не через задачи? Что такое self.group_name ? Добавлен ли ваш канал в эту группу?

4. @spiritsree Я обновил это полным consumers.py файлом. Да, групповая отправка отлично работает при выполнении из обычного кода Django. Только в задачах Celery не отправляется сообщение websocket. Я использую эту настройку потребителя и веб-сокета в течение нескольких месяцев без проблем. Проблему я вижу только тогда, когда я попытался использовать его в задачах Celery.

5. Все в порядке. Вы проверили журналы celery? Есть ошибки? Как вы реализуете параллелизм? eventlet или gevent или prefork ?

Ответ №1:

Оказывается, Celery проглатывал исключение в моем коде во время выполнения задачи. Мне нужно реализовать более тщательное ведение журнала, чтобы перехватить эти исключения.

Комментарии:

1. Привет, Чарльз, у меня такая же проблема, как и у вас. Я вижу внутри командной строки Redis, что сообщение отправляется где-то на канальном уровне из фоновой задачи Celery, но мои потребители не обрабатывают событие и не передают его по сокету клиенту. На самом деле иногда сокет отключается в тот момент, когда сообщение попадает на уровень каналов. Как вы с этим разобрались со своей стороны? Приветствую Марко

2. @MarcoPolo Привет, у меня такая же проблема, как у вас. Как вы устранили ошибку?

3. Привет, gii96, мне так и не удалось разобраться с этим, извините. Использование Сельдерея с каналами стало очень сложным. В итоге я отказался от этого.

4. Привет, gii96, мне так и не удалось разобраться с этим, извините. Использование Сельдерея с каналами стало очень сложным. В итоге я отказался от этого.