django-celery,django-celery-results
Celery是基于python开发的分布式异步消息任务队列,通过它可以轻松实现任务的异步处理。下面这篇文章主要介绍Django使用芹菜的相关信息,有需要的朋友可以参考一下。
00-1010 1、django应用celery2、项目应用1、异步任务redis2、定时任务3、任务绑定4、任务钩子5、任务调度6、芹菜管理与监控总结。
目录
django框架的请求/响应过程是同步的,框架本身无法实现异步响应。
但是我们在项目过程中经常会遇到一些耗时的任务,比如发邮件、发短信、大数据统计等。这些操作耗时较长,同步执行对用户体验非常不友好,这种情况下就需要实现异步执行。
异步执行前端一般用Ajax,后端用芹菜。
1、django应用Celery
Django项目应用芹菜,主要有两种任务。一个是异步任务(发布者任务),一般是web请求,另一个是调度任务。
celery组成
Celery是Python开发的一个简单、灵活、可靠的分布式任务队列。它是一个处理异步任务的框架。其本质是一种生产者-消费者模式。生产者将任务发送到消息队列,消费者负责处理任务。芹菜侧重于实时操作,但也有很好的调度支持。它每天可以处理数百万个任务。特点:
简单:熟悉了芹菜的工作流程后,就很容易配置和使用了。
高可用性:当任务执行失败或执行过程中连接中断时,芹菜会自动尝试重新执行任务。
快速:单进程芹菜每分钟可以处理数百万个任务。
灵活性:几乎芹菜的每个组件都可以扩展和定制。
芹菜由三部分组成:
消息代理:官方提供了很多替代方案,支持RabbitMQ、Redis、亚马逊SQS、MongoDB、Memcached等。官方推荐RabbitMQ。
任务执行单元(Worker):任务执行单元,负责从消息队列中取出任务执行。它可以在一个或多个或不同的机器节点上启动,这是其分布式实现的核心。
后端:官方提供了很多存储方式:RabbitMQ、Redis、Memcached、SQLAlchemy、Django ORM、Apache Cassandra、Elasticsearch等。
该架构如下所示:
工作原理:
任务模块任务包括异步任务和定时任务。其中,异步任务通常在业务逻辑中触发并发送到消息队列,而定时任务则通过芹菜节拍流程周期性地发送到消息队列;
任务执行单元工作器实时监控消息队列中的任务执行;
Woker在执行任务后将结果保存在后端;
本文使用redis数据库作为消息中间件和结果存储数据库。
2 、项目应用
1.安装库
pip安装芹菜
pip安装redis
2 .芹菜
在主项目目录中,创建一个新的celery.py文件:
导入操作系统
进口django
从芹菜进口芹菜
从django.conf导入设置
#设置系统环境变量,安装django,必须设置,否则启动芹菜时会报错。
# celery_study是当前项目名称。
OS . environ . set default( DJANGO _ SETTINGS _ MODULE , celery_study.settings )
django.setup()
芹菜_app=芹菜(芹菜_研究)
芹菜_ app . config _ from _ object( django . conf : settings )
芹菜_app.autodiscover_tasks(
lambda: settings.INSTALLED_APPS)
注意:是和settings.py文件同目录,一定不能建立在项目根目录,不然会引起 celery 这个模块名的命名冲突
同时,在主项目的init.py中,添加如下代码:
from .celery import celery_app
3.settings.py
在配置文件中配置对应的redis配置:
# Broker配置,使用Redis作为消息中间件
注意:所有配置的官方文档:Configuration and defaults — Celery 5.2.0b3 documentation
4.tasks.py
在子应用下建立各自对应的任务文件tasks.py(必须是tasks.py这个名字,不允许修改)
from celery import shared_task
5.调用任务
from .tasks import *
6.启动celery
pip install eventlet
celery -A celery_study worker -l debug -P eventlet
注意 :celery_study是项目名
使用redis时,有可能会出现如下类似的异常
AttributeError: str object has no attribute items
这是由于版本差异,需要卸载已经安装的python环境中的 redis 库,重新指定安装特定版本(celery4.x以下适用 redis2.10.6, celery4.3以上使用redis3.2.0以上):
xxxxxxxxxx pip install redis==2.10.6
7.获取任务结果
在 views.py 中,通过 AsyncResult.get() 获取结果
from celery import result
AsyncResult类的常用的属性和方法:
state: 返回任务状态,等同status;
task_id: 返回任务id;
result: 返回任务结果,同get()方法;
ready(): 判断任务是否执行以及有结果,有结果为True,否则False;
info(): 获取任务信息,默认为结果;
wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;
successful(): 判断任务是否成功,成功为True,否则为False;
2.定时任务
在第一步的异步任务的基础上,进行部分修改即可
1.settings.py
from celery.schedules import crontab
说明(更多内容见文档:Periodic Tasks — Celery 5.2.0b3 documentation):
task:任务函数
schedule:执行频率,可以是整型(秒数),也可以是timedelta对象,也可以是crontab对象,也可以是自定义类(继承celery.schedules.schedule)
args:位置参数,列表或元组
kwargs:关键字参数,字典
options:可选参数,字典,任何 apply_async() 支持的参数
relative:默认是False,取相对于beat的开始时间;设置为True,则取设置的timedelta时间
在task.py中设置了日志
from celery import shared_task
2.启动celery
(两个cmd)分别启动worker和beat
celery -A worker celery_study -l debug -P eventlet
3.任务绑定
Celery可通过task绑定到实例获取到task的上下文,这样我们可以在task运行时候获取到task的状态,记录相关日志等
方法:
在装饰器中加入参数 bind=True
在task函数中的第一个参数设置为self
在task.py 里面写
from celery import shared_task
其中:self对象是celery.app.task.Task的实例,可以用于实现重试等多种功能
from celery import shared_task
启动celery
celery -A worker celery_study -l debug -P eventlet
4.任务钩子
Celery在执行任务时,提供了钩子方法用于在任务执行完成时候进行对应的操作,在Task源码中提供了很多状态钩子函数如:on_success(成功后执行)、on_failure(失败时候执行)、on_retry(任务重试时候执行)、after_return(任务返回时候执行)
方法:通过继承Task类,重写对应方法即可,
from celery import Task
启动celery
celery -A worker celery_study -l debug -P eventlet
5.任务编排
在很多情况下,一个任务需要由多个子任务或者一个任务需要很多步骤才能完成,Celery也能实现这样的任务,完成这类型的任务通过以下模块完成:
group: 并行调度任务
chain: 链式任务调度
chord: 类似group,但分header和body2个部分,header可以是一个group任务,执行完成后调用body的任务
map: 映射调度,通过输入多个入参来多次调度同一个任务
starmap: 类似map,入参类似*args
chunks: 将任务按照一定数量进行分组
文档:Next Steps — Celery 5.2.0b3 documentation
1.group
urls.py:
path(primitive/, views.test_primitive),
views.py:
from .tasks import *
说明:
通过task函数的 s 方法传入参数,启动任务
上面这种方法需要进行等待,如果依然想实现异步的方式,那么就必须在tasks.py中新建一个task方法,调用group,示例如下:
tasks.py:
@shared_task
urls.py:
path(first_group/, views.first_group),
views.py:
def first_group(request):
2.chain
默认上一个任务的结果作为下一个任务的第一个参数
def test_primitive(request):
3.chord
任务分割,分为header和body两部分,hearder任务执行完在执行body,其中hearder返回结果作为参数传递给body
def test_primitive(request):
6、celery管理和监控
celery通过flower组件实现管理和监控功能 ,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理
官网:flower · PyPI
文档:Flower - Celery monitoring tool — Flower 1.0.1 documentation
安装flower
pip install flower
启动flower
flower -A celery_study --port=5555
说明:
-A:项目名
--port: 端口号
访问
在浏览器输入:http://127.0.0.1:5555
通过api操作
curl http://127.0.0.1:5555/api/workers
总结
到此这篇关于Django中celery使用项目的文章就介绍到这了,更多相关Django中celery使用内容请搜索盛行IT软件开发工作室以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT软件开发工作室!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。