programing

비동기식으로 주기적으로 기능을 실행하려면 어떻게 해야 합니까?

megabox 2023. 5. 14. 10:30
반응형

비동기식으로 주기적으로 기능을 실행하려면 어떻게 해야 합니까?

에서 마이그레이션합니다.tornado로.asyncio그리고 나는 찾을 수 없습니다.asyncio와 동등한.tornadoPeriodicCallback(A)PeriodicCallback실행할 함수와 호출 간의 시간(밀리초)이라는 두 가지 인수가 사용됩니다.

  • 에 그런 동등한 것이 있습니까?asyncio?
  • 그렇지 않다면, 이를 구현하는 가장 깨끗한 방법은 무엇입니까?RecursionError잠시 후에?

3.5 이하의 Python 버전의 경우:

import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

Python 3.5 이상의 경우:

import asyncio

async def periodic():
    while True:
        print('periodic')
        await asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

비동기식 프로그램의 "배경"에서 무언가가 발생해야 한다고 느낄 때,asyncio.Task그것을 하는 좋은 방법일 수 있습니다. 게시물을 읽고 작업 방법을 확인할 수 있습니다.

다음은 일부 기능을 주기적으로 실행하는 클래스의 구현 가능성입니다.

import asyncio
from contextlib import suppress


class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()

테스트해 보겠습니다.

async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

출력:

Start
test
test
test

Stop

Start
test
test
test

[Finished in 9.5s]

에서 보는 바야흐로start우리는 어떤 기능들을 호출하고 무한 루프에서 약간의 시간을 자는 작업을 시작할 뿐입니다.stop우리는 그냥 그 일을 취소합니다.프로그램이 완료되는 순간 해당 작업을 중지해야 합니다.

콜백을 실행하는 데 많은 시간이 걸리지 않아야 하는 한 가지 더 중요한 사항입니다(그렇지 않으면 이벤트 루프가 중지됩니다).만약 당신이 장기적으로 운영되는 어떤 것에 전화할 계획이라면.func실행자에서 실행해야 할 수도 있습니다.

정기적인 통화는 기본 제공되지 않습니다.

예약된 모든 작업을 대기하고 실행하는 고유한 스케줄러 루프를 만듭니다.

import math, time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)
       
        # execute any scheduled tasks
        async for task in scheduled_tasks(time.time()):
            await task()

scheduled_tasks()반복기는 지정된 시간에 실행할 준비가 된 작업을 생성해야 합니다.스케줄을 작성하고 모든 작업을 시작하는 데 이론적으로 1초 이상 걸릴 수 있습니다. 여기서의 개념은 스케줄러가 마지막 점검 이후 시작했어야 하는 모든 작업을 산출한다는 것입니다.

마지막 실행 종료와 다음 실행 시작 사이에 n초가 아닌 n초마다 반복 호출이 발생하고 시간 내에 호출이 겹치지 않도록 하려면 다음과 같은 방법이 더 간단합니다.

async def repeat(interval, func, *args, **kwargs):
    """Run func every interval seconds.

    If func has not finished before *interval*, will run again
    immediately when the previous iteration finished.

    *args and **kwargs are passed as the arguments to func.
    """
    while True:
        await asyncio.gather(
            func(*args, **kwargs),
            asyncio.sleep(interval),
        )

또한 백그라운드에서 몇 가지 태스크를 실행하는 데 사용하는 예도 있습니다.

async def f():
    await asyncio.sleep(1)
    print('Hello')


async def g():
    await asyncio.sleep(0.5)
    print('Goodbye')


async def main():
    t1 = asyncio.ensure_future(repeat(3, f))
    t2 = asyncio.ensure_future(repeat(2, g))
    await t1
    await t2

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Python 3.7용 데코레이터가 있는 대체

import asyncio
import time


def periodic(period):
    def scheduler(fcn):

        async def wrapper(*args, **kwargs):

            while True:
                asyncio.create_task(fcn(*args, **kwargs))
                await asyncio.sleep(period)

        return wrapper

    return scheduler


@periodic(2)
async def do_something(*args, **kwargs):
    await asyncio.sleep(5)  # Do some heavy calculation
    print(time.time())


if __name__ == '__main__':
    asyncio.run(do_something('Maluzinha do papai!', secret=42))

@A를 기준으로 합니다. 제시 Jiryu Davis는 (@Torkel Bjørnson-Langen 및 @Rewrite 코멘트 포함) 이것은 표류를 방지하는 개선입니다.

import time
import asyncio

@asyncio.coroutine
def periodic(period):
    def g_tick():
        t = time.time()
        count = 0
        while True:
            count += 1
            yield max(t + count * period - time.time(), 0)
    g = g_tick()

    while True:
        print('periodic', time.time())
        yield from asyncio.sleep(next(g))

loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

이 솔루션은 페르난도 호세 에스테베스소자의 장식 개념, 보이치에흐 미그다의 표류 회피책 및 슈퍼 클래스를 사용하여 비동기 주기 함수를 처리할 수 있는 가장 우아한 코드를 생성합니다.

나사산 없이.

솔루션은 다음 파일로 구성됩니다.

  • periodic_async_thread.py를 할 수 .
  • a_periodic_thread.py하위 클래스의 예를 사용하여
  • run_me.py를 들어 및 을 합니다.

PeriodicAsyncThreadperiodic_async_thread.py:

import time
import asyncio
import abc

class PeriodicAsyncThread:
    def __init__(self, period):
        self.period = period

    def periodic(self):
        def scheduler(fcn):
            async def wrapper(*args, **kwargs):
                def g_tick():
                    t = time.time()
                    count = 0
                    while True:
                        count += 1
                        yield max(t + count * self.period - time.time(), 0)
                g = g_tick()

                while True:
                    # print('periodic', time.time())
                    asyncio.create_task(fcn(*args, **kwargs))
                    await asyncio.sleep(next(g))
            return wrapper
        return scheduler

    @abc.abstractmethod
    async def run(self, *args, **kwargs):
        return

    def start(self):
        asyncio.run(self.run())

한 하위 클래스의 는 다음과 .APeriodicThreada_periodic_thread.py:

from periodic_async_thread import PeriodicAsyncThread
import time
import asyncio

class APeriodicThread(PeriodicAsyncThread):
    def __init__(self, period):
        super().__init__(period)
        self.run = self.periodic()(self.run)
    
    async def run(self, *args, **kwargs):
        await asyncio.sleep(2)
        print(time.time())

의 및 run_me.py:

from a_periodic_thread import APeriodicThread
apt = APeriodicThread(2)
apt.start()

이 코드는 다른 솔루션의 시간 이동 문제를 완화하는 우아한 솔루션을 나타냅니다.출력은 다음과 유사합니다.

1642711285.3898764
1642711287.390698
1642711289.3924973
1642711291.3920736

실타래로.

솔루션은 다음 파일로 구성됩니다.

  • async_thread.py캐노피 비동기 스레드 클래스를 사용합니다.
  • periodic_async_thread.py를 할 수 .
  • a_periodic_thread.py하위 클래스의 예를 사용하여
  • run_me.py를 들어 및 을 합니다.

AsyncThreadasync_thread.py:

from threading import Thread
import asyncio
import abc

class AsyncThread(Thread):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    @abc.abstractmethod
    async def async_run(self, *args, **kwargs):
        pass

    def run(self, *args, **kwargs):
        # loop = asyncio.new_event_loop()
        # asyncio.set_event_loop(loop)

        # loop.run_until_complete(self.async_run(*args, **kwargs))
        # loop.close()
        asyncio.run(self.async_run(*args, **kwargs))

PeriodicAsyncThreadperiodic_async_thread.py:

import time
import asyncio
from .async_thread import AsyncThread

class PeriodicAsyncThread(AsyncThread):
    def __init__(self, period, *args, **kwargs):
        self.period = period
        super().__init__(*args, **kwargs)
        self.async_run = self.periodic()(self.async_run)

    def periodic(self):
        def scheduler(fcn):
            async def wrapper(*args, **kwargs):
                def g_tick():
                    t = time.time()
                    count = 0
                    while True:
                        count += 1
                        yield max(t + count * self.period - time.time(), 0)
                g = g_tick()

                while True:
                    # print('periodic', time.time())
                    asyncio.create_task(fcn(*args, **kwargs))
                    await asyncio.sleep(next(g))
            return wrapper
        return scheduler

한 하위 클래스의 는 다음과 .APeriodicThreada_periodic_thread.py:

import time
from threading import current_thread
from .periodic_async_thread import PeriodicAsyncThread
import asyncio

class APeriodicAsyncTHread(PeriodicAsyncThread):
    async def async_run(self, *args, **kwargs):
        print(f"{current_thread().name} {time.time()} Hi!")
        await asyncio.sleep(1)
        print(f"{current_thread().name} {time.time()} Bye!")

의 및 run_me.py:

from .a_periodic_thread import APeriodicAsyncTHread
a = APeriodicAsyncTHread(2, name = "a periodic async thread")
a.start()
a.join()

이 코드는 다른 솔루션의 시간 이동 문제를 완화하는 우아한 솔루션을 나타냅니다.출력은 다음과 유사합니다.

a periodic async thread 1643726990.505269 Hi!
a periodic async thread 1643726991.5069854 Bye!
a periodic async thread 1643726992.506919 Hi!
a periodic async thread 1643726993.5089169 Bye!
a periodic async thread 1643726994.5076022 Hi!
a periodic async thread 1643726995.509422 Bye!
a periodic async thread 1643726996.5075526 Hi!
a periodic async thread 1643726997.5093904 Bye!
a periodic async thread 1643726998.5072556 Hi!
a periodic async thread 1643726999.5091035 Bye!

여러 유형의 스케줄링에 대해 비동기식 지원이 가능한 APS Scheduler를 추천합니다.

저는 도커를 사용하여 부팅할 수 있는 간단한 파이썬 프로세스에 사용하며 도커/프로세스를 죽일 때까지 매주 무언가를 실행하는 크론처럼 실행됩니다.

이것이 비동기식을 사용하여 주기적인 콜백 이론을 테스트하기 위해 수행한 작업입니다.저는 토네이도를 사용한 경험이 없기 때문에 주기적인 콜백이 토네이도와 어떻게 작동하는지 정확히 모르겠습니다.▁the다를 사용하는 데 익숙합니다.after(ms, callback)Tkinter의 방법, 그리고 이것이 제가 생각해낸 것입니다. While True:비록 그것이 비동기적일지라도 (글로벌보다 더 그렇다) 나에게는 추하게 보일 뿐입니다.call_later(s, callback, *args)

import asyncio
my_var = 0
def update_forever(the_loop):
    global my_var
    print(my_var)
    my_var += 1 
    # exit logic could be placed here
    the_loop.call_later(3, update_forever, the_loop)  # the method adds a delayed callback on completion

event_loop = asyncio.get_event_loop()
event_loop.call_soon(update_forever, event_loop)
event_loop.run_forever()

언급URL : https://stackoverflow.com/questions/37512182/how-can-i-periodically-execute-a-function-with-asyncio

반응형