Платформа воспроизведения: Что происходит, когда количество запросов превышает доступные потоки

#multithreading #scala #playframework-2.0 #threadpool

#многопоточность #scala #playframework-2.0 #threadpool

Вопрос:

У меня есть один поток в пуле потоков, обслуживающий блокирующий запрос.

   def sync = Action {
    import Contexts.blockingPool
    Future { 
        Thread.sleep(100)
    } 
    Ok("Done")
  }
  

В Contexts.blockingPool настроен как:

 custom-pool {
    fork-join-executor {
            parallelism-min = 1
            parallelism-max = 1
    }
}
  

Теоретически, если вышеуказанный запрос получает 100 одновременных запросов, ожидаемое поведение должно быть следующим: 1 запрос должен перейти в режим ожидания (100), а остальные 99 запросов должны быть отклонены (или поставлены в очередь до истечения времени ожидания?). Однако я заметил, что для обслуживания остальных запросов создаются дополнительные рабочие потоки. Я также заметил, что задержка увеличивается по мере того, как (становится медленнее обслуживать запрос) количество потоков в пуле становится меньше, чем полученных запросов.

Каково ожидаемое поведение, если получен запрос, превышающий размер настроенного пула потоков?

Ответ №1:

Ваш тест неправильно структурирован для проверки вашей гипотезы. Если вы просмотрите этот раздел в документах, вы увидите, что Play имеет несколько пулов потоков / контекстов выполнения. Что важно в отношении вашего вопроса, так это пул потоков по умолчанию и то, как это связано с HTTP-запросами, обслуживаемыми вашим действием.

Как описано в документе, пул потоков по умолчанию — это место, где по умолчанию выполняется весь код приложения. Т.е. весь код действия, включая все Future (без явного определения их собственного контекста выполнения), будет выполняться в этом контексте выполнения / пуле потоков. Итак, используя ваш пример:

 def sync = Action {

  // *** import Contexts.blockingPool
  // *** Future { 
  // *** Thread.sleep(100)
  // ***} 

  Ok("Done")
}
  

Весь код в вашем действии, не прокомментированный // *** , будет выполняться в пуле потоков по умолчанию.
Т.е. когда запрос перенаправляется на ваше действие:

  1. Future с Thread.sleep будут отправлены в ваш пользовательский контекст выполнения
  2. затем, не дожидаясь этого Future завершения (поскольку он выполняется в собственном пуле потоков [ Context.blockingPool ] и, следовательно, не блокирует какие-либо потоки в пуле потоков по умолчанию)
  3. ваше Ok("Done") утверждение оценивается, и клиент получает ответ
  4. Примерно через 100 миллисекунд после получения ответа ваш Future завершается

Итак, чтобы объяснить ваше наблюдение, когда вы отправляете 100 одновременных запросов, Play с радостью примет эти запросы, перенаправит на ваш контроллер действие (выполняемое в пуле потоков по умолчанию), отправит вашему Future , а затем ответит клиенту.

Размер пула по умолчанию равен

 play {
  akka {
    ...
    actor {
      default-dispatcher = {
        fork-join-executor {
          parallelism-factor = 1.0
          parallelism-max = 24
        }
      }
    }
  }
}
  

использовать 1 поток на ядро максимум до 24.
Учитывая, что ваше действие делает очень мало (кроме. Future ), вы сможете обрабатывать до 1000 запросов в секунду без особых усилий. Однако вам Future потребуется гораздо больше времени для обработки невыполненной работы, потому что вы блокируете единственный поток в вашем пользовательском пуле ( blockingPool ).

Если вы используете мою слегка скорректированную версию вашего действия, вы увидите, что подтверждает приведенное выше объяснение в выходных данных журнала:

 object Threading {

  def sync = Action {
    val defaultThreadPool = Thread.currentThread().getName;

    import Contexts.blockingPool
    Future {
      val blockingPool = Thread.currentThread().getName;
      Logger.debug(s"""t>>> Done on thread: $blockingPool""")
      Thread.sleep(100)
    }

    Logger.debug(s"""Done on thread: $defaultThreadPool""")
    Results.Ok
  }
}

object Contexts {
  implicit val blockingPool: ExecutionContext = Akka.system.dispatchers.lookup("blocking-pool-context")
}
  

Сначала быстро обрабатываются все ваши запросы, а затем один за другим завершаются ваши Future .

Итак, в заключение, если вы действительно хотите проверить, как Play будет обрабатывать множество одновременных запросов только с одним потоком, обрабатывающим запросы, тогда вы можете использовать следующую конфигурацию:

 play {
  akka {
    akka.loggers = ["akka.event.Logging$DefaultLogger", "akka.event.slf4j.Slf4jLogger"]
    loglevel = WARNING
    actor {
      default-dispatcher = {
        fork-join-executor {
          parallelism-min = 1
          parallelism-max = 1
        }
      }
    }
  }
}
  

вы также можете захотеть добавить Thread.sleep к своему действию, подобному этому (чтобы немного замедлить работу одиночных потоков пулов потоков по умолчанию)

     ...
    Thread.sleep(100)
    Logger.debug(s"""<<< Done on thread: $defaultThreadPool""")
    Results.Ok
}
  

Теперь у вас будет 1 поток для запросов и 1 поток для ваших Future .
Если вы запустите это с большим количеством одновременных подключений, вы заметите, что клиент блокируется, в то время как Play обрабатывает запросы один за другим. Это то, что вы ожидали увидеть…

Комментарии:

1. Очень подробный и хорошо документированный ответ! Спасибо!!

Ответ №2:

Воспроизведение использует, AkkaForkJoinPool который расширяется scala.concurrent.forkjoin.ForkJoinPool . У него есть внутренняя очередь задач. Вы также можете найти это описание интересным в отношении обработки блокирующего кода с помощью fork-join-pool: Scala: глобальный ExecutionContext упрощает вашу жизнь