python 异步框架和同步框架,python实现异步执行
官网参考:
芹菜官网:http://www.celeryproject.org/
芹菜官方文件英文版:http://docs.celeryproject.org/en/latest/index.html
芹菜官方文件中文版:http://docs.jinkan.org/docs/celery/
简介:
1)可以通过自己的命令启动服务,不依赖任何服务器(内部支持socket)。
2)芹菜服务为其他项目服务的任务需求提供异步解决方案。
注意:将有两个服务同时运行,一个是项目服务,另一个是芹菜服务。项目服务会把需要异步处理的任务交给芹菜服务,芹菜会在需要的时候异步完成项目需求。
人是独立的服务医院也是独立的服务。
正常情况下,人可以在没有医院参与的情况下完成所有的健康动作;但是人生病了,就会被医院接收,解决人生病的问题。
人的病的治疗方案交给医院解决。大家都没病的时候,医院独立运行。人生病了,医院会解决人生病的需求。
芹菜建筑图:
Celery的架构由三部分组成,即消息代理、任务执行单元(worker)和任务执行结果存储。
消息中间件Celery本身不提供消息服务,但是可以很容易地与第三方提供的消息中间件集成。包括RabbitMQ,Redis等等。
任务执行单元worker是Celery提供的任务执行单元,Worker在分布式系统节点中并发运行。
任务结果存储任务结果存储用于存储工作器执行的任务的结果。芹菜支持以不同的方式存储任务的结果,包括AMQP,redis等。
三、异步执行使用场景:解决耗时任务。
延迟执行:解决延迟的任务。
时机:解决周期性任务。
四。芹菜杆的安装配置安装芹菜
消息中间件:RabbitMQ/Redis
App=Celery(任务名,broker=xxx ,backend=xxx )
动词(verb的缩写)两种芹菜任务结构:提倡包管理,结构更清晰#如果芹菜对象:芹菜(.)被放置在模块下
# 1)终端切换到模块所在的文件夹:scripts。
# 2)执行命令启动worker: celery worker -A模块名-l info -P eventlet
#注意:windows系统需要eventlet支持,Linux和MacOS直接执行:芹菜工-A模块名-l info
#注意:模块名是可选的。
#如果芹菜对象:芹菜(.)放在一个包裹下面
# 1)你必须在这个包下建立一个芹菜. py的文件,把芹菜的声明(.)在这个文件中生成对象。
# 2)执行命令启动worker:celery worker-A package name-l info-P event let
#注意:windows系统需要eventlet支持,Linux和MacOS直接执行:芹菜工-A模块名-l info
#注意:包名是可选的。芹菜执行异步任务包架构封装项目。
芹菜_任务#芹菜套餐
__init__。py #包文件
celery.py # celery连接和配置相关文件,并且名字必须上交celery.py
tasks.py #所有任务功能
add_task.py #添加任务
get_result.py #获取结果芹菜. py基本配置# 1)创建应用程序任务
# 2)启动芹菜(app)服务:
#非windows
# command:celery worker-a celery _ task-linfo
#窗口:
# pip3安装事件
#芹菜工人-芹菜_任务-信息-P事件
# 3)添加任务:手动添加。要自定义用于添加任务的脚本,请右键单击以执行脚本。
# 4)获取结果:手动获取。要自定义get任务的脚本,请右键单击以执行脚本。
从芹菜进口芹菜
#没有密码
broker= redis://127 . 0 . 0 . 1:6379/1
back end= redis://127 . 0 . 0 . 1:6379/2
#密码:
broker= redis://:123 @ 127 . 0 . 0 . 1:6379/1
back end= redis://:123 @ 127 . 0 . 0 . 1:6379/2
app=Celery(broker=broker,backend=backend,include=[celery_task.tasks])
代理:任务仓库
后端:任务结果仓库
包含:任务(函数)所在的文件
tasks.py从添加任务。芹菜进口app
@app.task
定义添加(n1,n2):
res=n1 n2
打印( n1 n2=%s % res )
返回资源
@app.task
def low(n1、n2):
res=n1-n2
打印( n1-n2=%s % res )
返回resadd_task.py添加立即、延迟任务从芹菜_任务导入任务
#延迟:添加立即任务
#应用_异步:添加延迟任务
#预计到达时间:执行的美国联合技术公司时间
# 添加立即执行任务
t1=tasks.add.delay(10,20)
t2=tasks.low.delay(100,50)
打印(t1.id)
# 添加延迟任务
从芹菜_包.任务导入跳转
从日期时间导入日期时间,时间增量
# 秒
定义eta_second(秒):
ctime=datetime.now() #当前时间
utc _ ctime=日期时间。utcfromtimestamp(ctime。时间戳())#当前协调世界时。亦称协道界时时间
时间延迟=时间增量(秒=秒)#秒
返回utc_ctime时间延迟#当前时间往后延迟的秒
# 天
预计到达天数(天):
ctime=datetime.now() #当前时间
utc _ ctime=日期时间。utcfromtimestamp(ctime。时间戳())#当前协调世界时。亦称协道界时时间
时间延迟=时间增量(天数=天数)#天
返回utc_ctime时间延迟#当前时间往后延迟的天
jump.apply_async(args=(20,5),eta=eta_second(10)) # 10秒后执行
jump.apply_async(args=(20,5),eta=eta_days(1)) # 1天后执行getresult。巴拉圭获取结果来自芹菜_任务.芹菜导入应用
来自芹菜。结果导入异步结果
id= 21325 a40-9d 32-44 b5-a701-9a 31 C3 c74 b5
if __name__==__main__ :
async=AsyncResult(id=id,app=app)
if async.successful():
result=async.get()
打印(结果)
elif async.failed()。
打印(任务失败)
elif async.status==PENDING :
打印(任务等待中被执行)
elif async.status==RETRY :
打印(任务异常后正在重试)
elif async.status==STARTED :
打印(任务已经开始被执行)
九、高级使用芹菜。巴拉圭定时任务配置(循环的)特点:
添加任务的终端关闭之后,停止添加
芹菜服务端关闭后,把关闭之后未执行的任务都执行一遍,然后继续接收任务
# 1)创建应用任务
# 2)启动芹菜(app)服务:
# 注):-A表示相对路径,所以一定先进入芹菜_任务所在包
-我表示打印到日志信息级别
# 非窗子
# 命令:芹菜工-一根芹菜_task -l信息
#窗口:
# pip3安装事件
#芹菜工人-芹菜_任务-信息-P事件
# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
# 命令:芹菜段-一根芹菜_任务-我信息
# 4)获取结果
从芹菜进口芹菜
# 无密码
broker= redis://127。0 .0 .1:6379/1
back end= redis://127。0 .0 .1:6379/2
# 有密码:
broker= redis://:123 @ 127。0 .0 .1:6379/1
后端= redis://:123 @ 127。0 .0 .1:6379/2
app=Celery(broker=broker,backend=backend,include=[celery_task.tasks])
# 时区
app.conf.timezone=亚洲/上海
# 是否使用协调世界时。亦称协道界时
app.conf.enable_utc=False
# 自动任务的定时配置
从日期时间导入时间增量
从芹菜.时间表导入例行性工作排程
app.conf.beat_schedule={
# 定时任务名字
fall_task :
任务:芹菜_任务。任务。秋天,
args:(30,20),
schedule :时间增量(秒=3),# 3秒后执行
# schedule: crontab(hour=8,day_of_week=1),#每周一早八点
}
}
秋季_任务:任务名自定义
任务:任务来源
参数:任务参数
时间表:定时时间
schedule: crontab(hour=8,day_of_week=1),#每周一早八点
分钟:分钟
小时:小时
星期几:礼拜
月日:月
年月日:年
tasks.pyfrom .芹菜进口应用
@app.task
定义下降(n1,n2):
res=n1/n2
打印( n1 /n2=%s % res )
从芹菜_任务.芹菜导入应用程序返回resget _ result.pyfrom
来自芹菜。结果导入异步结果
id= 21325 a40-9d 32-44 b5-a701-9a 31 C3 c74 b5
if __name__==__main__ :
async=AsyncResult(id=id,app=app)
if async.successful():
result=async.get()
打印(结果)
elif async.failed()。
打印(任务失败)
elif async.status==PENDING :
打印(任务等待中被执行)
elif async.status==RETRY :
打印(任务异常后正在重试)
elif async.status==STARTED :
打印(任务已经开始被执行)
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。