Celery简介
Celery
是一个简单、灵活、可靠的分布式系统,处理大量消息,同时为操作提供维护这样一个系统所需的工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
Celery
的架构由三部分组成,消息中间件(message broker
),任务执行单元(worker
)和任务执行结果存储(task result store
)组成。
celery
是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task
)和定时任务(crontab
)。
异步任务:将耗时操作任务提交给Celery
去异步执行,比如发送短信/邮件、消息推送、音视频处理等等。
定时任务:定时执行某件事情,比如每天数据统计。
Celery
具有以下优点:
- Simple(简单):Celery 使用和维护都非常简单,并且不需要配置文件。
- Highly Available(高可用):woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。
- Fast(快速):单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)。
- Flexible(灵活):Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。
Celery安装
Celery
是一个Python
的第三方库,直接使用以下命令即可下载。
1 | pip install celery |
Celery的使用
基本使用
创建异步任务执行文件
celery_task.py
1 | import celery |
这样我们就使用celery
定义了一个异步任务send_email
。
接下来,开启celery
服务。
1 | celery --app=celery_task worker -P eventlet -l INFO |
想要设置
celery
通过eventlet
方式去调用协程,需要额外下载eventlet
库,并使用-P enentlet
去指定。注意,在此处我们使用redis
作为存储的中间件,必须保证Redis
环境可用。
此时Celery
捕获到待执行的异步任务send_email
,并且开始监听任务的执行。
接下来我们编写一个脚本去调用该异步任务。
produce_task.py
1 | from celery_task import send_email |
执行该任务,会直接返回对应的ID
,不会产生等待。ID
的意义,后面会详细讲到。
查看Celery
监听情况,可以发现Celery
已经执行了两次任务,并且总共花费5
秒时间,从消耗的时间我们也可以看出,Celery
的异步执行结果,如果是非异步那么总共应该会花费10
秒的时间去执行任务。
到这一步,理论上我们的异步任务就已经执行完毕了,倘若在某些任务中,我们需要异步任务的返回结果,那么这个时候应该如何获取呢?
实际上,我们前面返回的ID
就是去标记了每个任务的返回结果存放在Redis
中的位置。
我们可以使用以下代码去获取指定任务的返回结果。
1 | from celery.result import AsyncResult |
多任务结构
前面我们主要是在单个脚本中去定义所有的任务,当业务变得复杂化后,在同一个脚本中存放所有的异步任务肯定是不合适的,这个时候就需要在多个不同的脚本中去定义不同的任务,这个时候Celery
应该如何去设计呢?
任务结构
此时将多个不同的task
脚本存放在tasks
文件夹下,在celery_main
中定义了celery
对象,并在include
参数中设置对应的task
脚本。
celery_main.py
1 | from celery import Celery |
task01.py
1 | import time |
task02.py
1 | import time |
接下来,开启Celery
服务。
1 | celery --app=celery_main worker -P eventlet -l INFO |
此时,同时监听两个异步任务。
接下来我们编写一个脚本去调用该异步任务。
同样可以完成异步任务。
定时任务
设定时间让celery
执行一个定时任务,produce_task_ontime.py
。
例如,10s
后执行某个任务。
1 | from celery_task import send_email |
从日志中可以看出,一共消耗了15秒,这个时间刚好为10秒的等待+5秒的函数执行时间。
多任务结构定时任务
此时需要修改celery_main.py
文件。
例如,每6秒执行一次某个异步任务(在此为task01.send_email
)。
1 | from celery import Celery |