0%

异步任务队列Celery

Celery简介

Celery 是一个简单、灵活、可靠的分布式系统,处理大量消息,同时为操作提供维护这样一个系统所需的工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。

img

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等。

定时任务:定时执行某件事情,比如每天数据统计。

Celery具有以下优点:

  1. Simple(简单):Celery 使用和维护都非常简单,并且不需要配置文件。
  2. Highly Available(高可用):woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。
  3. Fast(快速):单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)。
  4. Flexible(灵活):Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。

Celery安装

Celery是一个Python的第三方库,直接使用以下命令即可下载。

1
pip install celery

Celery的使用

基本使用

创建异步任务执行文件

celery_task.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import celery
import time

backend='redis://127.0.0.1:6379/1' # 用于存储异步执行结果
broker='redis://127.0.0.1:6379/2' # 消息中间件,任务队列

cel=celery.Celery('test',backend=backend,broker=broker)

@cel.task
def send_email(name):
print("向%s发送邮件..."%name)
time.sleep(5)
print("向%s发送邮件完成"%name)
return "ok"

这样我们就使用celery定义了一个异步任务send_email

接下来,开启celery服务。

1
celery --app=celery_task worker -P eventlet -l INFO

想要设置celery通过eventlet方式去调用协程,需要额外下载eventlet库,并使用-P enentlet去指定。注意,在此处我们使用redis作为存储的中间件,必须保证Redis环境可用。

image-20240623230920729

此时Celery捕获到待执行的异步任务send_email,并且开始监听任务的执行。

接下来我们编写一个脚本去调用该异步任务。

produce_task.py

1
2
3
4
5
6
7
from celery_task import send_email

result = send_email.delay("minglog")
print(result.id)

result2 = send_email.delay("kacheX")
print(result2.id)

执行该任务,会直接返回对应的ID,不会产生等待。ID的意义,后面会详细讲到。

image-20240623231034614

查看Celery监听情况,可以发现Celery已经执行了两次任务,并且总共花费5秒时间,从消耗的时间我们也可以看出,Celery的异步执行结果,如果是非异步那么总共应该会花费10秒的时间去执行任务。

image-20240623231231312

到这一步,理论上我们的异步任务就已经执行完毕了,倘若在某些任务中,我们需要异步任务的返回结果,那么这个时候应该如何获取呢?

实际上,我们前面返回的ID就是去标记了每个任务的返回结果存放在Redis中的位置。

我们可以使用以下代码去获取指定任务的返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from celery.result import AsyncResult
from celery_task import cel

async_result=AsyncResult(id="c4e0d796-4803-4e92-bf8e-d9472f4fd34b", app=cel)

if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 将结果删除
elif async_result.failed():
print('执行失败')
elif async_result.status == 'PENDING':
print('任务等待中被执行')
elif async_result.status == 'RETRY':
print('任务异常后正在重试')
elif async_result.status == 'STARTED':
print('任务已经开始被执行')

image-20240623231601008

多任务结构

前面我们主要是在单个脚本中去定义所有的任务,当业务变得复杂化后,在同一个脚本中存放所有的异步任务肯定是不合适的,这个时候就需要在多个不同的脚本中去定义不同的任务,这个时候Celery应该如何去设计呢?

任务结构

image-20240623232330632

此时将多个不同的task脚本存放在tasks文件夹下,在celery_main中定义了celery对象,并在include参数中设置对应的task脚本。

celery_main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from celery import Celery

backend='redis://127.0.0.1:6379/1' # 用于存储异步执行结果
broker='redis://127.0.0.1:6379/2' # 消息中间件,任务队列
include=[
'tasks.task01',
'tasks.task02'
]

cel = Celery('celery_demo',backend=backend,broker=broker, include=include)

# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

task01.py

1
2
3
4
5
6
7
import time
from celery_main import cel

@cel.task
def send_email(res):
time.sleep(5)
return "完成向%s发送邮件任务"%res

task02.py

1
2
3
4
5
6
7
import time
from celery_main import cel

@cel.task
def send_msg(name):
time.sleep(5)
return "完成向%s发送短信任务"%name

接下来,开启Celery服务。

1
celery --app=celery_main worker -P eventlet -l INFO

此时,同时监听两个异步任务。

image-20240623232824164

接下来我们编写一个脚本去调用该异步任务。

image-20240623232851319

同样可以完成异步任务。

image-20240623232909038

定时任务

设定时间让celery执行一个定时任务,produce_task_ontime.py

例如,10s后执行某个任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
from celery_task import send_email
from datetime import datetime

ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间
result = send_email.apply_async(args=["minglog"], eta=task_time)
print(result.id)

image-20240623233451385

从日志中可以看出,一共消耗了15秒,这个时间刚好为10秒的等待+5秒的函数执行时间。

多任务结构定时任务

此时需要修改celery_main.py文件。

例如,每6秒执行一次某个异步任务(在此为task01.send_email)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from celery import Celery

backend='redis://127.0.0.1:6379/1' # 用于存储异步执行结果
broker='redis://127.0.0.1:6379/2' # 消息中间件,任务队列
include=[
'tasks.task01',
'tasks.task02'
]

cel = Celery('celery_demo',backend=backend,broker=broker, include=include)

cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
# 名字随意命名
'add-every-10-seconds': {
# 执行tasks1下的test_celery函数
'task': 'tasks.task01.send_email',
# 每隔2秒执行一次
# 'schedule': 1.0,
# 'schedule': crontab(minute="*/1"),
'schedule': timedelta(seconds=6),
# 传递参数
'args': ('minglog',)
},
# 'add-every-12-seconds': {
# 'task': 'celery_tasks.task01.send_email',
# 每年4月11号,8点42分执行
# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
# 'args': ('张三',)
# },
}
-------------本文结束感谢您的阅读-------------