Таймер без опроса / без блокировки?

#python #maze

#python #таймер

Вопрос:

Лучшее решение, которое я нашел до сих пор, — это просто использовать sleep() функцию. Я хотел бы запустить свою собственную функцию обратного вызова, когда произойдет событие истечения срока действия таймера. Есть ли какой-либо управляемый событиями способ сделать это?

 from time import sleep

# Sleep for a minute
time.sleep(60)
 

Комментарии:

1. Существует несколько библиотек таймеров событий / очередей. Я использую Twisted reactor, но это потому, что мне нужен Twisted для других целей. Вы определенно не хотите отключать свой поток, если вашему приложению есть что еще делать.

2. Вы получили ответ на этот вопрос? Если да, вы бы приняли один из опубликованных ответов? Если нет, есть ли у вас собственный ответ для публикации?

Ответ №1:

Есть встроенное простое решение, использующее модуль threading:

 import threading

timer = threading.Timer(60.0, callback)
timer.start()  # after 60 seconds, 'callback' will be called

## (in the meanwhile you can do other stuff...)
 

Вы также можете передавать аргументы и kwargs в свой обратный вызов. Смотрите здесь .

Комментарии:

1. однако я не могу сбросить таймер. Я не хочу, чтобы это выполнялось непрерывно. я хочу запустить его в другом событии, а затем получить событие истечения срока действия. когда я пытаюсь timer.cancel() , я получаю: raise RuntimeError(«потоки могут быть запущены только один раз») RuntimeError: потоки могут быть запущены только один раз

2. Вам не нужно / не нужно сбрасывать таймер. Это работает только один раз.

Ответ №2:

Я думаю, это может быть очень просто. Взгляните на этот пример. Это работает даже в консоли python!

 from threading import Thread
from time import sleep

# Function to be called when the timer expires
def myFunction():
    print 'Did anyone call me?'

# Function with the timer
def myTimer(seconds):
    sleep(seconds)
    myFunction()

# Thread that will sleep in background and call your function
# when the timer expires.
myThread = Thread(target=myTimer, args=(4,))
myThread.start()
 

Установите любое количество секунд, которое вы хотите, и продолжайте работать с консолью или запускать основной поток / программу. Вы заметите, что функция будет вызвана, когда таймер подойдет к концу.

Редактировать

Еще один хороший пример, учитывая комментарий от @tarabyte, — это тот, в котором функция вызывается только в зависимости от значения некоторой переменной или флага. Я надеюсь, что тогда это будет ответ, который ищет @tarabyte.

 from threading import Thread
from time import sleep

myFlag = False

# Function to be called when the flag turns on
def myFunction():
    print 'Did anyone call me?'

def myTimer():
    global myFlag
    while True:
        if myFlag:
            myFunction()
            myFlag = False
        else:
            sleep(1)

# Thread that will sleep in background and call your function
# when the myFlag turns to be True
myThread = Thread(target=myTimer)
myThread.start()

# Then, you can do whatever you want and later change the value of myFlag.
# Take a look at the output inside ipython when the value of myFlag is changed.


In [35]: myFlag
Out[35]: False

In [36]: myFlag = True

In [37]: Did anyone call me?
 

Комментарии:

1. Отличный пример, но я должен был уточнить, что меня не интересуют повторяющиеся истечения срока действия, только те, которые я хотел бы .start() сам, а затем завершить после первого перехода. Я думаю, я могу взломать решение, обернув содержимое MyTimer в if (some_var_i_set_elsewhere)

2. Привет @tarabyte. Пожалуйста, взгляните на отредактированную версию моего сообщения. Я думаю, это более или менее то, что вы хотите сделать. Вы можете изменить тип и возможные значения myFlag , чтобы настроить его в соответствии с вашими потребностями. Если вы согласны со мной, пожалуйста, примите ответ как правильный и отметьте его.

3. @Javier «Это работает даже в консоли python» — весь код python работает в консоли так же, как и в скрипте. Только некоторый код работает только в консоли, а не в скрипте python. Например, somevar в консоли выводится значение somevar, если оно было определено ранее, но то же самое в скрипте python ничего не делает.

Ответ №3:

Иногда лучше простое решение, даже если оно опрашивает время. Я использовал это с большим успехом раньше — он не блокируется, если ваш поток не останавливается на нем.

Я думаю, что я бы справился с этим проще всего, проверив время, поскольку это намного проще и экономичнее, чем разработка отдельного поточного решения:

 def event_minute_later(event):
    print(time.time()) # use for testing, comment out or delete for production
    return event   60 < time.time()
 

И использование:

 >>> event = time.time()
>>> print(event)
1393962502.62

>>> event_minute_later(event)
1393962526.73
False
>>> event_minute_later(event)
1393962562.9
True
 

Ответ №4:

Поскольку Python 3.7 (и более старые версии к настоящему времени достигли конца жизни) asyncio , встроенный модуль позволяет добавлять sleep() вызов Python асинхронно:

 import asyncio

async def test():
    print("Hello ... but wait, there is more!")
    await asyncio.sleep(3)
    print("... in the async world!")

 

Вот некоторые доказательства того, что он неблокирующий (любезно предоставлено RealPython):

 import asyncio
# Jupyter Notebook users need to allow
# nesting of the asyncio event loop
import nest_asyncio
nest_asyncio.apply()
import time

async def workload(text, duration):
    while duration > 0:
        # run sleep and yield control 
        # back to the event loop (for one cycle)
        await asyncio.sleep(1)
        print(f'{text} counter: sleeping {duration} seconds')
        duration -= 1

async def main():
    # send the workload() coroutine to the background,
    # to let it run concurrently with other tasks, 
    # switching between them at await points
    task_1 = asyncio.create_task(workload('First', 2))
    task_2 = asyncio.create_task(workload('Second', 4))
    task_3 = asyncio.create_task(workload('Third', 8))
    print(f"Started: {time.strftime('%X')}")
    # create await points for each 
    # of the concurrent tasks
    await task_1
    await task_2
    await task_3
    print(f"Ended: {time.strftime('%X')}")

if __name__ == '__main__':
    asyncio.run(main())
 

Вывод:

 Started: 09:07:21
First counter: sleeping 2 seconds
Second counter: sleeping 4 seconds
Third counter: sleeping 8 seconds
First counter: sleeping 1 seconds
Second counter: sleeping 3 seconds
Third counter: sleeping 7 seconds
Second counter: sleeping 2 seconds
Third counter: sleeping 6 seconds
Second counter: sleeping 1 seconds
Third counter: sleeping 5 seconds
Third counter: sleeping 4 seconds
Third counter: sleeping 3 seconds
Third counter: sleeping 2 seconds
Third counter: sleeping 1 seconds
Ended: 09:07:29