python apscheduler每个整点运行一次,python apscheduler
在普通工作中,几乎有一半的功能模块需要通过定时任务来驱动。比如项目中有定时统计的程序,有定时从网站爬出来的URL程序,有定时检测钓鱼网站的程序等等,都涉及到定时任务的问题,所以我们找到了python的定时任务模块。
apscheduler 的使用
我们无法避免在项目中使用一些定时任务,比如最新的项目。用户点击报名考试后,需要在考试日期临近时,在微信上推送小程序消息提醒客户,并在fastapi中翻阅实现。虽然方法和包很多,但是要么太重(比如需要重启服务,依赖redis,不好用)。虽然fastapi在time模块的time.sleep()机器上的后台任务也可以变相实现,但是相对简单的功能还是可以的,复杂的代码比较麻烦,不如找个包负责这个量。环顾四周,发现一个APScheduler是相当合适的。代码简单,实现效果很好。在这里做个记录吧!
安装
pip安装时间表
主要组成部分
概念性的、熟悉的代码比这些定义更容易理解。
触发器包含调度逻辑,每个作业都有自己的触发器,用于决定下一个运行哪个作业。除了它们自己的初始配置意外,触发器是完全无状态的。交谈是您指定触发当前任务的方式。
类型
解释
日期触发器
到期执行(在x,x,x,xxxx的x小时x分x秒执行)对应于DateTrigger。
IntervalTrigger
执行间隔(每5秒)
克朗触发器
一个crontab类型的条件(这个比较复杂,例如,从周一到周四的4-5点,每5秒执行一次)
作业存储存储计划的作业。默认作业存储只是将作业存储在内存中,而其他作业存储将作业存储在数据库中。作业的数据在保存到持久性作业存储时被序列化,在加载时被反序列化。调度程序不能共享同一个作业存储。
Jobstore在调度器中初始化,也可以通过调度器的add_jobstore动态添加Jobstore。每个jobstore
会绑定一个别名,当调度器添加Add Job时,会根据指定的jobstore在调度器中找到对应的jobstore,并且
将作业添加到jobstore。
Jobstore主要通过pickle库的loads和dumps实现【实现的核心是通过python的__getstate__和__setstate__进行重写。
实现],作业在每次更改时都会动态保存到存储中,然后在使用时动态加载。存储可以是redis或
它是一个数据库【通过sqlarchemy整合多个数据库】,或者mongodb等。
APScheduler当前支持的Jobstore:
MemoryJobStore
MongoDBJobStore
再贴现商店
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore
执行器处理作业的运行。他们通常将作业中指定的可调用对象提交给线程或输入城市。当作业完成时,执行器将通知调度程序。
-用人类语言说话在添加任务的时候是用它包裹的,会根据不同的日程选择执行人的种类。如果选择AsyncIO作为调度库,则AsyncIOExecutor、tornado作为调度库、TornadoExecutor,如果选择starting process作为调度库,则可以选择ThreadPoolExecutor或ProcessPoolExecutor。执行人的选择需要根据实际调度者选择不同的执行人。
目前APSch
eduler支持的Executor:
AsyncIOExecutor
GeventExecutor
ThreadPoolExecutor
ProcessPoolExecutor
TornadoExecutor
TwistedExecutor
调度器(scheduler)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业.
Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。
除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。
scheduler可根据自身的需求选择不同的组件,如果是使用AsyncIO则选择AsyncIOScheduler,使用tornado则
选择TornadoScheduler。
目前APScheduler支持的Scheduler:
AsyncIOScheduler
BackgroundScheduler
BlockingScheduler
GeventScheduler
QtScheduler
TornadoScheduler
TwistedScheduler
简单应用
import time from apscheduler.schedulers.blocking import BlockingScheduler # 引入后台 def my_job(): print time.strftime(%Y-%m-%d %H:%M:%S, time.localtime(time.time())) sched = BlockingScheduler() sched.add_job(my_job, interval, seconds=5) sched.start()
完整代码
# trigeers 触发器 # job stores job 存储 # executors 执行器 # schedulers 调度器 from pytz import utc from sqlalchemy import func from apscheduler.schedulers.background import BackgroundScheduler,AsyncIOScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor jobstores = { # 可以配置多个存储 #mongo: {type: mongodb}, default: SQLAlchemyJobStore(url=sqlite:///jobs.sqlite) # SQLAlchemyJobStore指定存储链接 } executors = { default: {type: threadpool, max_workers: 20}, # 最大工作线程数20 processpool: ProcessPoolExecutor(max_workers=5) # 最大工作进程数为5 } job_defaults = { coalesce: False, # 关闭新job的合并,当job延误或者异常原因未执行时 max_instances: 3 # 并发运行新job默认最大实例多少 } scheduler = BackgroundScheduler() # .. do something else here, maybe add jobs etc. scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) # utc作为调度程序的时区 import os import time def print_time(name): print(f{name} - {time.ctime()}) def add_job(job_id, func, args, seconds): """添加job""" print(f"添加间隔执行任务job - {job_id}") scheduler.add_job(id=job_id, func=func, args=args, trigger=interval, seconds=seconds) def add_coun_job(job_id, func, args, start_time): """添加job""" print(f"添加一次执行任务job - {job_id}") scheduler.add_job(id=job_id, func=func, args=args, trigger=date,timezone=Asia/Shanghai, run_date=start_time) # scheduler.add_job(func=print_time, trigger=date,timezone=Asia/Shanghai, run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=[text2]) def remove_job(job_id): """移除job""" scheduler.remove_job(job_id) print(f"移除job - {job_id}") def pause_job(job_id): """停止job""" scheduler.pause_job(job_id) print(f"停止job - {job_id}") def resume_job(job_id): """恢复job""" scheduler.resume_job(job_id) print(f"恢复job - {job_id}") def get_jobs(): """获取所有job信息,包括已停止的""" res = scheduler.get_jobs() print(f"所有job - {res}") def print_jobs(): print(f"详细job信息") scheduler.print_jobs() def start(): """启动调度器""" scheduler.start() def shutdown(): """关闭调度器""" scheduler.shutdown() if __name__ == __main__: scheduler = BackgroundScheduler() # start() # print(Press Ctrl+{0} to exit n.format(Break if os.name == nt else C)) # add_job(job_A, func=print_time, args=("A", ), seconds=1) # add_job(job_B, func=print_time, args=("B", ), seconds=2) # time.sleep(6) # pause_job(job_A) # 停止a # get_jobs() #得到所有job # time.sleep(6) # print_jobs() # resume_job(job_A) # time.sleep(6) # remove_job(job_A) # time.sleep(6) from datetime import datetime import pytz start() date_temp = datetime(2022, 2, 19, 17, 30, 5) # scheduler.add_job(print_time, date, run_date=datetime.now(pytz.timezone(America/Manaus)), args=[text]) # scheduler.add_job(print_time, date,timezone=Asia/Shanghai, run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=[text2]) add_coun_job(job_id="job_C",func=print_time,args=(一次性执行任务,),start_time=datetime(2022, 2, 19, 18, 4, 0).astimezone()) time.sleep(130) try: shutdown() except RuntimeError: pass
到此这篇关于详解Python使用apscheduler定时执行任务的文章就介绍到这了,更多相关Python apscheduler内容请搜索盛行IT软件开发工作室以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT软件开发工作室!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。