celery队列执行顺序,celery分布式部署
本节内容
芹菜的介绍和基本用途
如何在项目中使用芹菜
启用多个员工
芹菜定时任务
与姜戈结合
通过django配置芹菜定期任务
一、芹菜的介绍和基本用途
Celery是基于python开发的分布式异步消息任务队列,通过它你可以轻松实现任务的异步处理。如果您的业务场景需要异步任务,可以考虑使用Celery。以下是示例场景中可用的一些示例:
如果你想在100台机器上执行一个批处理命令,可能需要很长时间,但是你不希望你的程序等待结果返回,而是会给你一个任务id。过一段时间后,你只需要持有这个任务ID就可以得到任务执行结果,在任务执行的同时你还可以继续做其他的事情。你想做一个预定的任务,比如每天查看你所有客户的信息。如果你发现今天是客户的生日,给他发个信息祝你好。
芹菜在执行任务时需要通过一个消息中间件来收发任务消息,存储任务结果。一般用rabbitMQ或者Redis,后面会讲到。
1.1芹菜有以下优点:
简单性:一旦熟悉了芹菜的工作流程,配置和使用都相对简单,可用性高。当任务执行失败或者执行过程中连接中断时,芹菜会自动尝试重新执行任务。快速:单进程芹菜每分钟可以处理数百万个任务。灵活:几乎可以扩展celery的所有组件,并且可以定制Celery的基本工作流程图。
1.2使用Celery安装Celery的默认代理是RabbitMQ,只需要配置一行。
broker _ URL= amqp://guest:guest @ localhost:5672/如果没有安装RabbitMQ,请安装。有关安装,请参见此处3358 docs . celery project . org/en/latest/getting-started/brokers/rabbit MQ . html # ID3
你也可以使用Redis作为代理。
安装redis组件
pip install -U celery[redis]
部署
配置很简单,只需配置Redis数据库的位置:
app . conf . broker _ url= redis://localhost:6379/0 其中URL的格式为:
redis://:password @ hostname:port/db _ number方案后面的所有字段都是可选的,默认为本地主机,端口6379,使用数据库0。
如果想得到每个任务的执行结果,还需要配置任务结果存在哪里。
如果您还想在Redis中存储任务的状态和返回值,您应该配置这些设置:
app . conf . result _ back end= redis://localhost:6379/0
1.3开始使用芹菜并安装芹菜模块$ pip安装芹菜。
创建一个芹菜应用程序来定义您的任务列表。
创建一个任务文件,并将其命名为tasks.py
从芹菜进口芹菜
app=芹菜(任务,
broker=redis://localhost ,
后端=redis://localhost )
@app.task
def add(x,y):
打印(正在运行.,x,y)
X y启动芹菜工开始监听,执行任务。
$ celery-任务工作者- loglevel=info
呼叫任务
打开另一个终端,执行命令行模式,并调用任务。
从任务导入添加
Add.delay(4,4)当你看你的工人终端时,会显示你收到了一个任务。这时候如果想看到任务的结果,就需要在调用任务的时候赋一个变量。
结果=加法延迟(4,4)
ready()方法返回任务是否已完成处理:
result.ready()
false您可以等待结果完成,但这很少使用,因为它会将异步调用变成同步调用:
result.get(timeout=1)
8如果任务引发异常,get()将重新引发异常,但是您可以通过指定propagate参数来覆盖该异常:
result.get(propagate=False)如果任务引发了异常,您还可以访问原始回溯:
结果.追溯
…
二、项目中如何使用芹菜。芹菜可以配置成一个应用程序。
目录格式如下
proj/__init__。巴拉圭
/芹菜. py
/tasks.py proj/celery.py内容
from _ _ future _ _ import absolute _ import,unicode_literals
从芹菜进口芹菜
app=芹菜( proj ,
broker=amqp://,
后端=amqp://,
include=[项目任务])
#可选配置,请参阅应用程序用户指南。
app.conf.update(
结果过期=3600,
)
if __name__==__main__ :
app.start()项目/任务。巴拉圭中的内容
from _ _ future _ _ import absolute _ import,unicode_literals
从。芹菜进口应用
@app.task
def add(x,y):
返回x y
@app.task
def mul(x,y):
返回x * y
@app.task
定义xsum(数字):
返回总和(数字)启动工人
$芹菜-项目工人-我信息
输出
-芹菜@ Alexs-MacBook-pro。本地v 4。0 .2(潜在呼叫)
- **** -
- * *** * -达尔文-15 .6 .0-x86 _ 64-i386-64位2017-01-26 21:50:24
- * - **** -
-* *-[配置]
- ** - .应用程序:项目:0x103a020f0
- ** - .transport:redis://localhost:6379//
- ** - .结果:redis://localhost/
- *** - * - .并发性:8(前期工作)
- ******* - .任务事件:关闭(启用-E以监视该工作线程中的任务)
- ***** -
-[队列]。芹菜交换=芹菜(直接)键=芹菜
后台启动工人
在后台在生产中,您会希望在后台运行工人,这在后台化教程中有详细描述。
守护进程脚本使用多命令在后台启动一个或多个工作进程:
$芹菜多起点w1 -A项目信息
芹菜多版本4.0.0(延迟调用)
开始节点.
w1。宁静。当地:好的你也可以重启:
芹菜多重重启w1 -A项目信息
芹菜多版本4.0.0(延迟调用)
停止节点.
w1。宁静。本地:TERM - 64024
正在等待一个节点.
好的
重新启动节点好的
芹菜多版本4.0.0(延迟调用)
停止节点.
w1。宁静。本地:TERM-64052或停止:
芹菜多站w1-A项目信息站命令是异步的,因此它不会等待工人关闭。您可能希望使用停止等待命令,这样可以确保所有当前正在执行的任务在退出之前都已完成:
$芹菜多站等待w1 -A项目信息
三、芹菜定时任务芹菜支持定时任务,设定好任务的执行时间,芹菜就会定时自动帮你执行,这个定时任务模块叫芹菜搅打
写一个脚本叫定期_任务。巴拉圭
从芹菜进口芹菜
从芹菜.时间表导入例行性工作排程
app=芹菜()
@ app。on _ after _ configure连接
定义设置周期任务(发送方,* *夸脱):
#每10秒进行一次呼叫测试(你好).
sender.add_periodic_task(10.0,test.s(hello ),name=add every 10 )
每30秒调用测试(世界)
sender.add_periodic_task(30.0,test.s(world ),expires=10)
#在每周一早上7:30执行
发件人。添加周期任务(
crontab(小时=7,分钟=30,星期几=1),
"星期一快乐!"),
)
@app.task
定义测试(参数):
打印(参数)添加定期任务会添加一条定时任务
上面是通过调用函数添加定时任务,也可以像写配置文件一样的形式添加,下面是每30秒执行的任务app.conf.beat_schedule={
"每30秒添加一次":{
任务:任务。添加,
时间表:30.0,
args: (16,16)
},
}
app.conf.timezone=UTC
任务添加好了,需要让芹菜单独启动一个进程来定时发起这些任务,注意,这里是发起任务,不是执行,这个进程只会不断的去检查你的任务计划,每发现有任务需要执行了,就发起一个任务调用消息,交给芹菜工人去执行
启动任务调度器芹菜搅打
$芹菜-周期性任务节拍
输出像下面这样
芹菜节拍4.0.2版(延迟调用)开始了。
__ - .__ - _
本地时间- 2017-02-08 18:39:31
配置-。broker-redis://localhost:6379//。装载机-芹菜。装载机。app。应用程序加载器。日程安排-芹菜。击败。持久调度程序。分贝加速节拍-计划。日志文件-[标准错误]@ %警告。最大间隔- 5.00分钟(300秒)
此时还差一步,就是还需要启动一个工人,负责执行芹菜搅打发起的任务
启动芹菜工人来执行任务
$芹菜-周期性任务工人
-芹菜@ Alexs-MacBook-pro。本地v 4。0 .2(潜在呼叫)
- **** -
- * *** * -达尔文-15 .6 .0-x86 _ 64-i386-64位2017-02-08 18:42:08
- * - **** -
-* *-[配置]
- ** - .应用程序:任务:0x104d420b8
- ** - .transport:redis://localhost:6379//
- ** - .结果:redis://localhost/
- *** - * - .并发性:8(前期工作)
- ******* - .任务事件:关闭(启用-E以监视该工作线程中的任务)
- ***** -
-[队列]。芹菜交换=芹菜(直接)键=芹菜
好啦,此时观察工人的输出,是不是每隔一小会,就会执行一次定时任务呢!
注意:击败需要将任务的最后运行时间存储在一个本地数据库文件中(默认为namedcelerybeat-schedule),因此它需要访问当前目录,或者您可以为该文件指定一个自定义位置:
$芹菜-一个周期_任务节拍-s/home/芹菜/var/run/芹菜节拍-时间表
更复杂的定时配置上面的定时任务比较简单,只是每多少s执行一个任务,但如果你想要每周一三五的早上8点给你发邮件怎么办呢?哈,其实也简单,用例行性工作排程功能,跟Linux操作系统操作系统自带的例行性工作排程功能是一样的,可以个性化定制任务执行时间
从芹菜.时间表导入例行性工作排程
app.conf.beat_schedule={
#在每周一早上7:30执行
添加-每周一上午:
任务:任务。添加,
调度:crontab(小时=7,分钟=30,星期几=1),
args: (16,16),
},
}
上面的这条意思是每周一的早上7.30执行任务。添加任务
还有更多定时配置方式如下:
例子
意义
crontab()
分分钟执行。
crontab(分钟=0,小时=0)
每天午夜执行。
crontab(分钟=0,小时=*/3 )
每三个小时执行一次:午夜、凌晨3点、6点、9点、中午、下午3点、6点、9点。
crontab(分钟=0,小时=0,3,6,9,12,15,18,21 )
和以前一样。
crontab(分钟=*/15 )
每15分钟执行一次。
crontab(星期几=星期日)
每分钟执行(!)在星期天。
crontab(分钟=* ,小时=* ,星期几=星期日)
和以前一样。
crontab(分钟=*/10 ,小时=3,17,22 ,星期几=星期四,星期五)
每十分钟执行一次,但只在周四或周五的凌晨3-4点、下午5-6点和晚上10-11点执行。
crontab(分钟=0,小时=*/2,*/3 )
每隔偶数小时执行一次,并且每小时可被三整除。这意味着:除了凌晨一点、5点、7点、11点、下午一点、5点、7点、11点以外的每个小时
crontab(分钟=0,小时=*/5 )
执行可被5整除的小时。这意味着它在下午3点触发,而不是下午5点(因为下午3点等于24小时制的值"15",可被5整除)。
crontab(分钟=0,小时=*/3,8-17 )
被3整除的每小时执行一次,办公时间(上午8时至下午5时)每小时执行一次。
crontab(0,0,day_of_month=2 )
在每月的第二天执行。
crontab(0,0,day_of_month=2-30/3 )
在每个偶数日执行。
crontab(0,0,day_of_month=1-7,15-21 )
在每月的第一周和第三周执行。
crontab(0,0,月中日期=11 ,年中月份=5 )
每年5月11日执行。
crontab(0,0,年月日=*/3 )
在每个季度的第一个月执行。
上面能满足你绝大多数定时任务需求了,甚至还能根据潮起潮落来配置定时任务,具体看http://份文件。芹菜项目。org/en/最新/用户指南/定期-任务。html #太阳能时间表
四、最佳实践之与框架结合
框架可以轻松跟芹菜结合实现异步任务,只需简单配置即可
如果你有一个现代的姜戈项目布局,比如:
-项目/
- proj/__init__ .巴拉圭
- proj/settings.py
- proj/urls.py
manage.pythen建议创建一个定义芹菜实例的newproj/proj/celery.pymodule:
文件:proj/proj/celery.py
from _ _ future _ _ import absolute _ import,unicode_literals
导入操作系统
从芹菜进口芹菜
#设置"芹菜"程序的默认姜戈设置模块。
OS。环境。设置默认值( DJANGO _ SETTINGS _ MODULE ,项目设置)
app=芹菜(“项目”)
#在这里使用字符串意味着工人不必序列化
#子进程的配置对象。
# -名称空间=芹菜表示所有与芹菜相关的配置键
#应该有一个"芹菜_"前缀。
app。config _ from _ object( django。conf:settings ,namespace=CELERY )
#从所有注册的姜戈应用程序配置中加载任务模块。
app.autodiscover_tasks()
@app.task(bind=True)
定义调试任务(自身):
打印(请求:{0!r} .格式(自我请求))
然后你需要把这个应用导入到你的proj/proj/__init__ .巴拉圭模块。这确保了在姜戈启动时加载应用程序,以便@shared_task decorator(后面会提到)使用它:proj/proj/__init__ .py:
from _ _ future _ _ import absolute _ import,unicode_literals
#这将确保应用程序总是在以下情况下导入
#姜戈启动,以便共享任务将使用此应用程序。
从。芹菜进口应用为芹菜_app
__all__=[celery_app]
请注意,这个示例项目布局适用于较大的项目,对于简单的项目,您可以使用一个包含的模块来定义应用程序和任务,就像芹菜教程的第一步一样。
让我们分解一下第一个模块中发生的事情,首先我们从未来导入绝对导入,这样我们的芹菜. py模块就不会与库冲突:
从_ _未来_ _进口绝对_进口中,我们为芹菜命令行程序设置默认的DJANGO _设置_模块环境变量:
OS。环境。设置默认值( DJANGO _ SETTINGS _ MODULE ,项目设置)你不需要这一行,但是它让你不用总是把设置模块传入芹菜程序。它必须始终在创建应用程序实例之前出现,正如我们接下来要做的:
app=芹菜( proj )这是我们的库实例。
我们还添加了姜戈设置模块作为芹菜的配置源。这意味着您不必使用多个配置文件,而是直接从姜戈设置中配置芹菜但是如果需要,您也可以将它们分开。
大写的名称空间意味着所有芹菜配置选项必须以大写字母而不是小写字母指定,并且以芹菜_开头,因此例如 task_always_eager 设置变成芹菜_任务_总是_渴望,经纪人_网址设置变成芹菜_经纪人_网址.
您可以在这里直接传递对象,但是使用字符串更好,因为这样工人就不必序列化对象。
app。config _ from _ object( django。conf:settings ,namespace=CELERY )接下来,可重用应用程序的常见做法是在单独的tasks.py模块中定义所有任务,芹菜确实有办法自动发现这些模块:
芹菜上方有一行的app.autodiscover_tasks()将按照tasks.py约定,从所有已安装的应用程序中自动发现任务:
- app1/
- tasks.py
- models.py
-附录2/
- tasks.py
- models.py
最后,调试任务示例是一个转储自己的请求信息的任务。这是使用在芹菜3.1中引入的新的绑定=真任务选项来轻松引用当前任务实例。
然后在具体的应用里的tasks.py里写你的任务
#在此创建您的任务
from _ _ future _ _ import absolute _ import,unicode_literals
从芹菜导入共享任务_任务
@共享任务
def add(x,y):
返回x y
@共享任务
def mul(x,y):
返回x * y
@共享任务
定义xsum(数字):
返回总和(数字)
在你的框架观点里调用来自django .捷径的芹菜导入呈现,HttpResponse
#在此创建您的观点。
从伯纳德导入任务
定义任务_测试(请求):
res=tasks.add.delay(228,24)
打印("开始运行任务")
打印(异步任务资源,资源获取() )
返回HttpResponse(res %s%res.get())五、在框架中使用计划任务功能
有一个姜戈-芹菜-节拍扩展将时间表存储在框架数据库中,并提供了一个方便的管理界面来管理运行时的周期性任务。
要安装和使用此扩展:
使用点安装软件包:$ pip安装姜戈-芹菜-节拍添加姜戈_芹菜_节拍
已安装应用程序的模块
在姜戈项目的settings.py中
:INSTALLED_APPS=(
.
姜戈_芹菜_节拍,
)
注意,模块名中没有破折号,只有下划线。应用框架数据库迁移,以便创建必要的表:$ python manage.py migrateStart使用姜戈启动大提琴节拍服务
日程安排:芹菜-一个项目节拍-l信息-S姜戈访问django-管理界面来设置一些周期性任务。
在管理页面里,有3张表
配置完长这样
此时启动你的芹菜搅打和工人,会发现每隔2分钟,击败会发起一个任务消息让工人执行scp _任务任务
注意,经测试,每添加或修改一个任务,芹菜打都需要重启一次,要不然新的配置不会被芹菜搅打进程读到
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。