programing

플라스크에서 매 시간 실행되도록 함수를 예약하는 방법은 무엇입니까?

megabox 2023. 7. 18. 21:38
반응형

플라스크에서 매 시간 실행되도록 함수를 예약하는 방법은 무엇입니까?

액세스 권한이 없는 Flask 웹 호스팅이 있습니다.cron지휘권

어떻게 하면 매시간 파이썬 기능을 실행할 수 있습니까?

APScheduler 패키지(v3.5.3)에서 사용할 수 있습니다.

import time
import atexit

from apscheduler.schedulers.background import BackgroundScheduler


def print_date_time():
    print(time.strftime("%A, %d. %B %Y %I:%M:%S %p"))


scheduler = BackgroundScheduler()
scheduler.add_job(func=print_date_time, trigger="interval", seconds=60)
scheduler.start()

# Shut down the scheduler when exiting the app
atexit.register(lambda: scheduler.shutdown())

플라스크가 디버그 모드에 있을 때 이 스케줄러 중 두 개가 실행됩니다.자세한 내용은 이 질문을 참조하십시오.

애플리케이션 스케줄러라는 개념은 조금 생소하지만, APScheduler v3.3.1에 대해 알게 된 내용은 조금 다릅니다.최신 버전의 경우 패키지 구조, 클래스 이름 등이 변경되었다고 생각되므로, 기본 플라스크 애플리케이션과 통합하여 최근에 만든 새로운 솔루션을 여기에 추가합니다.

#!/usr/bin/python3
""" Demonstrating Flask, using APScheduler. """

from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask

def sensor():
    """ Function for test purposes. """
    print("Scheduler is alive!")

sched = BackgroundScheduler(daemon=True)
sched.add_job(sensor,'interval',minutes=60)
sched.start()

app = Flask(__name__)

@app.route("/home")
def home():
    """ Function for test purposes. """
    return "Welcome Home :) !"

if __name__ == "__main__":
    app.run()

이 예에 대한 업데이트에 관심이 있는 사람이 있다면 이 Gist도 여기에 남깁니다.

다음은 향후 판독을 위한 몇 가지 참조 자료입니다.

  • APS 스케줄러 문서: https://apscheduler.readthedocs.io/en/latest/
  • 데몬=참: https://docs.python.org/3.4/library/threading.html#thread-objects

Flask 응용 프로그램을 사용하여 인터페이스를 통해 작업을 실행할 수 있습니다.

import atexit

# v2.x version - see https://stackoverflow.com/a/38501429/135978
# for the 3.x version
from apscheduler.scheduler import Scheduler
from flask import Flask

app = Flask(__name__)

cron = Scheduler(daemon=True)
# Explicitly kick off the background thread
cron.start()

@cron.interval_schedule(hours=1)
def job_function():
    # Do your work here


# Shutdown your cron thread if the web process is stopped
atexit.register(lambda: cron.shutdown(wait=False))

if __name__ == '__main__':
    app.run()

저는 간단한 앱 스케줄러 대신 플라스크를 사용해 보았습니다. 당신이 설치해야 하는 것은

pip3 설치 플라스크_apscheduler

다음은 내 코드의 샘플입니다.

from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)
scheduler = APScheduler()

def scheduleTask():
    print("This test runs every 3 seconds")

if __name__ == '__main__':
    scheduler.add_job(id = 'Scheduled Task', func=scheduleTask, trigger="interval", seconds=3)
    scheduler.start()
    app.run(host="0.0.0.0")

간단한 솔루션의 경우 다음과 같은 경로를 추가할 수 있습니다.

@app.route("/cron/do_the_thing", methods=['POST'])
def do_the_thing():
    logging.info("Did the thing")
    return "OK", 200

그런 다음 이 엔드포인트에 정기적으로 POST하는 unixcron 작업을 추가하십시오.예를 들어 1분에 한 번 실행하려면 터미널 유형으로crontab -e다음 행을 추가합니다.

* * * * * /opt/local/bin/curl -X POST https://YOUR_APP/cron/do_the_thing

작업을 실행할 때 사용자의 경로가 없기 때문에 컬 경로는 완전해야 합니다.다음을 통해 시스템에서 컬링할 전체 경로를 확인할 수 있습니다.which curl)

수동으로 작업을 테스트하는 것이 쉽고, 추가적인 의존성이 없으며, 특별한 일이 없기 때문에 이해하기 쉽다는 점이 좋습니다.

보안.

암호로 cron 작업을 보호하려면 다음을 수행합니다.pip install Flask-BasicAuth그런 다음 앱 구성에 자격 증명을 추가합니다.

app = Flask(__name__)
app.config['BASIC_AUTH_REALM'] = 'realm'
app.config['BASIC_AUTH_USERNAME'] = 'falken'
app.config['BASIC_AUTH_PASSWORD'] = 'joshua'

작업 끝점을 암호로 보호하려면 다음과 같이 하십시오.

from flask_basicauth import BasicAuth
basic_auth = BasicAuth(app)

@app.route("/cron/do_the_thing", methods=['POST'])
@basic_auth.required
def do_the_thing():
    logging.info("Did the thing a bit more securely")
    return "OK", 200

그런 다음 cron 작업에서 호출합니다.

* * * * * /opt/local/bin/curl -X POST https://falken:joshua@YOUR_APP/cron/do_the_thing

APScheduler의 Background Scheduler를 사용하여 간격 작업을 Flask 앱에 통합할 수 있습니다.다음은 Blueprint 및 앱 팩토리(init.py )를 사용하는 예입니다.

from datetime import datetime

# import BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask

from webapp.models.main import db 
from webapp.controllers.main import main_blueprint    

# define the job
def hello_job():
    print('Hello Job! The time is: %s' % datetime.now())

def create_app(object_name):
    app = Flask(__name__)
    app.config.from_object(object_name)
    db.init_app(app)
    app.register_blueprint(main_blueprint)
    # init BackgroundScheduler job
    scheduler = BackgroundScheduler()
    # in your case you could change seconds to hours
    scheduler.add_job(hello_job, trigger='interval', seconds=3)
    scheduler.start()

    try:
        # To keep the main thread alive
        return app
    except:
        # shutdown if app occurs except 
        scheduler.shutdown()

도움이 되길 바랍니다 :)

참조:

  1. https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/background.py

다른 대안은 플라스크와 잘 어울리는 플라스크-APS 스케줄러를 사용하는 것일 수 있습니다. 예:

  • Flask 구성에서 스케줄러 구성 로드
  • 플라스크 구성에서 작업 정의 로드

자세한 내용은 여기:

https://pypi.python.org/pypi/Flask-APScheduler

플라스크 크론탭 모듈을 사용할 수 있는데, 이는 매우 쉽습니다.

1단계: 파이프 설치 플라스크-크론탭

2단계:

from flask import Flask
from flask_crontab import Crontab

app = Flask(__name__)
crontab = Crontab(app)

3단계:

@crontab.job(minute="0", hour="6", day="*", month="*", day_of_week="*")
def my_scheduled_job():
    do_something()

4단계: cmd에서 히트

flask crontab add

완료. 이제 플라스크 애플리케이션을 실행하기만 하면 매일 6시에 전화를 걸 기능을 확인할 수 있습니다.

여기(공식 DOC)에서 참조할 수 있습니다.

및, 및 한 경우, 예및멀초로세프싱 10로설어로 합니다.every(2).hour.do() 스케줄은 꽤짤 때 이상 을 본.일정이 꽤 인상적이어서 표류하지 않고 일정을 짤 때 100ms 이상 쉬는 것을 본 적이 없습니다.종료 방법이 있기 때문에 스레드 대신 멀티프로세싱을 사용합니다.

#!/usr/bin/env python3

import schedule
import time
import datetime
import uuid

from flask import Flask, request
from multiprocessing import Process

app = Flask(__name__)
t = None
job_timer = None

def run_job(id):
    """ sample job with parameter """
    global job_timer
    print("timer job id={}".format(id))
    print("timer: {:.4f}sec".format(time.time() - job_timer))
    job_timer = time.time()

def run_schedule():
    """ infinite loop for schedule """
    global job_timer
    job_timer = time.time()
    while 1:
        schedule.run_pending()
        time.sleep(1)

@app.route('/timer/<string:status>')
def mytimer(status, nsec=10):
    global t, job_timer
    if status=='on' and not t:
        schedule.every(nsec).seconds.do(run_job, str(uuid.uuid4()))
        t = Process(target=run_schedule)
        t.start()
        return "timer on with interval:{}sec\n".format(nsec)
    elif status=='off' and t:
        if t:
            t.terminate()
            t = None
            schedule.clear()
        return "timer off\n"
    return "timer status not changed\n"

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

다음 명령을 실행하여 이를 테스트합니다.

$ curl http://127.0.0.1:5000/timer/on
timer on with interval:10sec
$ curl http://127.0.0.1:5000/timer/on
timer status not changed
$ curl http://127.0.0.1:5000/timer/off
timer off
$ curl http://127.0.0.1:5000/timer/off
timer status not changed

타이머가 켜질 때마다 콘솔에 타이머 메시지가 표시됩니다.

127.0.0.1 - - [18/Sep/2018 21:20:14] "GET /timer/on HTTP/1.1" 200 -
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0117sec
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0102sec

RQ 스케줄러와 같은 스케줄러와 함께 대기열 메커니즘을 사용하거나 셀러리와 같은 더 무거운 것(대부분 오버킬)을 사용할 수 있습니다.

언급URL : https://stackoverflow.com/questions/21214270/how-to-schedule-a-function-to-run-every-hour-on-flask

반응형