Реактивный Монго: превышено максимальное количество операций (maxQueueWaitSize), равное 500

#connection-pooling #spring-data-mongodb #project-reactor #changestream

#объединение в пул соединений #spring-data-mongodb #проект-реактор #поток изменений

Вопрос:

Я использую ReactiveMongoTemplate MongoDB ChangeStream для прослушивания обновлений коллекции MongoDB, выполнения некоторых запросов и сохранения документа в другой коллекции. Хотя локально он работал нормально, после развертывания в UAT, который имел большой объем, он начал выдавать приведенную ниже ошибку:

 Too many operations are already waiting for a collection. Max number of operations (maxWaitQueueSize) of 500 has been exceeded.
  

Каковы способы решения этой проблемы?

У меня есть следующее в приложении.yml-файл

 spring:
   data: 
      mongodb:
          uri: mongodb://host:port/db?authMechanism=<val1>amp;authSource=<val2>amp;authechanismProperties=<val3>
  

И вот как выглядит упрощенный механизм изменения:

 @Autowired
ReactiveMongoTemplate reactiveMongoTemplate;

reactiveMongoTemplate
  .changeStream(Sample.class)
  .watchCollection("sample_collection")
  .filter(
     new Criteria.orOperator(
        where("operationType").is("update"),
        where("operationType").is("insert")
     )
  )
  .listen()
  .flatMap(r->processMessage(r)). // processMessage does some queries to collections including this collection being listened to and upserts to same mongodb in a different collection
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe();
  

Я понимаю, что мне может потребоваться добавить некоторый пул подключений, чтобы иметь возможность обрабатывать несколько подключений? Но как мне настроить это с помощью реактивного MongoDB?
Я новичок в реактивном программировании. Любые указатели были бы действительно полезны.

Ответ №1:

Здесь можно сделать несколько вещей:

  1. Проверьте, не выполняются ли какие-либо длительные блокирующие вызовы, вызывающие блокировку потоков и приводящие к созданию большого количества подключений, поскольку предыдущие все еще заняты выполнением тяжелой задачи. Попробуйте проверить наличие некоторых оптимизаций в коде, который блокирует эти вызовы. В реактивном программировании вы можете проверить наличие блокирующего кода с помощью BlockHound .

  2. Увеличьте лимит подключений, указав waitQueueMultiple или maxPoolSize https://docs.mongodb.com/manual/reference/connection-string/#connection-pool-options

Перед этим вы можете проверить статистику своей базы данных mongo, чтобы увидеть текущие и разрешенные подключения, используя

 db.serverStatus().connections