检查 Celery 版本:pip show celery
Celery 异步任务
Crontab 定时任务
启动 Celery Worker:
celery -A your_project worker -l info启动 Celery Beat:
celery -A your_project beat -l info
在现代 Web 应用的开发中,任务调度是一个常见的需求。Celery 和 Crontab 是两种流行的任务调度解决方案,它们各有特点和适用场景。以下是对这两种技术的比较和分析,帮助开发者根据具体需求选择合适的工具。
Celery VS Crontab
1、Celery
Celery 是一个基于分布式消息传递的强大异步任务队列/作业队列
它专门用于处理异步任务和定时任务,非常适合与现代 web 应用框架(如 Django)集成
优点:
功能强大:支持异步任务、定时任务以及任务结果存储等
灵活性:可以设置复杂的任务调度逻辑,如任务链、任务组合和任务重试等
可扩展性:支持多种消息代理(如 RabbitMQ、Redis),易于扩展到多服务器或高可用配置
集成简单:与 Django 等框架集成良好,通过 Django-celery-easy实现任务管理和监控
错误处理:提供任务失败重试、错误回调等机制
监控和管理:通过 Flower 等工具可以实时监控任务状态,管理任务执行
缺点:
复杂性:相比 Crontab,Celery 的设置和运维更为复杂。
资源消耗:运行 Celery 需要额外的资源,对于小规模应用可能是一个负担
2、Crontab
Crontab 是 Unix/Linux 系统中的一个程序,用于按照预定的时间周期执行任务
它是系统级的服务,适用于简单的周期性任务调度
优点:
简单直接:直接在操作系统层面设置,不需要额外的程序
稳定性:作为 Unix/Linux 系统的一部分,稳定性很高
缺点:
环境限制:只能在 Unix 或 Linux 系统上使用
功能有限:仅支持时间触发,不支持复杂的逻辑或依赖关系
难以集成:与现代应用程序(如 Django)集成不够灵活,需要额外的脚本支持
错误处理和监控:不提供内置的错误处理或任务执行监控功能
3、 选择
只使用 Celery:适用于需要处理大量的后台任务,包括异步任务和周期性任务的项目。Celery 提供了良好的支持和扩展性,特别适合需要与现代Web框架集成的应用。
只使用 Crontab:适用于只需定时执行一些简单的脚本或命令的场景,这些任务不需要与应用紧密集成。
结合使用:在某些特定场景下,如需要在非常精确的时间点触发任务,可以使用 Crontab 触发一个脚本,该脚本再调用 Celery 任务。这样可以结合 Crontab 的时间控制能力和 Celery 的任务处理能力
4、 总结
在大多数现代 Web 应用中,只使用 Celery 就足够了,因为它提供了包括定时任务在内的广泛功能,同时还支持异步任务处理、任务监控等高级功能
只有在特定的需求或特殊的系统配置下,才考虑将 Crontab 与 Celery 结合使用
在决定之前,评估你的项目需求、资源和维护能力是非常重要的
celery-beat和Crontab 哪个更好用
在选择 Celery Beat 和 Crontab 用于任务调度时,最佳选择取决于您的具体需求、系统环境以及与现有技术栈的兼容性。以下是两者的比较,帮助您做出决策:
Celery Beat
优点:
集成性:如果您已经在使用 Celery 处理后台任务,Celery Beat 可以无缝集成,无需引入新的技术或工具。
灵活性:Celery Beat 支持多种调度策略,包括定时(timedelta)和定点(crontab)调度。这使得任务调度更加灵活。
分布式:Celery 支持分布式环境,可以在多个节点上运行 worker 和 scheduler,增强了系统的可扩展性和容错能力。
状态监控:与 Celery 集成意味着可以利用 Celery 的监控工具(如 Flower)来监控任务的状态和性能。
缺点:
复杂性:相比 Crontab,Celery Beat 的设置和配置更复杂,需要更多的学习和维护成本。
资源消耗:Celery Beat 运行为一个长期进程,可能会消耗更多的系统资源。
Crontab
优点:
简单性:Crontab 是 Unix/Linux 系统中的标准组件,使用广泛,配置简单直观。
轻量级:Crontab 不需要运行额外的后台进程,资源消耗相对较低。
稳定性:作为长期存在的系统工具,Crontab 非常稳定可靠。
缺点:
平台依赖性:Crontab 主要用于 Unix 和 Linux 系统,不适用于 Windows 系统,这限制了其跨平台的应用。
功能限制:Crontab 的功能相对基础,主要限于时间调度,不支持任务执行的复杂逻辑控制。
缺乏集成:如果需要与 Python 应用或其他后台任务处理器集成,Crontab 可能不如 Celery Beat 那样方便。
总结
如果您的应用已经在使用 Celery,并且需要一个支持复杂逻辑、可监控状态、可在分布式系统中运行的调度器,Celery Beat 是一个更好的选择。
如果您需要一个简单、轻量级的解决方案,只需在特定时间执行脚本或命令,且主要在 Unix/Linux 环境下工作,Crontab 可能更适合您的需求。
最终选择应基于具体需求、技术栈以及对系统资源的考虑。
深入 Celery
1、 核心组件
Celery Worker
功能:执行实际的任务。Worker 是一个运行中的进程,它监听任务队列,获取任务并执行它们。
执行流程:Worker 从消息队列中取出任务,执行任务逻辑,然后将结果存储在结果后端或直接丢弃,取决于配置。
Broker(消息代理)
功能:接收来自客户端的任务消息,并将它们转发给 Worker。常用的 Broker 包括 RabbitMQ、Redis、Amazon SQS 等。
执行流程:Broker 管理着任务队列,保证任务消息的传递和存储,直到被 Worker 消费。
Backend(结果后端)
功能:存储任务的状态和结果。后端可以是任何支持存储键值对的系统,如 Redis、数据库、Amazon S3 等。
执行流程:任务执行完毕后,结果被发送到配置的结果后端,供发起任务的客户端查询。
Celery Client
功能:发起任务。Client 可以是任何通过 Celery 库发起任务的应用。
执行流程:Client 发起一个任务,这个任务被发送到 Broker,然后由 Worker 消费和执行。
Celery Beat
功能:定时任务调度器。Beat 用于定时发送任务消息到 Broker。
执行流程:根据预设的调度计划(如每天凌晨执行),Beat 定时将任务发送到 Broker,然后由 Worker 执行。
2、 执行流程
任务发布:
开发者定义任务函数
Celery Client 创建任务消息,并将其发送到 Broker
任务调度:
Broker 接收到任务消息,并将其存储在队列中
Celery Worker 监听队列,从 Broker 获取任务消息
任务执行:
Worker 取出任务,执行任务逻辑
执行结果可以选择存储在 Backend 中,或者直接丢弃
结果查询:
发起任务的 Client 可以查询 Backend,获取任务执行结果
常用函数的用法
1、 基本任务定义
使用 @app.task 装饰器定义一个简单的任务:
@app.task
def add(x, y):
return x + y2、 延迟执行任务
delay():这是异步执行任务的最常用方法,返回一个
AsyncResult实例,用于查询任务状态或结果。
result = add.delay(4, 6)apply_async():类似于
delay(),但提供更多控制选项,如设置任务的执行时间、优先级等。
result = add.apply_async((4, 6), countdown=10) # 10秒后执行3、 获取任务结果
get():从
AsyncResult对象中获取任务结果。如果任务未完成,此调用将阻塞直到任务完成。
print(result.get(timeout=10)) # 等待最多10秒4、 任务重试
retry():在任务函数中使用,用于在发生异常时重新尝试执行任务。
@app.task(bind=True)
def update_record(self, record_id):
try:
# 尝试更新记录
except SomeException as exc:
self.retry(exc=exc, countdown=5, max_retries=3)5、 访问任务上下文
bind=True:在定义任务时使用此选项,使任务函数能够访问任务的上下文(
self)。
@app.task(bind=True)
def my_task(self, x, y):
print(self.request.id) # 访问当前任务的请求ID6、 链接和组合任务
chain():用于将多个任务链接在一起,这些任务将按顺序执行。
from celery import chain
chain(add.s(2, 2), add.s(4)).apply_async()group():用于并行执行一组任务。
from celery import group
group(add.s(i, i) for i in range(10)).apply_async()chord():允许在一组任务全部完成后执行一个回调任务。
from celery import chord
chord((add.s(i, i) for i in range(10)), add.s()).apply_async()这些方法和功能使 Celery 成为处理各种复杂后台任务和调度需求的强大工具。
Django 集成
在 Django 中集成 Celery 主要涉及几个步骤:安装必要的包、配置 Celery、创建任务以及启动 worker 进程。下面是详细的步骤和示例:
1、安装 Celery 和消息代理
首先,您需要安装 Celery。Celery 支持多种消息代理,如 RabbitMQ、Redis 等。这里以 Redis 为例:
pip install celery redis确保您也安装了 Redis,并且 Redis 服务正在运行。
2、 配置 Celery
在您的 Django 项目中,创建一个新的 Python 模块来配置 Celery。通常,这个模块被命名为 celery.py。在 Django 项目的同级目录下创建这个文件,例如在 your_project/your_project/ 目录下:
# your_project/your_project/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
app = Celery('your_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()在、 settings.py 中添加 Celery 配置:
# your_project/your_project/settings.py
# Celery Configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# 更多配置
# Celery
# ------------------------------------------------------------------------------
# 如果启用时区设置
if USE_TZ:
# 设置Celery使用的时区与项目时区一致
CELERY_TIMEZONE = TIME_ZONE
# Celery应用名称
CELERY_APP_NAME = "affect"
# 设置存储Celery任务队列的Redis数据库
# 设置存储任务结果的数据仓库
# CELERY_RESULT_BACKEND = "django-db"
# 禁用Django Celery Beat的时区感知
DJANGO_CELERY_BEAT_TZ_AWARE = False
# 启用任务错误传播
CELERY_TASK_EAGER_PROPAGATES = True
# 禁止Celery劫持根日志记录器
CELERYD_HIJACK_ROOT_LOGGER = False
# Celery消息代理URL
# CELERY_BROKER_URL = env("CELERY_BROKER_URL")
# CELERY_BROKER_URL = f"redis://:{REDIS_PASSWD}@{REDIS_HOST}:{REDIS_PORT}/{BROKER_DB}"
CELERY_BROKER_URL = "redis://:Affect_PySuper@1.13.0.17:16379/13"
# Celery结果后端设置
# CELERY_RESULT_BACKEND = f"redis://:{REDIS_PASSWD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
CELERY_RESULT_BACKEND = "redis://:Affect_PySuper@1.13.0.17:16379/14"
print(CELERY_BROKER_URL)
print(CELERY_RESULT_BACKEND)
# 指定接受的内容类型
CELERY_ACCEPT_CONTENT = ["json"]
# 任务序列化器
CELERY_TASK_SERIALIZER = "json"
# 结果序列化器
CELERY_RESULT_SERIALIZER = "json"
# 任务执行时间限制(5分钟)
CELERY_TASK_TIME_LIMIT = 5 * 60
# 任务软时间限制(1分钟)
CELERY_TASK_SOFT_TIME_LIMIT = 60
# 使用数据库作为任务调度器
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
# 定时任务配置
CELERY_ENABLE_UTC = False
CELERY_BEAT_SCHEDULE = {
# 检查任务完成情况的定时任务
"check-task-completion": {
# 任务路径
"task": "apps.common.tasks.check_task_completion",
# 每天凌晨1点执行
"schedule": crontab(hour=1, minute=0),
},
}
# 指定导入的任务模块,可以指定多个
CELERY_IMPORTS = (
"apps.task.tasks",
"apps.order.tasks",
"apps.settle.tasks",
"apps.wechat.tasks",
"apps.oms.tasks",
"apps.track.tasks",
)
# 定义Celery任务队列配置
CELERY_TASK_QUEUES = {
Queue("high_priority", Exchange("high_priority"), routing_key="high_priority"),
Queue("default", Exchange("default"), routing_key="default"),
Queue("low_priority", Exchange("low_priority"), routing_key="low_priority"),
}
# 设置默认队列名称
CELERY_TASK_DEFAULT_QUEUE = "default"
# 设置默认交换机名称
CELERY_TASK_DEFAULT_EXCHANGE = "default"
# 设置默认路由键
CELERY_TASK_DEFAULT_ROUTING_KEY = "default"
# 设置任务最大重试次数
CELERY_TASK_MAX_RETRIES = 3
# 设置任务重试延迟时间(5分钟)
CELERY_TASK_DEFAULT_RETRY_DELAY = 5 * 60 # 5 minutes
# 是否立即执行任务(测试用)
CELERY_TASK_ALWAYS_EAGER = False
# 是否禁用工作进程的速率限制
CELERY_WORKER_DISABLE_RATE_LIMITS = True
# 设置工作进程的并发数
CELERY_WORKER_CONCURRENCY = 4
# 设置工作进程的预取倍数
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
# 设置工作进程的日志格式
CELERY_WORKER_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s"
# 设置工作进程的任务日志格式
CELERY_WORKER_TASK_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s"
# 设置标准输出重定向的日志级别
CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO"
# 设置结果后端的数据库连接
# CELERY_RESULT_BACKEND = "db+pool://..."3、 创建 Celery 任务
在任何一个 Django app 下创建任务。例如,在 app/tasks.py 中定义一个简单的任务:
# your_project/app/tasks.py
from your_project.celery import app
@app.task
def add(x, y):
return x + y4、 启动 Celery Worker
在您的 Django 项目目录下,运行以下命令来启动 Celery worker:
celery -A your_project worker --loglevel=info这条命令会启动一个 Celery worker,它会监听并执行定义的任务。
5、 调用任务
您可以在 Django 视图或模型中调用这些任务。例如,在视图中:
# your_project/app/views.py
from django.http import HttpResponse
from .tasks import add
def calculate(request):
result = add.delay(4, 5)
return HttpResponse(f"Task initiated, result will be {result.get(timeout=10)}")6、注意事项
确保 Celery 配置与您的消息代理设置相匹配。
使用
delay()方法异步执行任务。使用
get()方法谨慎,因为它会阻塞请求直到任务完成。在生产环境中,通常不直接在请求响应中等待任务结果。
定时任务
在 Celery 中实现定时任务通常涉及使用 Celery 的定时任务功能,这是通过设置周期性任务(periodic tasks)来完成的。Celery 使用名为 Celery Beat 的调度器来运行定时任务。以下是如何设置和使用 Celery Beat 来运行周期性任务的步骤:
1、 安装 Celery 和消息代理
确保您已经安装了 Celery 和一个消息代理(如 Redis 或 RabbitMQ)。如果还没有安装,可以使用以下命令安装 Celery 和 Redis:
pip install celery redis2、 配置 Celery
在您的项目中配置 Celery,如在一个 Python 文件中(例如 celery.py):
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
app.conf.timezone = 'UTC'3、 创建周期性任务
您可以定义一些周期性执行的任务。例如,创建一个简单的任务来打印当前时间:
from datetime import datetime
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_time():
print("Current time:", datetime.utcnow().isoformat())4、 设置 Celery Beat 调度
您需要设置 Celery Beat 的调度来指定任务执行的时间。这可以通过在 Celery 的配置中设置 beat_schedule 来完成:
from datetime import timedelta
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
app.conf.beat_schedule = {
'print-time-every-30-seconds': {
'task': 'your_module.print_time',
'schedule': timedelta(seconds=30),
},
}在这个例子中,print_time 任务将每 30 秒执行一次。
5、 启动 Celery Worker 和 Celery Beat
要运行定时任务,您需要同时启动 Celery Worker 和 Celery Beat。可以在命令行中使用以下命令启动它们:
# 启动 Worker
celery -A your_module worker --loglevel=info
# 在另一个终端或后台任务中启动 Beat
celery -A your_module beat --loglevel=info6、 可选:使用 Celery Beat 的持久化
默认情况下,Celery Beat 的调度是在内存中的,这意味着所有调度信息在重启后会丢失。为了持久化调度信息,您可以配置一个定时任务数据库:
app.conf.beat_schedule_filename = '/path/to/your/schedule.db'这将使 Celery Beat 使用指定的数据库文件来存储调度信息,从而在重启后保持调度状态。
拓展
自定义模板
from celery import Task
from celery.utils.log import get_task_logger
class BaseTask(Task):
# 设置重试次数和重试间隔
max_retries = 3
default_retry_delay = 5 # 5秒
def __init__(self):
self.logger = get_task_logger(__name__)
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""任务失败时执行的操作"""
self.logger.error(f'Task {self.name} [{task_id}] raised error: {exc!r}. Traceback: {einfo}')
def on_success(self, retval, task_id, args, kwargs):
"""任务成功完成时执行的操作"""
self.logger.info(f'Task {self.name} [{task_id}] completed successfully')
def on_retry(self, exc, task_id, args, kwargs, einfo):
"""任务重试时执行的操作"""
self.logger.warning(f'Task {self.name} [{task_id}] will be retried: {exc!r}')from celery import Celery
from your_module.base_task import BaseTask
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task(base=BaseTask)
def add(x, y):
return x + ycelery -A your_module worker --loglevel=info高可用
在 Celery 中实现高可用(High Availability, HA)主要涉及确保消息代理和 Celery worker 都能在出现故障时继续运行或快速恢复。这通常需要配置冗余、负载均衡和故障转移机制。以下是一些关键的策略和步骤:
1. 选择支持高可用的消息代理
Celery 依赖于消息代理来传递消息。选择一个支持高可用配置的消息代理是非常重要的。常用的消息代理如 RabbitMQ 和 Redis 都支持高可用配置。
RabbitMQ:可以通过设置 RabbitMQ 集群来实现高可用。集群中的节点可以复制队列,确保在一个节点失败时,其他节点可以接管任务。
Redis:可以使用 Redis 的 Sentinel 系统来监控 Redis 实例的健康状况,并自动进行故障转移。此外,Redis 集群提供了数据分片和自动分区,增强了数据的可用性和分布。
2. 配置 Celery Worker 的冗余
部署多个 Celery worker 实例可以提高系统的冗余度和容错能力。如果一个 worker 实例失败,其他实例可以继续处理任务。这可以通过简单地在多个服务器或容器中运行相同的 Celery worker 命令来实现。
3. 使用心跳和监控来检测故障
配置 Celery 和消息代理的心跳功能,以便能够检测到故障和不活跃的节点。此外,使用监控工具(如 Flower、Prometheus 或 Grafana)来实时监控 Celery 的健康状况和性能。
4. 自动扩展和负载均衡
自动扩展:根据工作负载自动增加或减少 worker 数量。这可以通过 Kubernetes 或其他容器编排工具来实现。
负载均衡:确保任务在多个 worker 之间均匀分配。在 Celery 中,可以通过正确配置任务路由和队列来实现。
5. 故障恢复策略
任务持久化:配置 Celery 使得任务消息被持久化到磁盘,而不是仅存储在内存中。这样在进程或服务器崩溃后,未完成的任务可以被恢复和重新执行。
自动重试机制:为任务配置自动重试,特别是在遇到可预见的临时错误时(如网络问题或依赖服务的暂时不可用)。
6. 数据备份和恢复
定期备份关键数据,包括任务结果和应用状态。确保在发生数据丢失时,可以从备份中恢复。
7. 文档和流程
确保有详细的文档和流程来处理故障情况,包括如何快速恢复服务和如何手动干预。
通过实施这些策略,您可以为您的 Celery 环境构建一个高可用的系统,从而提高系统的稳定性和可靠性,减少因单点故障导致的服务中断。
@shared_task 和 @app.task
在 Celery 中,@shared_task 和 @app.task 是两种常用的方式来定义任务,它们各有特点和适用场景:
1、@app.task
这是最常见的方式来定义 Celery 任务。它将任务绑定到特定的 Celery 应用实例。这意味着,当你使用 @app.task 装饰器时,你需要先有一个 Celery 应用实例(通常是一个 Celery() 对象),然后使用这个实例来创建任务。
示例:
from celery import Celery
app = Celery('my_app', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y优点:
明确指定了哪个 Celery 应用配置用于该任务。
适用于大多数情况,特别是当你的项目中只有一个 Celery 应用时。
缺点:
如果你的项目中需要多个 Celery 应用实例,或者你想在多个应用中重用同一个任务,这种方式可能不够灵活。
2、@shared_task
@shared_task 装饰器用于创建一个可以在多个应用中共享的任务。这种方式不需要预先定义一个 Celery 应用实例,而是在运行时动态地附加到当前可用的 Celery 应用上。
示例:
from celery import shared_task
@shared_task
def multiply(x, y):
return x * y优点:
提供了更高的灵活性,允许在不同的项目或应用中重用同一个任务。
适合于库和框架开发,开发者可以定义任务而不必关心最终用户将使用什么 Celery 应用配置。
缺点:
由于它不绑定到特定的 Celery 应用,某些应用特定的配置可能不会自动应用到这些任务上,除非在任务执行时已经有一个 Celery 应用实例被创建和配置。
3、 总结
选择使用 @app.task 还是 @shared_task 主要取决于你的项目结构和需求:
如果你的项目结构简单,只需要一个 Celery 应用,或者你需要对任务进行特定的应用配置,使用
@app.task是合适的。如果你需要在多个项目或应用之间共享任务,或者你正在开发可重用的库,那么
@shared_task提供了更好的灵活性和便利性。
启动命令
这里是对您提供的 Celery 命令行中每个参数的含义的解释:
Celery Beat 命令
celery beat --workdir=/home/celery/affect -A affect -l info -S django -f /celery_log/celery_beat.out--workdir=/home/celery/affect: 设置工作目录,Celery 将在此目录下运行。-A affect或--app=affect: 指定 Celery 应用的名称。-l info或--loglevel=info: 设置日志级别为信息级别,这意味着将记录一般的操作信息。-S django或--scheduler=django: 使用 Django 的数据库调度器,这允许 Celery 使用 Django 数据库来存储定时任务的信息。-f /celery_log/celery_beat.out或--logfile=/celery_log/celery_beat.out: 指定日志文件的路径,Celery beat 的日志将被写入这个文件。
Celery Worker 命令
celery worker --workdir=/home/celery/affect -A affect -l debug -P threads -c 10 -f /celery_log/celery_worker.log--workdir=/home/celery/affect: 设置工作目录,与 beat 命令相同。-A affect或--app=affect: 指定 Celery 应用的名称,与 beat 命令相同。-l debug或--loglevel=debug: 设置日志级别为调试级别,这将记录更详细的信息,帮助开发者调试问题。-P threads或--pool=threads: 使用线程池来执行任务,这是处理 I/O 密集型任务的一种常见方式。-c 10或--concurrency=10: 设置并发的 worker 数量为 10,这意味着可以同时执行 10 个任务。-f /celery_log/celery_worker.log或--logfile=/celery_log/celery_worker.log: 指定日志文件的路径,与 beat 命令相同,但是文件名不同,用于记录 worker 的日志。
这些参数配置了 Celery 的运行环境、日志记录方式以及任务执行的并发处理方式。
评论区