什么是Celery
celery是一个异步任务队列/基于分布式消息传递的作业队列,分布式队列服务。它侧重于实时操作,但对调度支持也很好。
celery用于生产系统每天处理数以百万计的任务。
celery是用Python编写的,但该协议可以在任何语言实现。它也可以与其他语言通过webhooks实现。
建议的消息代理RabbitMQ的,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, ,和数据库(使用SQLAlchemy的或Django的 ORM) 。
celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。
应用场景
- 异步任务
一些耗时较长的操作,如果用户等待后台数据返回,将会极大影响用户体验时,我们使用 异步消息队列,就可以解决这个问题啦。前端可以迅速响应用户请求,而一些异步操作则交给消息队列去执行啦。比如发送短信/邮件、消息推送、音视频处理等等。。 - 定时任务:
定时执行某件事情,比如每天数据统计
优惠券定期删除
等等。。
Celery架构图
- Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
- Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
- Broker:消息代理,又称消息中间件,Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。 接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作为消息代理,但适用于生产环境的只有RabbitMQ和Redis, 官方推荐 RabbitMQ。
- Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
- Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。
实现
- 准备一个项目,结构如下。
- app.py : 实例化 Celery。
- config.py : Celery 相关配置。
- task.py : 任务函数文件。
- worker.py : 执行调用任务工作。
接下来依次看一下代码。
app.py
# -*- coding: utf-8 -*-
__author__ = 'LiuNan'
__date__ = '2020/5/22 19:52'
from celery import Celery
# app是Celery类的实例,创建的时候添加了proj.tasks这个模块,也就是包含了proj/tasks.py这个文件。
app = Celery('Celery_project', include=['Celery_project.task'])
app.config_from_object('Celery_project.config')
if __name__ == '__main__':
app.start()
config.py
# -*- coding: utf-8 -*-
__author__ = 'LiuNan'
__date__ = '2020/5/22 19:53'
BROKER_URL = 'redis://:yourpasswd@localhost' # 使用Redis作为消息代理
CELERY_RESULT_BACKEND = 'redis://:yourpasswd@localhost:6379/0' # 把任务结果存在了Redis
CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案
CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型
task.py
# -*- coding: utf-8 -*-
__author__ = 'LiuNan'
__date__ = '2020/5/22 19:50'
import time
from Celery_project.app import app
@app.task
def add(x, y):
time.sleep(5) # 模拟执行时间5秒
return x + y
worker.py
# -*- coding: utf-8 -*-
__author__ = 'LiuNan'
__date__ = '2020/5/22 19:56'
import time
# from Celery_project.task import add
import sys
sys.path.append(r'/root/')
from Celery_project.task import add
t1 = time.time()
# 用 delay() 方法来调用任务
# 调用任务会返回一个 AsyncResult 实例,可用于检查任务的状态,等待任务完成或获取返回值(如果任务失败,则为异常和回溯)。
r1 = add.delay(1, 2)
r2 = add.delay(3, 2)
r3 = add.delay(7, 2)
r4 = add.delay(8, 2)
r5 = add.delay(10, 2)
r_list = [r1, r2, r3, r4, r5]
for r in r_list:
while not r.ready():
pass
print(r.result)
#
# print(add(1,2))
# print(add(3,2))
# print(add(7,2))
# print(add(8,2))
# print(add(10,2))
#
t2 = time.time()
print('耗时%s' % str(t2 - t1))
运行
- 启动redis
- 切换至所在目录,执行
celery -A Celery_project.app worker -l info
运行结果图,如下。
- 运行调用任务文件 worker.py
可以看出, 程序总共执行了 5 秒.
我定义任务执行时间 是5秒, 如果是同步执行, 我执行了5次, 那么最少需要 25 秒.
所以可以看出celery 的作用.