Автоматическое подключение PyMongo.пул в _raise_connection_failure

#python #mongodb #pymongo #django-celery

#python #mongodb #pymongo #django-сельдерей

Вопрос:

Я использую драйвер python mongo для подключения к MongoDB. хотя он работает должным образом и для других задач, через некоторое время я получил исключение ниже только для этой задачи:

Дополнительная информация:

  • mongodb имеет три разделенных кластера
  • Исключение, вызванное методом bulk_write
 AutoReconnectpymongo.pool in _raise_connection_failure error
mongo-router-3:27017: [Errno 104] Connection reset by peer
  

Вот мой код:

 import mongoengine
from celery.schedules import crontab
from celery.task import Task, PeriodicTask
from pymongo import UpdateOne


def chunker(array, n=10000):
    for i in range(0, len(array), n):
        yield array[i:i   n]


class StatCalculator(Task):
    name = "StatCalculator"
    routing_key = 'my-routing-key'
    soft_time_limit = 3600 * 2
    time_limit = 3600 * 2
    max_retries = 1

    def __init__(self):
        self.col = None

        super(StatCalculator, self).__init__()

    def run(self, ids):
        self._initialize()
        self._process(ids)
        self._persist()
        self._finalize()

    def _initialize(self):
        self.col = mongoengine.connection._get_db().my_collection
        self.calculated_times = {}

    def _process(self, ids):
        """
        This method fetch the items and append them to `calculated_times`

        @param ids:
        @return:
        """
        pass

    def _persist(self):
        bulk_ops = []

        for key, stat in self.calculated_times.items():
            bulk_ops.append(UpdateOne(
                {'some_id': key['some_id'],
                 'created_at': self.now},
                {'$set': stat},
                upsert=True
            ))

        if bulk_ops:
            # raise AutoReconnectpymongo.pool in _raise_connection_failure
            # mongo-router:27017: [Errno 104] Connection reset by peer
            self.col.bulk_write(bulk_ops, ordered=False)

    def _finalize(self):
        pass


class PeriodicStatCalculator(PeriodicTask):
    name = "PeriodicStatCalculator"
    run_every = crontab(hour="3", minute="0")
    routing_key = 'my-routing-key'
    soft_time_limit = 3600 * 1
    time_limit = 3600 * 1
    max_retries = 1

    def run(self, *args, **kwargs):
        ids = []  # a list of some ids

        for chunk in chunker(ids):
            StatCalculator().delay(chunk)
  

Периодические задачи выполняются каждый день и извлекают некоторые идентификаторы из таблицы Postgres,
затем периодической задачей будет вызвана другая задача сельдерея для извлечения, обработки и сохранения некоторой информации соответственно переданным идентификаторам.

наконец, обработанная информация будет сохранена в коллекции Mongo методом bulk_write

Среды:

  • django-rest-framework-mongoengine==3.4.1

  • mongoengine==0.20.0

  • pymongo==3.7.0

  • сельдерей==4.3.0rc1

  • django-сельдерей==3.3.1

Вопросы:

  • Почему у меня это исключение?
  • Как я могу преодолеть это исключение?

Комментарии:

1. Вам нужно будет проверить журналы сервера, чтобы найти подсказки, почему сервер закрыл соединение. Попробуйте поиграть с размером пакета. Драйвер должен автоматически разделять ваши массовые обновления на 1000 операций на пакет, и мне интересно, пытается ли он отправить слишком много пакетов одновременно. Это может привести к исчерпанию ресурсов сервера и отключению соединений.

2. @AlexBlex спасибо за подсказки, я сделаю это, но я сомневаюсь в этом, потому что у нас есть множество инструментов мониторинга, и если бы ресурсы достигли максимального предела, мы бы знали. вы правы насчет журналов mongo. Я проверю их.

3. Тогда должно быть что-то, что происходит под радаром. Я хочу сказать, что _raise_connection_failure улавливает все исключения из сокета . Затем он пытается сузить его до более конкретных ошибок тайм-аута и в противном случае вызывает автоматическое подключение. Так что в основном это любая ошибка tcp. В предоставленном коде на стороне приложения нет ничего неправильного, и размер пакета — это единственное, что приходит на ум, что может повлиять на производительность tcp из этого конкретного фрагмента.

Ответ №1:

Может быть любое количество причин:

  • У вас ненадежная сеть между приложением и базой данных. Переместите приложение ближе к базе данных, базу данных ближе к приложению или найдите лучшего поставщика услуг.
  • Вы используете старый драйвер с более высокими настройками сохранения TCP. Обновите до самого последнего драйвера, который использует более низкие настройки.
  • На сервере заканчиваются ресурсы, и он прерывает операцию и / или закрывает соединение. Просмотрите журналы сервера для подсказок.