비동기식으로 주기적으로 기능을 실행하려면 어떻게 해야 합니까?
에서 마이그레이션합니다.tornado
로.asyncio
그리고 나는 찾을 수 없습니다.asyncio
와 동등한.tornado
의PeriodicCallback
(A)PeriodicCallback
실행할 함수와 호출 간의 시간(밀리초)이라는 두 가지 인수가 사용됩니다.
- 에 그런 동등한 것이 있습니까?
asyncio
? - 그렇지 않다면, 이를 구현하는 가장 깨끗한 방법은 무엇입니까?
RecursionError
잠시 후에?
3.5 이하의 Python 버전의 경우:
import asyncio
@asyncio.coroutine
def periodic():
while True:
print('periodic')
yield from asyncio.sleep(1)
def stop():
task.cancel()
loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
Python 3.5 이상의 경우:
import asyncio
async def periodic():
while True:
print('periodic')
await asyncio.sleep(1)
def stop():
task.cancel()
loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
비동기식 프로그램의 "배경"에서 무언가가 발생해야 한다고 느낄 때,asyncio.Task
그것을 하는 좋은 방법일 수 있습니다.이 게시물을 읽고 작업 방법을 확인할 수 있습니다.
다음은 일부 기능을 주기적으로 실행하는 클래스의 구현 가능성입니다.
import asyncio
from contextlib import suppress
class Periodic:
def __init__(self, func, time):
self.func = func
self.time = time
self.is_started = False
self._task = None
async def start(self):
if not self.is_started:
self.is_started = True
# Start task to call func periodically:
self._task = asyncio.ensure_future(self._run())
async def stop(self):
if self.is_started:
self.is_started = False
# Stop task and await it stopped:
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
async def _run(self):
while True:
await asyncio.sleep(self.time)
self.func()
테스트해 보겠습니다.
async def main():
p = Periodic(lambda: print('test'), 1)
try:
print('Start')
await p.start()
await asyncio.sleep(3.1)
print('Stop')
await p.stop()
await asyncio.sleep(3.1)
print('Start')
await p.start()
await asyncio.sleep(3.1)
finally:
await p.stop() # we should stop task finally
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
출력:
Start
test
test
test
Stop
Start
test
test
test
[Finished in 9.5s]
에서 보는 바야흐로start
우리는 어떤 기능들을 호출하고 무한 루프에서 약간의 시간을 자는 작업을 시작할 뿐입니다.온stop
우리는 그냥 그 일을 취소합니다.프로그램이 완료되는 순간 해당 작업을 중지해야 합니다.
콜백을 실행하는 데 많은 시간이 걸리지 않아야 하는 한 가지 더 중요한 사항입니다(그렇지 않으면 이벤트 루프가 중지됩니다).만약 당신이 장기적으로 운영되는 어떤 것에 전화할 계획이라면.func
실행자에서 실행해야 할 수도 있습니다.
정기적인 통화는 기본 제공되지 않습니다.
예약된 모든 작업을 대기하고 실행하는 고유한 스케줄러 루프를 만듭니다.
import math, time
async def scheduler():
while True:
# sleep until the next whole second
now = time.time()
await asyncio.sleep(math.ceil(now) - now)
# execute any scheduled tasks
async for task in scheduled_tasks(time.time()):
await task()
그scheduled_tasks()
반복기는 지정된 시간에 실행할 준비가 된 작업을 생성해야 합니다.스케줄을 작성하고 모든 작업을 시작하는 데 이론적으로 1초 이상 걸릴 수 있습니다. 여기서의 개념은 스케줄러가 마지막 점검 이후 시작했어야 하는 모든 작업을 산출한다는 것입니다.
마지막 실행 종료와 다음 실행 시작 사이에 n초가 아닌 n초마다 반복 호출이 발생하고 시간 내에 호출이 겹치지 않도록 하려면 다음과 같은 방법이 더 간단합니다.
async def repeat(interval, func, *args, **kwargs):
"""Run func every interval seconds.
If func has not finished before *interval*, will run again
immediately when the previous iteration finished.
*args and **kwargs are passed as the arguments to func.
"""
while True:
await asyncio.gather(
func(*args, **kwargs),
asyncio.sleep(interval),
)
또한 백그라운드에서 몇 가지 태스크를 실행하는 데 사용하는 예도 있습니다.
async def f():
await asyncio.sleep(1)
print('Hello')
async def g():
await asyncio.sleep(0.5)
print('Goodbye')
async def main():
t1 = asyncio.ensure_future(repeat(3, f))
t2 = asyncio.ensure_future(repeat(2, g))
await t1
await t2
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Python 3.7용 데코레이터가 있는 대체
import asyncio
import time
def periodic(period):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
while True:
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(period)
return wrapper
return scheduler
@periodic(2)
async def do_something(*args, **kwargs):
await asyncio.sleep(5) # Do some heavy calculation
print(time.time())
if __name__ == '__main__':
asyncio.run(do_something('Maluzinha do papai!', secret=42))
@A를 기준으로 합니다. 제시 Jiryu Davis는 (@Torkel Bjørnson-Langen 및 @Rewrite 코멘트 포함) 이것은 표류를 방지하는 개선입니다.
import time
import asyncio
@asyncio.coroutine
def periodic(period):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * period - time.time(), 0)
g = g_tick()
while True:
print('periodic', time.time())
yield from asyncio.sleep(next(g))
loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
이 솔루션은 페르난도 호세 에스테베스 데 소자의 장식 개념, 보이치에흐 미그다의 표류 회피책 및 슈퍼 클래스를 사용하여 비동기 주기 함수를 처리할 수 있는 가장 우아한 코드를 생성합니다.
나사산 없이.실
솔루션은 다음 파일로 구성됩니다.
periodic_async_thread.py
를 할 수 .a_periodic_thread.py
하위 클래스의 예를 사용하여run_me.py
를 들어 및 을 합니다.
그PeriodicAsyncThread
의 periodic_async_thread.py
:
import time
import asyncio
import abc
class PeriodicAsyncThread:
def __init__(self, period):
self.period = period
def periodic(self):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * self.period - time.time(), 0)
g = g_tick()
while True:
# print('periodic', time.time())
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(next(g))
return wrapper
return scheduler
@abc.abstractmethod
async def run(self, *args, **kwargs):
return
def start(self):
asyncio.run(self.run())
한 하위 클래스의 는 다음과 .APeriodicThread
a_periodic_thread.py
:
from periodic_async_thread import PeriodicAsyncThread
import time
import asyncio
class APeriodicThread(PeriodicAsyncThread):
def __init__(self, period):
super().__init__(period)
self.run = self.periodic()(self.run)
async def run(self, *args, **kwargs):
await asyncio.sleep(2)
print(time.time())
의 및 run_me.py
:
from a_periodic_thread import APeriodicThread
apt = APeriodicThread(2)
apt.start()
이 코드는 다른 솔루션의 시간 이동 문제를 완화하는 우아한 솔루션을 나타냅니다.출력은 다음과 유사합니다.
1642711285.3898764
1642711287.390698
1642711289.3924973
1642711291.3920736
실타래로.실
솔루션은 다음 파일로 구성됩니다.
async_thread.py
캐노피 비동기 스레드 클래스를 사용합니다.periodic_async_thread.py
를 할 수 .a_periodic_thread.py
하위 클래스의 예를 사용하여run_me.py
를 들어 및 을 합니다.
그AsyncThread
의 async_thread.py
:
from threading import Thread
import asyncio
import abc
class AsyncThread(Thread):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@abc.abstractmethod
async def async_run(self, *args, **kwargs):
pass
def run(self, *args, **kwargs):
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# loop.run_until_complete(self.async_run(*args, **kwargs))
# loop.close()
asyncio.run(self.async_run(*args, **kwargs))
그PeriodicAsyncThread
의 periodic_async_thread.py
:
import time
import asyncio
from .async_thread import AsyncThread
class PeriodicAsyncThread(AsyncThread):
def __init__(self, period, *args, **kwargs):
self.period = period
super().__init__(*args, **kwargs)
self.async_run = self.periodic()(self.async_run)
def periodic(self):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * self.period - time.time(), 0)
g = g_tick()
while True:
# print('periodic', time.time())
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(next(g))
return wrapper
return scheduler
한 하위 클래스의 는 다음과 .APeriodicThread
a_periodic_thread.py
:
import time
from threading import current_thread
from .periodic_async_thread import PeriodicAsyncThread
import asyncio
class APeriodicAsyncTHread(PeriodicAsyncThread):
async def async_run(self, *args, **kwargs):
print(f"{current_thread().name} {time.time()} Hi!")
await asyncio.sleep(1)
print(f"{current_thread().name} {time.time()} Bye!")
의 및 run_me.py
:
from .a_periodic_thread import APeriodicAsyncTHread
a = APeriodicAsyncTHread(2, name = "a periodic async thread")
a.start()
a.join()
이 코드는 다른 솔루션의 시간 이동 문제를 완화하는 우아한 솔루션을 나타냅니다.출력은 다음과 유사합니다.
a periodic async thread 1643726990.505269 Hi!
a periodic async thread 1643726991.5069854 Bye!
a periodic async thread 1643726992.506919 Hi!
a periodic async thread 1643726993.5089169 Bye!
a periodic async thread 1643726994.5076022 Hi!
a periodic async thread 1643726995.509422 Bye!
a periodic async thread 1643726996.5075526 Hi!
a periodic async thread 1643726997.5093904 Bye!
a periodic async thread 1643726998.5072556 Hi!
a periodic async thread 1643726999.5091035 Bye!
여러 유형의 스케줄링에 대해 비동기식 지원이 가능한 APS Scheduler를 추천합니다.
저는 도커를 사용하여 부팅할 수 있는 간단한 파이썬 프로세스에 사용하며 도커/프로세스를 죽일 때까지 매주 무언가를 실행하는 크론처럼 실행됩니다.
이것이 비동기식을 사용하여 주기적인 콜백 이론을 테스트하기 위해 수행한 작업입니다.저는 토네이도를 사용한 경험이 없기 때문에 주기적인 콜백이 토네이도와 어떻게 작동하는지 정확히 모르겠습니다.▁the다를 사용하는 데 익숙합니다.after(ms, callback)
Tkinter의 방법, 그리고 이것이 제가 생각해낸 것입니다. While True:
비록 그것이 비동기적일지라도 (글로벌보다 더 그렇다) 나에게는 추하게 보일 뿐입니다. 그call_later(s, callback, *args)
import asyncio
my_var = 0
def update_forever(the_loop):
global my_var
print(my_var)
my_var += 1
# exit logic could be placed here
the_loop.call_later(3, update_forever, the_loop) # the method adds a delayed callback on completion
event_loop = asyncio.get_event_loop()
event_loop.call_soon(update_forever, event_loop)
event_loop.run_forever()
언급URL : https://stackoverflow.com/questions/37512182/how-can-i-periodically-execute-a-function-with-asyncio
'programing' 카테고리의 다른 글
로그인 실패로 인해 SQL Server 2012를 시작할 수 없음 (0) | 2023.05.14 |
---|---|
Mongoose - 한 함수 호출에서 여러 문서 제거 (0) | 2023.05.14 |
Alt-Tab 프로그램 전환기에서 창을 숨기는 가장 좋은 방법은 무엇입니까? (0) | 2023.05.14 |
웹 앱 서비스에서 "Azure 파일 스토리지"를 사용하려면 어떻게 해야 합니까? (0) | 2023.05.14 |
SQL WHERE ID(id1, id2, ..., idn) (0) | 2023.05.14 |