当前位置: 首页>后端>正文

在 FastAPI 中使用 Celery 实现异步任务

什么是 Celery?

Celery 翻译成中文是:芹菜。

Celery 是一个开源的、分布式 的消息任务队列, 在 python 项目中使用。

使用场景

比如,在使用 FastAPI 写后端的过程中,一些耗时的任务(如发送邮件,文件上传等)可以使用 Celery 通过消息任务队列的方式进行 异步 实现,从而提升应用的整体性能。

核心概念

Celery 采用的是 生产者-消费者 设计模式。如下图:其中 celery client 为生产者, celery workers 为消费者。两者之间的为 Broker, 用来存放消息的中间件。

在 FastAPI 中使用 Celery 实现异步任务,第1张
6pp99jwouivhymdxldxe.png

准备工作

  1. 安装 RabbitMQ
  2. 安装 Celery

快速开始

考虑一个场景:在业务代码的某处,我们需要发送邮件通知用户某些信息。比如张三中奖了,需要通知他。

我们可以定义一个函数: sendEmail(),并用类似于 @app.task 修饰,用于发送邮件。然后在需要使用的地方通过 sendEmail.apply_async()sendEmail.delay() 的形式进行调用。

第一步:定义 task

# tasks.py
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def sendEmail(to, message):
    # 模拟发送邮件
    print("send email to {to} with message: {message}").formate(to=to,message=message)

注意:这一步还包括配置 broker。此处使用 RabbitMQ 作为 broker ,除此之外,还可以使用 redis等。

第二步:发送消息(其实就是调用函数)

#main.py
from tasks import sendEmail
.... #此处发现张三中奖了
sendEmail.delay('zhangsan@xxx.com', '哥们,你中大奖了') #通知张三
....

第三步:启动 RabbitMQ

第三步: 启动 Celery server

$ celery -A tasks worker --loglevel=INFO

第四步:启动应用

$ python main.py

完成了以上这些步骤,我们的工作就算完成了。

sendEmail.delay 执行的时候,消息就被发送到 broker 中了。woker 则从 broker 中取消息,执行任务。


https://www.xamrdz.com/backend/3ns1945708.html

相关文章: