Как добавить тайм-аут в Deferred из API deferToThread от Twisted?

#python #twisted

#python #twisted

Вопрос:

из twisted.internet import reactor 
из потоков импорта twisted.internet 
из twisted.internet импорт отложить 
время импорта

def worker(аргумент):
 выведите "Привет, мир" 
 time.sleep(10)
 возвращает 1

def run():
 выведите "Запуск рабочих"
 l = [] 
 для x в диапазоне (2): 
 l.append(threads.deferToThread(worker, x))
 верните defer.Отложенный список(l)

разрешение определения (результаты):
 печать результатов 
 reactor.stop()

d = выполнить()
d.addCallback(res)
reactor.run()

Как остановить рабочих по таймауту?

Ответ №1:

Потоки не могут быть прерваны, если они не сотрудничают с вами. time.sleep(10) не собирается сотрудничать, поэтому я не думаю, что вы можете прервать этого рабочего. Если у вас есть другой тип worker, который имеет несколько дискретных фаз или работает в цикле над некоторыми задачами, тогда вы можете сделать что-то вроде этого:

 def worker(stop, jobs):
    for j in jobs:
        if stop:
            break
        j.do()

stop = []
d = deferToThread(worker)

# This will make the list eval to true and break out of the loop.
stop.append(None)
  

Это также не относится к Twisted конкретно. Именно так потоки работают в Python.

Комментарии:

1. С 2019 года этот ответ больше не должен быть принятым ответом, поскольку существует вызов twisted, специально созданный для этого варианта использования. twistedmatrix.com/documents/current/api / …

2. Это не так. Для отложенных потоков может быть добавлен тайм-аут, но потоки все равно не могут быть прерваны без сотрудничества.

3. Заголовок однозначен: «Как добавить тайм-аут в twisted defer». Первоначальный пользователь, возможно, был сбит с толку совместной природой потоков, но это как бы в стороне от сути. Прямо сейчас этот конкретный поиск выдает этот SO-ответ как принятый.

4. Исправил название для вас

Ответ №2:

Хотя прерывание потоков может оказаться невозможным, отложенное может быть остановлено с помощью cancel функции, которая, я думаю, доступна в Twisted 10.1.0 и более поздних версиях.

Я использовал следующий класс для создания отложенных вызовов, которые возвращают определенную функцию, если Deferred не сработал через некоторое время. Это может быть полезно для кого-то, у кого есть тот же вопрос, что и в теме OP.

РЕДАКТИРОВАТЬ: Как предложено в комментариях ниже, лучше не наследовать от defer.Deferred . Поэтому я изменил код, чтобы использовать оболочку, которая достигает того же эффекта.

 class DeferredWrapperWithTimeout(object):
    '''
    Holds a deferred that allows a specified function to be called-back
    if the deferred does not fire before some specified timeout.
    '''
    def __init__(self, canceller=None):
        self._def = defer.Deferred(canceller)

    def _finish(self, r, t):
        '''
        Function to be called (internally) after the Deferred
        has fired, in order to cancel the timeout.
        '''
        if ( (t!=None) and (t.active()) ):
            t.cancel()
        return r

    def getDeferred(self):
        return self._def

    def addTimeoutCallback(self, reactr, timeout,
                           callUponTimeout, *args, **kw):
        '''
        The function 'callUponTimeout' (with optional args or keywords)
        will be called after 'timeout' seconds, unless the Deferred fires.
        '''

        def timeoutCallback():
            self._def.cancel()
            callUponTimeout(*args, **kw)
        toc = reactr.callLater(timeout, timeoutCallback)
        return self._def.addCallback(self._finish, toc)
  

Пример обратного вызова перед таймаутом:

 from twisted.internet import reactor

from DeferredWithTimeout import *

dw = DeferredWrapperWithTimeout()
d  = dw.getDeferred()

def testCallback(x=None):
    print "called"

def testTimeout(x=None):
    print "timedout"

d.addCallback(testCallback)
dw.addTimeoutCallback(reactor, 20, testTimeout, "to")
reactor.callLater(2, d.callback, "cb")
reactor.run()
  

Выводит «вызвано» и ничего больше.

Пример тайм-аута перед обратным вызовом:

 from twisted.internet import reactor

from DeferredWithTimeout import *

dw = DeferredWrapperWithTimeout()
d  = dw.getDeferred()

def testCallback(x=None):
    print "called"

def testTimeout(x=None):
    print "timedout"

d.addCallback(testCallback)
dw.addTimeoutCallback(reactor, 20, testTimeout, "to")
reactor.run()
  

Выводит «тайм-аут» через 20 секунд, и больше ничего.

Комментарии:

1. Вам действительно не следует создавать подкласс Deferred . Реализуйте эту функциональность как отдельный помощник, а не как подкласс. pyvideo.org/video/1684 / …

2. В дополнение к обычным предупреждениям о том, что нельзя создавать подклассы, Deferred это особенно плохо для подкласса, потому что его поведение предполагает очень специфические особенности его собственной реализации и не будет хорошо реагировать на переопределение определенных методов.

3. Спасибо за ссылку на это видео! Это полностью изменило способ разработки кода.

4. Этот ответ устарел по состоянию на 2019 год. Используйте официальный API: twistedmatrix.com/documents/current/api / …

Ответ №3:

Мы делаем это следующим образом, используя декоратор. Преимущество этого метода в том, что deferred отменяется при достижении тайм-аута. Это должно каким-то образом стать частью библиотеки Twisted, имхо

 from twisted.internet import defer, reactor

def timeout(secs):
    """Decorator to add timeout to Deferred calls"""
    def wrap(func):
        @defer.inlineCallbacks
        def _timeout(*args, **kwargs):
            raw_d = func(*args, **kwargs)
            if not isinstance(raw_d, defer.Deferred):
                defer.returnValue(raw_d)

            timeout_d = defer.Deferred()
            times_up = reactor.callLater(secs, timeout_d.callback, None)

            try:
                raw_result, timeout_result = yield defer.DeferredList(
                    [raw_d, timeout_d], fireOnOneCallback=True, fireOnOneErrback=True,
                    consumeErrors=True)
            except defer.FirstError as e:  # Only raw_d should raise an exception
                assert e.index == 0
                times_up.cancel()
                e.subFailure.raiseException()
            else:  # timeout
                if timeout_d.called:
                    raw_d.cancel()
                    raise Exception("%s secs have expired" % secs)

            # no timeout
            times_up.cancel()
            defer.returnValue(raw_result)
        return _timeout
return wrap
  

Ответ №4:

Ну, мой ответ не о потоках, но, как было сказано, вы можете реализовать функциональность тайм-аута в качестве отдельного помощника:

 from twisted.internet import defer

def add_watchdog(deferred, timeout=0.05):

    def callback(value):
        if not watchdog.called:
            watchdog.cancel()
        return value

    deferred.addBoth(callback)

    from twisted.internet import reactor
    watchdog = reactor.callLater(timeout, defer.timeout, deferred)

d = defer.Deferred()
add_watchdog(d)
  

Затем вы можете перехватить defer.TimeoutError ошибку в deferred, если вам нужно.

Комментарии:

1. хммм, похоже, отмена отсутствует

2. @Carld’Halluin , не хотите уточнить или предложить редактирование?