【从0开始Python开发实战】Django集成Celery-创新互联

目录:

目前创新互联建站已为近1000家的企业提供了网站建设、域名、虚拟主机、网站托管维护、企业网站设计、贵池网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。

1. Django集成Celery

2. 声明异步任务

3. 封装工具task_util.py

4. 单元测试test_task_util.py

5. 创建异步任务

6. 常见问题和解决方法

Celery是一个灵活可靠的分布式系统,用于异步任务调度,主要有3部分组成:消息中间件broker,任务执行单元worker,执行结果存储task result store。Celery使用第三方消息中间件Redis,RabbitMQ等。

【从0开始Python开发实战】Django集成Celery

系统通常将一些耗时的操作任务提交给Celery去异步执行,典型架构示意图如下。本文详细介绍Django集成Celery配置方法和功能测试。

【从0开始Python开发实战】Django集成Celery

时序图如下:

【从0开始Python开发实战】Django集成Celery

示例代码:https://github.com/rickding/HelloPython/tree/master/hello_celery

├── __init__.py

├── settings.py

├── celery.py

├── tasks.py

├── util

│   └── task_util.py

├── tests

│   └── test_task_util.py

一,Django集成Celery


代码文件

功能要点

Django集成Celery

requirements.txt

安装Celery, Redis和工具包:

celery == 4.2.1

flower == 0.9.2

redis == 3.2.0

eventlet == 0.24.1

celery.py

配置Celery,依赖的消息中间件broker和后端backend地址配置在settings.py中集中维护。

__init__.py

配置项目加载celery.app

声明异步任务

tasks.py

声明Celery可调度的任务@shared_task

封装工具task_util

task_util.py

异步任务创建和分发

单元测试

test_task_util.py

测试异步任务创建和分发功能

创建异步任务

views.py

增加REST接口/chk/job

1. 新建Django项目,运行:django-admin startproject hello_celery

2. 进到目录hello_celery,增加应用:python manage.py startapp app

【从0开始Python开发实战】Django集成Celery

项目的目录文件结构如下:

【从0开始Python开发实战】Django集成Celery

3. 安装Celery和依赖包,pip install celery >= 4.2.1,如果不是新建项目,注意版本兼容问题。

celery == 4.2.1
flower == 0.9.2
redis == 3.2.0
eventlet == 0.24.1

4. 增加celery.py,配置信息:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hello_celery.settings')

app = Celery(
'hello_celery',
include=['hello_celery.tasks'],
broker=settings.CELERY_BROKER,
backend=settings.CELERY_BACKEND
)

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

app.conf.update(
CELERY_ACKS_LATE=True,
CELERY_ACCEPT_CONTENT=['pickle', 'json'],
CELERYD_FORCE_EXECV=True,
CELERYD_MAX_TASKS_PER_CHILD=500,
BROKER_HEARTBEAT=0,
)

# Optional configuration, see the application user guide.
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,  # celery任务执行结果的超时时间,即结果在backend里的保存时间,单位s
)

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

platforms.C_FORCE_ROOT = True

5. 打开settings.py,配置BROKER和BACKEND地址:

CELERY_BROKER = 'redis://127.0.0.1:6379/2'
CELERY_BACKEND = 'redis://127.0.0.1:6379/3'

6. 打开__init__.py,增加代码:

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']

二,增加tasks.py,声明异步任务

from __future__ import absolute_import, unicode_literals
import logging
import json

from celery import shared_task
from hello_celery.util.task_util import dispatch_task

log = logging.getLogger(__name__)

@shared_task
def task(param_str):
log.info('task starts: %s, %s' % (type(param_str), param_str))

param_dict = None
try:
param_dict = json.loads(param_str)
except Exception as e:
log.warning('Exception when parse param: %s' % str(e))

log.info('parsed param: {}, {}'.format(type(param_dict), param_dict))
return 'finished'

正确配置后,运行命令:celery -A hello_celery worker -l info -P eventlet,注意Win10环境中需要增加eventlet,Celery成功启动信息:

【从0开始Python开发实战】Django集成Celery

三,封装工具task_util.py

封装两个有用的工具函数,分别用于分发(创建)异步任务和获取任务信息:

import logging
import json

log = logging.getLogger(__name__)

def dispatch_task(task_func, param_dict):
param_json = json.dumps(param_dict)

try:
return task_func.apply_async(
 [param_json],
 retry=True,
 retry_policy={
     'max_retries': 1,
     'interval_start': 0,
     'interval_step': 0.2,
     'interval_max': 0.2,
 },
)
except Exception as ex:
log.info(ex)
raise

def get_task_status(task_func, task_id):
t = task_func.AsyncResult(task_id)
status = t.state
progress = 0

if status == u'SUCCESS':
progress = 100
elif status == u'FAILURE':
progress = 0
elif status == 'PROGRESS':
progress = t.info['progress']

return {'status': status, 'progress': progress}

四,单元测试test_task_util.py

增加测试函数,创建一个任务任务并获取信息:

import logging

from django.test import TestCase

from hello_celery.tasks import task
from hello_celery.util.task_util import dispatch_task, get_task_status

log = logging.getLogger(__name__)

class TasksTest(TestCase):
def test_get_task_status(self):
t = dispatch_task(task, {'msg': 'test_task'})
self.assertIsNotNone(t)

ret = get_task_status(task, t.id)
log.info('task status: %s,%s, %s' % (ret, t.id, str(task)))
self.assertIsNotNone(ret.get('status'))

运行python manage.py test,同时Celery将执行测试函数创建的任务:

【从0开始Python开发实战】Django集成Celery

五,创建异步任务

1. 在views.py中增加请求处理函数,创建一个异步执行的任务:

from django.http import JsonResponse
from hello_celery.tasks import do_task

def chk_job(req):
param_dict = {
'url': req.get_raw_uri(),
'path': req.get_full_path(),
}
job = do_task(param_dict)
return JsonResponse({'code': 0, 'msg': 'success', 'job': job.task_id})

2. 在urls.py中配置路由

from django.urls import path
from app.views import chk_job

urlpatterns = [
path('', chk_job, name='chk'),
]

3. 运行命令启动服务:python manage.py runserver 0.0.0.0:8001

【从0开始Python开发实战】Django集成Celery

4. REST接口创建异步任务示例

【从0开始Python开发实战】Django集成Celery

六,常见问题和解决方法

1. 启动Celery: celery -A hello_celery worker -l info,运行出错:

Unrecoverable error: VersionMismatch('Redis transport requires redis-py versions 3.2.0 or later. You have 2.10.6',)

解决:指定Redis使用3.2.0或更高pip install redis>=3.2.0

另外有需要云服务器可以了解下创新互联建站www.cdcxhl.com,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。

标题名称:【从0开始Python开发实战】Django集成Celery-创新互联
分享路径:https://www.cdcxhl.com/article22/ddcdjc.html

成都网站建设公司_创新互联,为您提供外贸网站建设网站导航做网站网站收录企业建站服务器托管

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

外贸网站建设