python中的Celery基本使用

qlqwjy 阅读:89 2022-07-27 19:11:02 评论:0

一、Celery异步任务框架

Celery是一个异步任务框架,并且是一个简单、灵活可靠的,处理大量消息的分布式系统

Celery服务为其他项目服务提供异步解决任务的需求,内置socket

Celery可执行的任务:执行异步任务,执行延迟任务,执行定时任务

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

二、Celery架构

Celery是由三部分组成的,消息中间件(message broker)、任务执行单元(worker)、任务结果存储(task result store)组成的。

Broker(任务中间件)————>Worker(任务执行者)————>Backend(任务结果仓库)

消息中间件:Celery是不提供消息服务的,但是可以使用第三方来提供消息服务(提供任务),列如,Redis。

任务执行单元:Worker会自动(后台异步)执行消息中间件(broker)中的任务任务。

任务结果存储:将Worker执行的结果存储在backend中,可以使用Redis来存储

三、Celery任务结构

Celery有两种任务结构,基本结构、包架构封装,但是我们提倡使用包架构封装,因为结构更加清晰,例如:

project 
    ├── celery_task  	# celery包 
    │   ├── __init__.py # 包文件 
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须叫celery.py 
    │   └── tasks.py    # 所有任务函数 
    ├── add_task.py  	# 添加任务 
    └── get_result.py   # 获取结果 

包架构封装

celery.py

from celery import Celery 
 
broker = 'redis://127.0.0.1:6379/1'     # broker 任务队列,任务放到这里面 
backend = 'redis://127.0.0.1:6379/2'    # backend 结果存储,执行结果放在这里面 
 
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks']) 

tasks.py

from .celery import app 
 
@app.task 
def add(x, y): 
    return x + y 
 
@app.task 
def multip(x, y): 
    return x * y 

异步任务执行:

add_task.py

把 tasks.py 中的任务函数添加到 broker 中

windows 首先需要安装:pip install celery 和 pip install eventlet

需要启动 celery, 在包项目下输入以下命令:

C:\project> celery -A celery_task worker -P eventlet -l info

celery -A 包名 worker -P eventlet -l info

from celery_task.tasks import add 
 
# 提交异步任务 
ret = add.delay(5, 3)	# 把add函数任务添加到 broker中,worker在异步实时取出执行 
 
print(ret)	# 0cc72e56-4604-4c00-bb3d-5b456f4869a7	获取执行结果需要此ID 

延迟任务执行:

add_task.py

还是需要先启动celery

from celery_task.tasks import multip 
# 提交延迟任务 
from datetime import datetime, timedelta 
 
# 需要UTC时间 
eta = datetime.utcnow() + timedelta(seconds=10)		# 当前UTC时间往后加10秒 
ret = multip.apply_async(args=(9, 9), eta=eta)		# 10 秒之后执行 
 
print(ret)	# 3c8cfa57-05ff-4a26-b8fa-1f7f2d8051f2	获取执行结果需要此ID 

定时任务执行:

执行定时任务需要从新配置celery.py

from celery import Celery 
from datetime import timedelta 
from celery.schedules import crontab 
 
broker = 'redis://127.0.0.1:6379/1'  # broker 任务队列,任务放到这里面 
backend = 'redis://127.0.0.1:6379/2'  # backend 任务队列,执行结果放在这里面 
 
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks']) 
 
# 时区 
app.conf.timezone = 'Asia/Shanghai' 
# 是否使用UTC 
app.conf.enable_utc = False 
 
app.conf.beat_schedule = { 
    # add 任务 
    'add-task': { 
        'task': 'celery_task.tasks.add', 
        'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点执行一次 
        'args': (300, 150), 
    }, 
    # multip 任务 
    'multip-task': { 
        'task': 'celery_task.tasks.multip', 
        'schedule': timedelta(seconds=3),           # 每三秒执行一次 
        'args': (300, 150), 
    } 
} 

启动 worker 等待执行任务

celery -A celery_task beat -l info 
 
celery -A 包名 beat -l info 

启动 beat 将任务添加 broker 中,让worker执行

celery -A celery_task worker -P eventlet -l info 
 
celery -A 包名 worker -P eventlet -l info 

查看任务执行结果:

get_result.py

from celery_task.celery import app 
from celery.result import AsyncResult 
 
id = '3fedc0d8-32c8-4b1a-af43-fedfac6107a2' 
 
if __name__ == '__main__': 
    asyncs = AsyncResult(id=id, app=app) 
 
    if asyncs.successful(): 
        result = asyncs.get() 
        print(result)	# 成功则取出backend中id对应的值 
         
    elif asyncs.failed(): 
        print('任务失败') 
    elif asyncs.status == 'PENDING': 
        print('任务等待中被执行') 
    elif asyncs.status == 'RETRY': 
        print('任务异常后正在重试,或id不存在') 
    elif asyncs.status == 'STARTED': 
        print('任务已经开始被执行') 

基本结构

创建py文件:celery_app_task.py

from celery import Celery 
import time 
 
# backend='redis://:123456@127.0.0.1:6379/1'	# 有密码123456 
 
broker = 'redis://127.0.0.1:6379/1'  	# broker 任务队列,任务放到这里面 
backend = 'redis://127.0.0.1:6379/2'  	# backend 任务队列,执行结果放在这里面 
 
app = Celery(__name__, broker=broker, backend=backend) 
 
@app.task 
def add(x, y): 
    return x + y 

启动 worker

celery -A celery_app_task worker -P eventlet -l info 

添加任务:add_task.py

from celery_app_task import add 
 
# 提交任务 
ret = add.delay(5, 3)	# 往 broker 中添加一个任务 
print(ret) 

本文参考链接:https://www.cnblogs.com/XiaoYang-sir/p/15041412.html
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

搜索
关注我们

一个IT知识分享的公众号