当前位置: 首页>数据库>正文

消息队列异步下单 消息队列异步处理任务

 

什么是Celery

celery是一个异步任务队列/基于分布式消息传递的作业队列,分布式队列服务。它侧重于实时操作,但对调度支持也很好。

celery用于生产系统每天处理数以百万计的任务。

celery是用Python编写的,但该协议可以在任何语言实现。它也可以与其他语言通过webhooks实现。

建议的消息代理RabbitMQ的,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, ,和数据库(使用SQLAlchemy的或Django的 ORM) 。

celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。

应用场景

  • 异步任务
    一些耗时较长的操作,如果用户等待后台数据返回,将会极大影响用户体验时,我们使用 异步消息队列,就可以解决这个问题啦。前端可以迅速响应用户请求,而一些异步操作则交给消息队列去执行啦。比如发送短信/邮件、消息推送、音视频处理等等。。
  • 定时任务:
    定时执行某件事情,比如每天数据统计
    优惠券定期删除
    等等。。

Celery架构图

消息队列异步下单 消息队列异步处理任务,消息队列异步下单 消息队列异步处理任务_linux,第1张

  • Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
  • Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
  • Broker:消息代理,又称消息中间件,Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。 接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作为消息代理,但适用于生产环境的只有RabbitMQ和Redis, 官方推荐 RabbitMQ。
  • Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
  • Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

实现

  1. 准备一个项目,结构如下。

消息队列异步下单 消息队列异步处理任务,消息队列异步下单 消息队列异步处理任务_消息队列异步下单_02,第2张

  • app.py : 实例化 Celery。
  • config.py : Celery 相关配置。
  • task.py : 任务函数文件。
  • worker.py : 执行调用任务工作。

接下来依次看一下代码。

app.py

# -*- coding: utf-8 -*-
__author__ = 'LiuNan'
__date__ = '2020/5/22 19:52'

from celery import Celery

# app是Celery类的实例,创建的时候添加了proj.tasks这个模块,也就是包含了proj/tasks.py这个文件。
app = Celery('Celery_project', include=['Celery_project.task'])

app.config_from_object('Celery_project.config')

if __name__ == '__main__':
    app.start()

config.py

# -*- coding: utf-8 -*-
__author__ = 'LiuNan'
__date__ = '2020/5/22 19:53'

BROKER_URL = 'redis://:yourpasswd@localhost' # 使用Redis作为消息代理

CELERY_RESULT_BACKEND = 'redis://:yourpasswd@localhost:6379/0' # 把任务结果存在了Redis

CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案

CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型

task.py

# -*- coding: utf-8 -*-
__author__ = 'LiuNan'
__date__ = '2020/5/22 19:50'

import time
from Celery_project.app import app


@app.task
def add(x, y):
    time.sleep(5) # 模拟执行时间5秒
    return x + y

worker.py

# -*- coding: utf-8 -*-
__author__ = 'LiuNan'
__date__ = '2020/5/22 19:56'

import time
# from Celery_project.task import add
import sys
sys.path.append(r'/root/')

from Celery_project.task import add

t1 = time.time()
# 用 delay() 方法来调用任务
# 调用任务会返回一个 AsyncResult 实例,可用于检查任务的状态,等待任务完成或获取返回值(如果任务失败,则为异常和回溯)。
r1 = add.delay(1, 2)
r2 = add.delay(3, 2)
r3 = add.delay(7, 2)
r4 = add.delay(8, 2)
r5 = add.delay(10, 2)

r_list = [r1, r2, r3, r4, r5]
for r in r_list:
    while not r.ready():
        pass
    print(r.result)
#
# print(add(1,2))
# print(add(3,2))
# print(add(7,2))
# print(add(8,2))
# print(add(10,2))
#

t2 = time.time()
print('耗时%s' % str(t2 - t1))

运行

  1. 启动redis

  2. 切换至所在目录,执行 celery -A Celery_project.app worker -l info 运行结果图,如下。

消息队列异步下单 消息队列异步处理任务,消息队列异步下单 消息队列异步处理任务_消息队列异步下单_03,第3张

  1. 运行调用任务文件 worker.py

消息队列异步下单 消息队列异步处理任务,消息队列异步下单 消息队列异步处理任务_爬虫_04,第4张

消息队列异步下单 消息队列异步处理任务,消息队列异步下单 消息队列异步处理任务_linux_05,第5张




  1. 可以看出, 程序总共执行了 5 秒.
    我定义任务执行时间 是5秒, 如果是同步执行, 我执行了5次, 那么最少需要 25 秒.
    所以可以看出celery 的作用.

https://www.xamrdz.com/database/6tc1934705.html

相关文章: