"/>
侧边栏壁纸
博主头像
PySuper 博主等级

千里之行,始于足下

  • 累计撰写 218 篇文章
  • 累计创建 15 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录
Web

Crontab + Celery 定时处理异步任务

PySuper
2021-04-13 / 0 评论 / 0 点赞 / 6 阅读 / 0 字
温馨提示:
本文最后更新于2024-05-28,若内容或图片失效,请留言反馈。 所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

Crontab

CRONJOBS = (
    # 初级模式
    ('*/5 * * * *', 'myproject.myapp.cron.my_scheduled_job'),

    # 中级模式
    ('*/1 * * * *', 'crontab_celery.cron_tasks.test_crontab', f'>> {os.path.join(BASE_DIR, "logs/crontab.log")}'),
    ('* * * * * sleep 10;', 'crontab_celery.cron_tasks.test_crontab', f'>> {os.path.join(BASE_DIR, "logs/crontab.log")}'),

    # 定时执行命令
    ('*/1 * * * *', 'django.core.management.call_command', ['celery -A crontab_celery.main worker -l info'], f'>> {os.path.join(BASE_DIR, "logs/celery.log")}'),

    # 高级模式
    ('0   0 * * 0', 'django.core.management.call_command', ['dumpdata', 'auth'], {'indent': 4}, '> /home/john/backups/last_sunday_auth_backup.json'),
)
  • 在定时任务中出发celery异步任务
def test_crontab():
    """
    定时任务:应该从celery的结果数据中,获取文件名列表,操作文件名列表
    :return:
    """
    # 这里返回的数据最后会输出到log日志中

    from crontab_celery.file import tasks as test_task
    test_task.file_calculate.delay(random.random())

Celery

自定义连接池、序列化方式、压缩方式、日志记录方式、任务调度、生产者、消费者、中间人(Broker)

组成

Broker

broker是一个消息传输的中间件

1、每当程序调用 celery 的异步任务的时候,会向 broker 传递消息,而后 celery 的 worker 将会取到消息

2、broker 的实现方案有 redisrabbitmq数据库

Backend

backend 是用来存储 celery 执行任务的当前状态和最终结果

Worker

具体执行代码的基础单元

1、负责从 rabbitmq 或者 redis 中拉取任务执行

2、可以在不同的主机上启动 worker 实现分布式执行

Beat

负责定时或者循环的向 redis 等 broker 中写入任务

以便让 worker 从 broker 中获取任务执行,两者配合实现 celery 定时任务

支持

中间人

  • RabbitMQ
  • Redis
  • Amazon SQS
  • 结果存储

结果存储

  • AMQP、 Redis
  • Memcached
  • SQLAlchemy、Django ORM
  • Apache Cassandra、Elasticsearch

并发

  • prefork (multiprocessing)
  • Eventlet、gevent
  • solo (single threaded)

序列化

  • pickle、json、yaml、msgpack
  • zlib、bzip2 compression
  • Cryptographic message signing

功能

监控

  • 可以针对整个流程进行监控,内置的工具或可以实时说明当前集群的概况

调度

  • 可以通过调度功能在一段时间内指定任务的执行时间 datetime
  • 也可以根据简单每隔一段时间进行执行重复的任务,支持分钟、小时、星期几,也支持某一天/年的 Crontab 表达式

工作流

  • 可以通过 “canvas“进行组成工作流,其中包含分组、链接、分块等等
  • 简单和复杂的工作流程可以使用一组 “canvas“组成,其中包含分组、链接、分块等

内存泄漏保护

  • max-tasks-per-child 参数适用于可能会出现资源泄漏(例如:内存泄漏)的任务

时间和速率的限制

  • 您可以控制每秒 / 分钟 / 小时执行任务的次数
  • 或者任务执行的最长时间,也将这些设置为默认值,针对特定的任务或程序进行定制化配置

自定义组件

  • 开发者可以定制化每一个职程(Worker)以及额外的组件
  • 职程(Worker)是用 “bootsteps” 构建的 - 一个依赖关系图,可以对职程(Worker)的内部进行细粒度控制

消息队列

  • RabbitMQ
  • Redis

安装

  • pip install celery
    

使用

如果是一个大项目,则创建一个应用模块

tasks.py

from celery import Celery

# 第一个参数Celery是当前模块的名称。这仅是必需的,以便在__main__模块中定义任务时可以自动生成名称。
# 第二个参数是 broker 关键字参数,指定要使用的消息代理的 URL。这里使用 RabbitMQ(也是默认选项)。
app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

启动

# 查看可视化
celery -A crontab_celery.main flower

# 开启异步任务
celery -A crontab_celery.main worker -l info

# 关闭所有celery进程
ps auxww | grep 'celery' | awk '{print $2}' | xargs kill -9

# 输出到日志
celery -A crontab_celery.main worker -l info -B --logfile=/root/project/INFO/logs/celery.log

# 查看帮助
celery worker --help
celery --help

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区