一,什么是celery
1.1celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等,使用redis作为队列用的更多吧,因为使用相比mq更加简单,后面实战介绍也是用的redis
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
另外, Celery还支持不同的并发和序列化的手段
- 并发:Prefork, Eventlet, gevent, threads/single threaded
- 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等
1.2使用场景
celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
比如系统中创建用户成功,用户信息修改后发送邮件通知,用户注册短信验证,执行耗时较长的任务时
定时任务:定时执行某件事情,比如每天数据统计
1.3celery优点
imple(简单)
Celery 使用和维护都非常简单,并且不需要配置文件。
Highly Available(高可用)
woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。
Fast(快速)
单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)
Flexible(灵活)
Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。
1.4celery安装
可以通过python包管理平台pypi安装celery
pip install celery
二,celery执行异步任务
实验前提我在10.0.0.55这台内网机上安装了redis服务并且开通了6379端口
创建一个py文件,文件名为celery_task.py,内容如下:
import celery
import time
backend = 'redis://10.0.0.55:6379/1'
broker = 'redis://10.0.0.55:6379/2'
cel = celery.Celery('test', backend=backend, broker=broker)
@cel.task
def send_email(name):
print("向%s发送邮件..." % name)
time.sleep(5)
print("向%s发送邮件完成" % name)
return "ok"
编写完成后先别直接运行,先启动celery worker命令为:
celery worker -A celery_task -l info
celery命令,worker开的进程 -A 后面接的是要执行的文件名 -l 指定日志级别,执行后出现:
代表启动成功
接下来就是生产者创建任务推送到消息中间件,创建执行任务文件produce_task.py:
from celery_task import send_email
result2 = send_email.delay("alex")
print(result2.id)
生产者调用delay方法会向消息中间件插入队列信息,信息包括你要执行的send_email函数还有alex参数
点击执行produce_task.py,会发现虽然send_email函数里有5秒钟的挂起,但是执行很快就给返回了
但是这里碰到一个坑,worker进程接收到了执行任务,但是报错,说send_email函数期待接收3个参数,但是我一个都没给,报错如下:
解决办法:windows python虚拟环境下安装eventlet
重新起celery worker 进程监听
celery worker -A celery_task -l info -P eventlet
说是celery 4+开始已经对windows不再支持了,所以我们在windows上实验的话,得安装eventlet库,eventlet是一个python协程模板,重新执行produce_task.py
celery worker收到任务完成执行
我们执行完produce_task.py发现返回得是一个类似id的结果,这个就是celery id,那我如何取到send_email函数里return的结果呢?
创建result.py
from celery.result import AsyncResult # 引入AsyncResult
from celery_task import cel #导入celery app实例
async_result=AsyncResult(id="910dc103-e4e2-48a8-8fc1-1f631d9323b1", app=cel)
# 910dc103-e4e2-48a8-8fc1-1f631d9323b1就是异步返回的执行任务id
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 将结果删除
elif async_result.failed():
print('执行失败')
elif async_result.status == 'PENDING':
print('任务等待中被执行')
elif async_result.status == 'RETRY':
print('任务异常后正在重试')
elif async_result.status == 'STARTED':
print('任务已经开始被执行')
result里对async_result任务状态进行判断,这里简单说下celery 任务的几种状态:
- PENDING (waiting for execution or unknown task id)
- STARTED (task has been started)
- SUCCESS (task executed successfully)
- FAILURE (task execution resulted in exception)
- RETRY (task is being retried)
- REVOKED (task has been revoked)
执行后返回:
得到return ok返回值
以下是项目中实际编写的获取celery result的一个示例:
def cel_result(task_info, webhook, timeout=60):
"""
:param timeout: 超时时间
:param task_info: 列表,元素为字典,字典包含task_id host_info update_method
:param webhook:
:return:
"""
done = 0 # 操作完成计数
count = 0 # 循环等待次数
result = {} # 最终结果
success = True # 是否全部成功
print("任务执行检查,超时时间为:", timeout)
while True: #
print("第{}次查询".format(str(count)))
for task in task_info:
if str(task['task_id']) in result: # 已出结果的任务 pass
continue
update_method = task['update_method']
host_name = task['host_info']['host_name']
host_ip = task['host_info']['host_ip']
async_result = AsyncResult(id=task['task_id'], app=c)
print("task_id", task['task_id'], "状态", async_result.status)
if async_result.successful():
msg = async_result.get()
print(msg)
if not msg['success']: # 执行成功 但是结果失败,认为失败
success = False
result[str(task['task_id'])] = msg
# async_result.forget() # 将结果删除
done += 1
progress = '已完成 {}/{} {}'.format(str(done), str(len(task_info)), str(msg))
webhook_msg(webhook, progress)
print(progress)
elif async_result.failed():
msg = "{} {} {} 操作失败".format(host_name, host_ip, update_method)
result[str(task['task_id'])] = msg
done += 1
progress = '已完成 {}/{} {}'.format(str(done), str(len(task_info)), msg)
webhook_msg(webhook, progress)
print(progress)
success = False
# elif async_result.status == 'PENDING': # 有问题,执行完了也是pending状态
# result[str(task['task_id'])] = '任务等待中被执行'
# elif async_result.status == 'RETRY':
# result[str(task['task_id'])] = '任务异常后正在重试'
# elif async_result.status == 'STARTED':
# result[str(task['task_id'])] = '任务已经开始被执行'
if done >= len(task_info):
return result, success
else: # 等待1
time.sleep(1)
count += 1
if count >= timeout: # 超时处理
print("timeout超时")
return result, False
多任务结构:
celery_setting.py celery配置文件,最终返回celery实例,这里取名为celery_setting而不是celery的文件名是为了反正产生循环导入的问题
from celery import Celery
cel = Celery('celery_demo',
broker='redis://10.0.0.55:6379/1',
backend='redis://10.0.0.55:6379/2',
# 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
include=['celery_tasks.task01',
'celery_tasks.task02'
])
# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False
celery_demo,celery app 实例名字
borker,中间件代理地址
backend,异步任务结果存储代理地址
include,寻找异步任务模板路径,如果有多个异步任务,就写多个进去,列表包起来
task01.py,发送邮件任务
import time
from celery_tasks.celery_setting import cel
@cel.task
def send_email(res):
time.sleep(5)
return "完成向%s发送邮件任务" % res
task02.py,发送短信任务
import time
from celery_tasks.celery_setting import cel
@cel.task
def send_msg(name):
time.sleep(5)
return "完成向%s发送短信任务" % name
开启work进程
celery worker -A celery_tasks.celery_setting -l info -P eventlet
发现已经扫描到我这两个异步任务
编写生产者produce_task.py
from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_msg
# 立即告知celery去执行test_celery任务,并传入一个参数
result = send_email.delay('peng')
print(result.id)
result = send_msg.delay('peng')
print(result.id)
Celery定时任务
produce_task.py
from celery_task import send_email
from datetime import datetime
# 方式一
v1 = datetime(2023, 5, 7, 21, 57, 00)
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
result = send_email.apply_async(args=["egon",], eta=v1)
print(result.id)
# 方式二
# ctime = datetime.now()
# # 默认用utc时间
# utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
# from datetime import timedelta
# time_delay = timedelta(seconds=10)
# task_time = utc_ctime + time_delay
#
# # 使用apply_async并设定时间
# result = send_email.apply_async(args=["egon"], eta=task_time)
# print(result.id)
通过apply_async来执行定时任务,args里面是向send_email函数传递的参数,eta是传递的国标日期时间
多层次目录下我们修改celery_setting.py
from datetime import timedelta
from celery import Celery
cel = Celery('celery_demo',
broker='redis://10.0.0.55:6379/1',
backend='redis://10.0.0.55:6379/2',
# 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
include=['celery_tasks.task01',
'celery_tasks.task02'
])
# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False
cel.conf.beat_schedule = {
# 名字随意命名
'add-every-10-seconds': {
# 执行tasks1下的test_celery函数
'task': 'celery_tasks.task01.send_email',
# 每隔2秒执行一次
# 'schedule': 1.0,
# 'schedule': crontab(minute="*/1"),
'schedule': timedelta(seconds=6),
# 传递参数
'args': ('张三',)
},
}
这里cel celery实例增加了beat_schedule选项
task:执行定时任务模块路径
schedule:日程,上面意思是每隔6秒钟
args:传递给定时任务函数的参数
启动定时器
celery beat -A celery_tasks.celery_setting
每6秒钟往队列里插入任务
worker消费端检测到任务执行
Django中使用celery示例:
项目中celery配置文件:
autodiscover_tasks自动发现任务,settings.INSTALL_APPS,表示会扫描所有settings配置的项目目录下的tasks文件,app.config_from_object表示会读取settings文件中关于celery的一些配置
from .celery import app as celery_app
导入了名为app
的Celery实例对象,并将其命名为celery_app
,这样其他模块可以通过导入这个模块来使用该Celery实例对象。
__all__ = ('celery_app',)
定义了__all__
变量,它是一个字符串列表,表示模块中可以被导出的变量、类和函数。这里只导出了celery_app
,也就是说只有这个变量可以被其他模块导入。
同样我们通过在服务器上启动celery woker进程作为消费端
celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!
同时引入celery的task作为装饰器去修饰我们要执行的celery任务函数
@task(name=xxx) name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
生产端我们在view视图里去通过delay实现异步任务调用
flask使用celery
celery配置文件示例:
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
from config import configs
from getenv import getenv
env = getenv()
def init_celery():
broker_url = configs[env].BROKER_URL
result_backend = configs[env].CELERY_RESULT_BACKEND
include = configs[env].CELERY_INCLUDE # CELERY_INCLUDE = ['celery_tasks.update.tasks']
timezone = configs[env].CELERY_TIMEZONE
beat_schedule = {
'purchse_ecs_check': {
'task': 'celery_tasks.cron_task.purchse_ecs_check',
'schedule': crontab(minute='*/20')
},
# 'check_dcdn_used': {
# 'task': 'celery_tasks.check_dcnd_used_task.check_dcdn_used_task',
# 'schedule': crontab(hour='*/1')
# },
'save_onlin_data': {
'task': 'celery_tasks.cron_task.save_online',
'schedule': crontab(minute='*/1')
},
'serial_task_check': {
'task': 'celery_tasks.cron_task.serial_check',
'schedule': crontab(minute='*/1')
},
# 'test_webhook': {
# 'task': 'celery_tasks.webhooktest.webhooktest',
# 'schedule': crontab(minute="*")
# }
}
cel = Celery('crontab', backend=result_backend, broker=broker_url, include=include) # 创建 Celery 实例
cel.conf.timezone = timezone
cel.conf.enable_utc = False
cel.conf.beat_schedule = beat_schedule
# class ContextTask(c.Task):
# def __call__(self, *args, **kwargs):
# with app.app_context():
# return self.run(*args, **kwargs)
#
# cel.Task = ContextTask
return cel
# flask_app = init_app()
c = init_celery()
include异步任务模块路径发现,我们配置在config里
celery beat和worker进程我们通过supervisor进行管控,supervisor配置文件
beat的
[include]
files=/etc/supervisord.conf
[program:celery_beat]
user=hero
directory=/home/hero/heroes_assemble
# command=/home/hero/heroes_assemble/venv/bin/celery -A celery_tasks.manager.c beat -l info
command=/home/hero/heroes_assemble/venv/bin/celery -A celery_tasks.celery.c beat -l info --logfile=/home/hero/log/celery_beat.log
worker的
[include]
files=/etc/supervisord.conf
[program:celery_worker]
environment=HOME="/home/hero",ANSIBLE_FORCE_COLOR=True
user=hero
directory=/home/hero/heroes_assemble
# command=/home/hero/heroes_assemble/venv/bin/celery -A celery_tasks.manager.c worker -l info
command=/home/hero/heroes_assemble/venv/bin/celery -A celery_tasks.celery.c worker -l info --concurrency=4 --logfile=/home/hero/log/celery_work.log
这个–concurrency等于就是一次并发执行4个celery任务
通过引入celery里的c 实例来修饰你需要作为celery任务的函数
生产端通过delay发起异步任务调用
定时任务也是通过引用celery里c 实例来修饰你需要作为celery任务的函数,只是最后配置 beat_schedule需要将这个所修饰的函数名模块路径带上
服务器租用托管,机房租用托管,主机租用托管,https://www.e1idc.com