Crontab
- 在INSTALLED_APPS中添加
django_crontab
- 在setting中配置(cron_jobs中时间怎么写?)
CRONJOBS = (
# 初级模式
('*/5 * * * *', 'myproject.myapp.cron.my_scheduled_job'),
# 中级模式
('*/1 * * * *', 'crontab_celery.cron_tasks.test_crontab', f'>> {os.path.join(BASE_DIR, "logs/crontab.log")}'),
('* * * * * sleep 10;', 'crontab_celery.cron_tasks.test_crontab', f'>> {os.path.join(BASE_DIR, "logs/crontab.log")}'),
# 定时执行命令
('*/1 * * * *', 'django.core.management.call_command', ['celery -A crontab_celery.main worker -l info'], f'>> {os.path.join(BASE_DIR, "logs/celery.log")}'),
# 高级模式
('0 0 * * 0', 'django.core.management.call_command', ['dumpdata', 'auth'], {'indent': 4}, '> /home/john/backups/last_sunday_auth_backup.json'),
)
- 在定时任务中出发celery异步任务
def test_crontab():
"""
定时任务:应该从celery的结果数据中,获取文件名列表,操作文件名列表
:return:
"""
# 这里返回的数据最后会输出到log日志中
from crontab_celery.file import tasks as test_task
test_task.file_calculate.delay(random.random())
Celery
自定义连接池、序列化方式、压缩方式、日志记录方式、任务调度、生产者、消费者、中间人(Broker)
组成
Broker
broker是一个消息传输的中间件
1、每当程序调用 celery 的异步任务的时候,会向 broker 传递消息,而后 celery 的 worker 将会取到消息
2、broker 的实现方案有 redis
、rabbitmq
、数据库
Backend
backend 是用来存储 celery 执行任务的当前状态和最终结果
Worker
具体执行代码的基础单元
1、负责从 rabbitmq 或者 redis 中拉取任务执行
2、可以在不同的主机上启动 worker 实现分布式执行
Beat
负责定时或者循环的向 redis 等 broker 中写入任务
以便让 worker 从 broker 中获取任务执行,两者配合实现 celery 定时任务
支持
中间人
- RabbitMQ
- Redis
- Amazon SQS
- 结果存储
结果存储
- AMQP、 Redis
- Memcached
- SQLAlchemy、Django ORM
- Apache Cassandra、Elasticsearch
并发
- prefork (multiprocessing)
- Eventlet、gevent
- solo (single threaded)
序列化
- pickle、json、yaml、msgpack
- zlib、bzip2 compression
- Cryptographic message signing
功能
监控
- 可以针对整个流程进行监控,内置的工具或可以实时说明当前集群的概况
调度
- 可以通过调度功能在一段时间内指定任务的执行时间 datetime
- 也可以根据简单每隔一段时间进行执行重复的任务,支持分钟、小时、星期几,也支持某一天/年的 Crontab 表达式
工作流
- 可以通过 “canvas“进行组成工作流,其中包含分组、链接、分块等等
- 简单和复杂的工作流程可以使用一组 “canvas“组成,其中包含分组、链接、分块等
内存泄漏保护
- max-tasks-per-child 参数适用于可能会出现资源泄漏(例如:内存泄漏)的任务
时间和速率的限制
- 您可以控制每秒 / 分钟 / 小时执行任务的次数
- 或者任务执行的最长时间,也将这些设置为默认值,针对特定的任务或程序进行定制化配置
自定义组件
- 开发者可以定制化每一个职程(Worker)以及额外的组件
- 职程(Worker)是用 “bootsteps” 构建的 - 一个依赖关系图,可以对职程(Worker)的内部进行细粒度控制
消息队列
- RabbitMQ
- Redis
安装
-
pip install celery
使用
如果是一个大项目,则创建一个应用模块
tasks.py
from celery import Celery
# 第一个参数Celery是当前模块的名称。这仅是必需的,以便在__main__模块中定义任务时可以自动生成名称。
# 第二个参数是 broker 关键字参数,指定要使用的消息代理的 URL。这里使用 RabbitMQ(也是默认选项)。
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
启动
# 查看可视化
celery -A crontab_celery.main flower
# 开启异步任务
celery -A crontab_celery.main worker -l info
# 关闭所有celery进程
ps auxww | grep 'celery' | awk '{print $2}' | xargs kill -9
# 输出到日志
celery -A crontab_celery.main worker -l info -B --logfile=/root/project/INFO/logs/celery.log
# 查看帮助
celery worker --help
celery --help
评论区