#multithreading #scala #playframework-2.0 #threadpool
#многопоточность #scala #playframework-2.0 #threadpool
Вопрос:
У меня есть один поток в пуле потоков, обслуживающий блокирующий запрос.
def sync = Action {
import Contexts.blockingPool
Future {
Thread.sleep(100)
}
Ok("Done")
}
В Contexts.blockingPool настроен как:
custom-pool {
fork-join-executor {
parallelism-min = 1
parallelism-max = 1
}
}
Теоретически, если вышеуказанный запрос получает 100 одновременных запросов, ожидаемое поведение должно быть следующим: 1 запрос должен перейти в режим ожидания (100), а остальные 99 запросов должны быть отклонены (или поставлены в очередь до истечения времени ожидания?). Однако я заметил, что для обслуживания остальных запросов создаются дополнительные рабочие потоки. Я также заметил, что задержка увеличивается по мере того, как (становится медленнее обслуживать запрос) количество потоков в пуле становится меньше, чем полученных запросов.
Каково ожидаемое поведение, если получен запрос, превышающий размер настроенного пула потоков?
Ответ №1:
Ваш тест неправильно структурирован для проверки вашей гипотезы. Если вы просмотрите этот раздел в документах, вы увидите, что Play имеет несколько пулов потоков / контекстов выполнения. Что важно в отношении вашего вопроса, так это пул потоков по умолчанию и то, как это связано с HTTP-запросами, обслуживаемыми вашим действием.
Как описано в документе, пул потоков по умолчанию — это место, где по умолчанию выполняется весь код приложения. Т.е. весь код действия, включая все Future
(без явного определения их собственного контекста выполнения), будет выполняться в этом контексте выполнения / пуле потоков. Итак, используя ваш пример:
def sync = Action {
// *** import Contexts.blockingPool
// *** Future {
// *** Thread.sleep(100)
// ***}
Ok("Done")
}
Весь код в вашем действии, не прокомментированный // ***
, будет выполняться в пуле потоков по умолчанию.
Т.е. когда запрос перенаправляется на ваше действие:
Future
сThread.sleep
будут отправлены в ваш пользовательский контекст выполнения- затем, не дожидаясь этого
Future
завершения (поскольку он выполняется в собственном пуле потоков [Context.blockingPool
] и, следовательно, не блокирует какие-либо потоки в пуле потоков по умолчанию) - ваше
Ok("Done")
утверждение оценивается, и клиент получает ответ - Примерно через 100 миллисекунд после получения ответа ваш
Future
завершается
Итак, чтобы объяснить ваше наблюдение, когда вы отправляете 100 одновременных запросов, Play с радостью примет эти запросы, перенаправит на ваш контроллер действие (выполняемое в пуле потоков по умолчанию), отправит вашему Future
, а затем ответит клиенту.
Размер пула по умолчанию равен
play {
akka {
...
actor {
default-dispatcher = {
fork-join-executor {
parallelism-factor = 1.0
parallelism-max = 24
}
}
}
}
}
использовать 1 поток на ядро максимум до 24.
Учитывая, что ваше действие делает очень мало (кроме. Future
), вы сможете обрабатывать до 1000 запросов в секунду без особых усилий. Однако вам Future
потребуется гораздо больше времени для обработки невыполненной работы, потому что вы блокируете единственный поток в вашем пользовательском пуле ( blockingPool
).
Если вы используете мою слегка скорректированную версию вашего действия, вы увидите, что подтверждает приведенное выше объяснение в выходных данных журнала:
object Threading {
def sync = Action {
val defaultThreadPool = Thread.currentThread().getName;
import Contexts.blockingPool
Future {
val blockingPool = Thread.currentThread().getName;
Logger.debug(s"""t>>> Done on thread: $blockingPool""")
Thread.sleep(100)
}
Logger.debug(s"""Done on thread: $defaultThreadPool""")
Results.Ok
}
}
object Contexts {
implicit val blockingPool: ExecutionContext = Akka.system.dispatchers.lookup("blocking-pool-context")
}
Сначала быстро обрабатываются все ваши запросы, а затем один за другим завершаются ваши Future
.
Итак, в заключение, если вы действительно хотите проверить, как Play будет обрабатывать множество одновременных запросов только с одним потоком, обрабатывающим запросы, тогда вы можете использовать следующую конфигурацию:
play {
akka {
akka.loggers = ["akka.event.Logging$DefaultLogger", "akka.event.slf4j.Slf4jLogger"]
loglevel = WARNING
actor {
default-dispatcher = {
fork-join-executor {
parallelism-min = 1
parallelism-max = 1
}
}
}
}
}
вы также можете захотеть добавить Thread.sleep
к своему действию, подобному этому (чтобы немного замедлить работу одиночных потоков пулов потоков по умолчанию)
...
Thread.sleep(100)
Logger.debug(s"""<<< Done on thread: $defaultThreadPool""")
Results.Ok
}
Теперь у вас будет 1 поток для запросов и 1 поток для ваших Future
.
Если вы запустите это с большим количеством одновременных подключений, вы заметите, что клиент блокируется, в то время как Play обрабатывает запросы один за другим. Это то, что вы ожидали увидеть…
Комментарии:
1. Очень подробный и хорошо документированный ответ! Спасибо!!
Ответ №2:
Воспроизведение использует, AkkaForkJoinPool
который расширяется scala.concurrent.forkjoin.ForkJoinPool
. У него есть внутренняя очередь задач. Вы также можете найти это описание интересным в отношении обработки блокирующего кода с помощью fork-join-pool: Scala: глобальный ExecutionContext упрощает вашу жизнь