一:介绍 异步:由于http是短连接,所以当一个用户的请求,到指定服务器(dns)请求,服务器做处理,并将结果和状态码返回给客户端。
如果我们在发送一个任务请求,这个任务比较耗时,如果不进行异步处理的话,前端会一直等待服务器返回结果,那用户在前端页面无法进行操作。
所以在处理比较耗时的任务时候,需要将任务做异步处理
celery:就是一个异步框架,他讲前端传来的任务,放在celery任务队列中,然后将任务信息放在celery队列里,等待work 去队列里取任务并执行。
二:安装
软件版本:
1 Django==1.11.2
2 celery==3.1.25
3 redis==2.10.5
4 django-celery==3.2.1
5 django-celery-results==1.0.1
项目目录结构:
项目名称:django_celery
APP:test_celery
1 django_celery/
2 django_celery/
3 ├── celery.py
4 ├── celery.pyc
5 ├── __init__.py
6 ├── __init__.pyc
7 ├── settings.py
8 ├── settings.pyc
9 ├── urls.py
10 ├── urls.pyc
11 ├── wsgi.py
12 └── wsgi.pyc
在项目根目录下创建文件名字为:celery.py文件,内容如下:
1 from __future__ import absolute_import
2 import os
3 from celery import Celery
4 from django.conf import settings
5
6 # set the default Django settings module for the 'celery' program.
7 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery.settings')
8
9 app = Celery('django_celery')
10
11 # Using a string here means the worker will not have to
12 # pickle the object when using Windows.
13 app.config_from_object('django.conf:settings')
14 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
15
16 @app.task(bind=True)
17 def debug_task(self):
18 print('Request: {0!r}'.format(self.request))
其中需要修改:
app = Celery('django_celery') os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery.settings') 注意修改成项目的名字。
然后在celery.py目录 ,给___init__追加内容:
1 from __future__ import absolute_import
2
3 # This will make sure the app is always imported when
4 # Django starts so that shared_task will use this app.
5 from .celery import app as celery_app
程序目录结构:
1 test_celery/
2 ├── admin.py
3 ├── admin.pyc
4 ├── apps.py
5 ├── __init__.py
6 ├── __init__.pyc
7 ├── migrations
8 │ ├── __init__.py
9 │ └── __init__.pyc
10 ├── models.py
11 ├── models.pyc
12 ├── tasks.py
13 ├── tasks.pyc
14 ├── tests.py
15 ├── views.py
16 └── views.py
需要应用的根目录下创建我们的任务模块,需要注意文件的名字是tasks.py不要进行更改。内容如下:注意任务上需要使用特定装饰器,这样celery才知道 这个函数是任务函数。
1 from __future__ import absolute_import, unicode_literals
2 from celery import shared_task
3
4
5 @shared_task
6 def add(x, y):
7 return x + y
8
9
10 @shared_task
11 def mul(x, y):
12 return x * y
13
14
15 @shared_task
16 def xsum(numbers):
17 return sum(numbers)
然后在视图中进行调用,views.py内容:
1 #coding:utf-8
2 from django.shortcuts import render
3 from django.http import HttpResponse
4
5 from .tasks import *
6
7 def index(request):
8 ret=mul.delay(1,2)
9 print ret
10 return HttpResponse(u"OK!")
配置setting文件:
需要添加如下的字段:
1 BROKER_URL = 'redis://127.0.0.1:6379/0'
2 CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
3 CELERY_ACCEPT_CONTENT = ['application/json']
4 CELERY_TASK_SERIALIZER = 'json'
5 CELERY_RESULT_SERIALIZER = 'json
导入模块:
1 import djcelery
2 djcelery. setup_loader ( )
因为我在使用redis 来做为celery的队列数据,所以我在broker使用的redis,本地的redis 63179端口,数据库索引为:9(注意redi默认有16个db)。
结果返回使用也是写入redis,也是本地redis 索引为1的数据库。如下定义redis存储数据的格式,如果不指定,结果将会是不规则或者不可以识别的。
注册应用:
1 INSTALLED_APPS = [
2 'django.contrib.admin',
3 'django.contrib.auth',
4 'django.contrib.contenttypes',
5 'django.contrib.sessions',
6 'django.contrib.messages',
7 'django.contrib.staticfiles',
8 'test_celery',
9 'djcelery',
10 ]
django的db初始化:
1 python2.7 manage.py makemigrations
2 python2.7 manage.py migrate
启动django和celery:
1 python2.7 manage.py runserver 0.0.0.0:1602
启动celery:
1 python2.7 manage.py celery worker -l info
执行任务:
通过前端的访问,执行视图里的index函数,使用delay()函数,来调用后端任务函数。该函数的返回值是celery任务的job_id可以通过他的返回值去redis中去结果,效果如下:
后端celery日志:
我们视图中打印job_id:
我根据django打印的job_id去redis中去对应的结果:
celery在redis储存的key值是:celery-task-meta-加上job_id才是完整的key。结果是:josn,如上。
注意:
1:在django和celery 结合的时候 报如下错误:
TypeError: 'Settings' object is not subscriptable
ImportError: No module named timeutils
这是因为当前使用的django-celery和 celery不匹配。直接升级这2个软件之后,即可解决。
1 pip install -U celery
2 pip install -U django-celery
2在运行celery的时候,不能以root用户运行,在manage.py加上如下:
1 os.environ.setdefault('C_FORCE_ROOT', 'true')
2 os.environ.setdefault('DJANGO_SETTINGS_MODULE', '{PATH TO SETTINGS FILE}')
如下:
给运行的task加日志:
1 from celery.utils.log import get_task_logger
2
3 logger = get_task_logger(__name__)
4
5 @app.task
6 def add(x, y):
7 logger.info('Adding {0} + {1}'.format(x, y))
8 return x + y
给celery指定日志路径使用参数:--logfile
1 python2.7 manage.py celery worker -l info --logfile='/export/Logs/celery.log'
默认在不指定worker的数量的时候,数量为内核的个数。可以用参数c来指定。
1 python2.7 manage.py celery worker -l info --logfile='/export/Logs/celery.log' -c 4
学习是一种态度,坚持是质变的利器!