Сельдерей: контроль выполнения заданий в тесте

#python #celery

Вопрос:

Встраиваемый работник сельдерея для pytest очень удобен.

Но я хотел бы иметь больше контроля.

Я хотел бы управлять работником вручную из моего теста.

Пример:

 def test_mytask(celery_worker):

    celery_worker.stop()  # <--- I am missing this method

    mytask.delay()

    ...check state before execution of task

    celery_worker.process_one_task() # <--- I am missing this method

    ...check state after execution of task
 

Как получить что-то вроде методов, которые я изобрел для приведенного выше примера ( worker.stop() , worker.process_one_task() )?