什么是 Celery?
Celery 翻译成中文是:芹菜。
Celery 是一个开源的、分布式 的消息任务队列
, 在 python
项目中使用。
使用场景
比如,在使用 FastAPI
写后端的过程中,一些耗时的任务(如发送邮件,文件上传等)可以使用 Celery
通过消息任务队列的方式进行 异步
实现,从而提升应用的整体性能。
核心概念
Celery 采用的是 生产者-消费者
设计模式。如下图:其中 celery client 为生产者, celery workers 为消费者。两者之间的为 Broker
, 用来存放消息的中间件。
准备工作
- 安装
RabbitMQ
- 安装
Celery
快速开始
考虑一个场景:在业务代码的某处,我们需要发送邮件通知用户某些信息。比如张三中奖了,需要通知他。
我们可以定义一个函数: sendEmail()
,并用类似于 @app.task
修饰,用于发送邮件。然后在需要使用的地方通过 sendEmail.apply_async()
或 sendEmail.delay()
的形式进行调用。
第一步:定义 task
# tasks.py
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def sendEmail(to, message):
# 模拟发送邮件
print("send email to {to} with message: {message}").formate(to=to,message=message)
注意:这一步还包括配置 broker。此处使用 RabbitMQ
作为 broker
,除此之外,还可以使用 redis
等。
第二步:发送消息(其实就是调用函数)
#main.py
from tasks import sendEmail
.... #此处发现张三中奖了
sendEmail.delay('zhangsan@xxx.com', '哥们,你中大奖了') #通知张三
....
第三步:启动 RabbitMQ
第三步: 启动 Celery server
$ celery -A tasks worker --loglevel=INFO
第四步:启动应用
$ python main.py
完成了以上这些步骤,我们的工作就算完成了。
当 sendEmail.delay
执行的时候,消息就被发送到 broker
中了。woker
则从 broker
中取消息,执行任务。