目 录CONTENT

文章目录

Celery -- 异步任务队列

PySuper
2024-12-17 / 0 评论 / 0 点赞 / 5 阅读 / 0 字
温馨提示:
所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

检查 Celery 版本:pip show celery

Celery 异步任务

Crontab 定时任务

启动 Celery Worker:celery -A your_project worker -l info

启动 Celery Beat:celery -A your_project beat -l info

在现代 Web 应用的开发中,任务调度是一个常见的需求。Celery 和 Crontab 是两种流行的任务调度解决方案,它们各有特点和适用场景。以下是对这两种技术的比较和分析,帮助开发者根据具体需求选择合适的工具。

Celery VS Crontab

1、Celery

Celery 是一个基于分布式消息传递的强大异步任务队列/作业队列

它专门用于处理异步任务和定时任务,非常适合与现代 web 应用框架(如 Django)集成

优点:

  • 功能强大:支持异步任务、定时任务以及任务结果存储等

  • 灵活性:可以设置复杂的任务调度逻辑,如任务链、任务组合和任务重试等

  • 可扩展性:支持多种消息代理(如 RabbitMQ、Redis),易于扩展到多服务器或高可用配置

  • 集成简单:与 Django 等框架集成良好,通过 Django-celery-easy实现任务管理和监控

  • 错误处理:提供任务失败重试、错误回调等机制

  • 监控和管理:通过 Flower 等工具可以实时监控任务状态,管理任务执行

缺点:

  • 复杂性:相比 Crontab,Celery 的设置和运维更为复杂。

  • 资源消耗:运行 Celery 需要额外的资源,对于小规模应用可能是一个负担

2、Crontab

Crontab 是 Unix/Linux 系统中的一个程序,用于按照预定的时间周期执行任务

它是系统级的服务,适用于简单的周期性任务调度

优点:

  • 简单直接:直接在操作系统层面设置,不需要额外的程序

  • 稳定性:作为 Unix/Linux 系统的一部分,稳定性很高

缺点:

  • 环境限制:只能在 Unix 或 Linux 系统上使用

  • 功能有限:仅支持时间触发,不支持复杂的逻辑或依赖关系

  • 难以集成:与现代应用程序(如 Django)集成不够灵活,需要额外的脚本支持

  • 错误处理和监控:不提供内置的错误处理或任务执行监控功能

3、 选择

  • 只使用 Celery:适用于需要处理大量的后台任务,包括异步任务和周期性任务的项目。Celery 提供了良好的支持和扩展性,特别适合需要与现代Web框架集成的应用。

  • 只使用 Crontab:适用于只需定时执行一些简单的脚本或命令的场景,这些任务不需要与应用紧密集成。

  • 结合使用:在某些特定场景下,如需要在非常精确的时间点触发任务,可以使用 Crontab 触发一个脚本,该脚本再调用 Celery 任务。这样可以结合 Crontab 的时间控制能力和 Celery 的任务处理能力

4、 总结

  • 在大多数现代 Web 应用中,只使用 Celery 就足够了,因为它提供了包括定时任务在内的广泛功能,同时还支持异步任务处理、任务监控等高级功能

  • 只有在特定的需求或特殊的系统配置下,才考虑将 Crontab 与 Celery 结合使用

  • 在决定之前,评估你的项目需求、资源和维护能力是非常重要的

celery-beat和Crontab 哪个更好用

在选择 Celery Beat 和 Crontab 用于任务调度时,最佳选择取决于您的具体需求、系统环境以及与现有技术栈的兼容性。以下是两者的比较,帮助您做出决策:

Celery Beat

优点

  1. 集成性:如果您已经在使用 Celery 处理后台任务,Celery Beat 可以无缝集成,无需引入新的技术或工具。

  2. 灵活性:Celery Beat 支持多种调度策略,包括定时(timedelta)和定点(crontab)调度。这使得任务调度更加灵活。

  3. 分布式:Celery 支持分布式环境,可以在多个节点上运行 worker 和 scheduler,增强了系统的可扩展性和容错能力。

  4. 状态监控:与 Celery 集成意味着可以利用 Celery 的监控工具(如 Flower)来监控任务的状态和性能。

缺点

  1. 复杂性:相比 Crontab,Celery Beat 的设置和配置更复杂,需要更多的学习和维护成本。

  2. 资源消耗:Celery Beat 运行为一个长期进程,可能会消耗更多的系统资源。

Crontab

优点

  1. 简单性:Crontab 是 Unix/Linux 系统中的标准组件,使用广泛,配置简单直观。

  2. 轻量级:Crontab 不需要运行额外的后台进程,资源消耗相对较低。

  3. 稳定性:作为长期存在的系统工具,Crontab 非常稳定可靠。

缺点

  1. 平台依赖性:Crontab 主要用于 Unix 和 Linux 系统,不适用于 Windows 系统,这限制了其跨平台的应用。

  2. 功能限制:Crontab 的功能相对基础,主要限于时间调度,不支持任务执行的复杂逻辑控制。

  3. 缺乏集成:如果需要与 Python 应用或其他后台任务处理器集成,Crontab 可能不如 Celery Beat 那样方便。

总结

  • 如果您的应用已经在使用 Celery,并且需要一个支持复杂逻辑、可监控状态、可在分布式系统中运行的调度器,Celery Beat 是一个更好的选择。

  • 如果您需要一个简单、轻量级的解决方案,只需在特定时间执行脚本或命令,且主要在 Unix/Linux 环境下工作,Crontab 可能更适合您的需求。

最终选择应基于具体需求、技术栈以及对系统资源的考虑。

深入 Celery

1、 核心组件

  1. Celery Worker

  • 功能:执行实际的任务。Worker 是一个运行中的进程,它监听任务队列,获取任务并执行它们。

  • 执行流程:Worker 从消息队列中取出任务,执行任务逻辑,然后将结果存储在结果后端或直接丢弃,取决于配置。

  1. Broker(消息代理)

  • 功能:接收来自客户端的任务消息,并将它们转发给 Worker。常用的 Broker 包括 RabbitMQ、Redis、Amazon SQS 等。

  • 执行流程:Broker 管理着任务队列,保证任务消息的传递和存储,直到被 Worker 消费。

  1. Backend(结果后端)

  • 功能:存储任务的状态和结果。后端可以是任何支持存储键值对的系统,如 Redis、数据库、Amazon S3 等。

  • 执行流程:任务执行完毕后,结果被发送到配置的结果后端,供发起任务的客户端查询。

  1. Celery Client

  • 功能:发起任务。Client 可以是任何通过 Celery 库发起任务的应用。

  • 执行流程:Client 发起一个任务,这个任务被发送到 Broker,然后由 Worker 消费和执行。

  1. Celery Beat

  • 功能:定时任务调度器。Beat 用于定时发送任务消息到 Broker。

  • 执行流程:根据预设的调度计划(如每天凌晨执行),Beat 定时将任务发送到 Broker,然后由 Worker 执行。

2、 执行流程

  1. 任务发布

  • 开发者定义任务函数

  • Celery Client 创建任务消息,并将其发送到 Broker

  1. 任务调度

  • Broker 接收到任务消息,并将其存储在队列中

  • Celery Worker 监听队列,从 Broker 获取任务消息

  1. 任务执行

  • Worker 取出任务,执行任务逻辑

  • 执行结果可以选择存储在 Backend 中,或者直接丢弃

  1. 结果查询

  • 发起任务的 Client 可以查询 Backend,获取任务执行结果

常用函数的用法

1、 基本任务定义

使用 @app.task 装饰器定义一个简单的任务:

@app.task
def add(x, y):
    return x + y

2、 延迟执行任务

  • delay():这是异步执行任务的最常用方法,返回一个 AsyncResult 实例,用于查询任务状态或结果。

result = add.delay(4, 6)
  • apply_async():类似于 delay(),但提供更多控制选项,如设置任务的执行时间、优先级等。

result = add.apply_async((4, 6), countdown=10)  # 10秒后执行

3、 获取任务结果

  • get():从 AsyncResult 对象中获取任务结果。如果任务未完成,此调用将阻塞直到任务完成。

print(result.get(timeout=10))  # 等待最多10秒

4、 任务重试

  • retry():在任务函数中使用,用于在发生异常时重新尝试执行任务。

@app.task(bind=True)
def update_record(self, record_id):
    try:
        # 尝试更新记录
    except SomeException as exc:
        self.retry(exc=exc, countdown=5, max_retries=3)

5、 访问任务上下文

  • bind=True:在定义任务时使用此选项,使任务函数能够访问任务的上下文(self)。

@app.task(bind=True)
def my_task(self, x, y):
    print(self.request.id)  # 访问当前任务的请求ID

6、 链接和组合任务

  • chain():用于将多个任务链接在一起,这些任务将按顺序执行。

from celery import chain
chain(add.s(2, 2), add.s(4)).apply_async()
  • group():用于并行执行一组任务。

from celery import group
group(add.s(i, i) for i in range(10)).apply_async()
  • chord():允许在一组任务全部完成后执行一个回调任务。

from celery import chord
chord((add.s(i, i) for i in range(10)), add.s()).apply_async()

这些方法和功能使 Celery 成为处理各种复杂后台任务和调度需求的强大工具。

Django 集成

在 Django 中集成 Celery 主要涉及几个步骤:安装必要的包、配置 Celery、创建任务以及启动 worker 进程。下面是详细的步骤和示例:

1、安装 Celery 和消息代理

首先,您需要安装 Celery。Celery 支持多种消息代理,如 RabbitMQ、Redis 等。这里以 Redis 为例:

pip install celery redis

确保您也安装了 Redis,并且 Redis 服务正在运行。

2、 配置 Celery

在您的 Django 项目中,创建一个新的 Python 模块来配置 Celery。通常,这个模块被命名为 celery.py。在 Django 项目的同级目录下创建这个文件,例如在 your_project/your_project/ 目录下:

# your_project/your_project/celery.py

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

app = Celery('your_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

在、 settings.py 中添加 Celery 配置:

# your_project/your_project/settings.py

# Celery Configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# 更多配置
# Celery
# ------------------------------------------------------------------------------
# 如果启用时区设置
if USE_TZ:
    # 设置Celery使用的时区与项目时区一致
    CELERY_TIMEZONE = TIME_ZONE
# Celery应用名称
CELERY_APP_NAME = "affect"
# 设置存储Celery任务队列的Redis数据库
# 设置存储任务结果的数据仓库
# CELERY_RESULT_BACKEND = "django-db"
# 禁用Django Celery Beat的时区感知
DJANGO_CELERY_BEAT_TZ_AWARE = False
# 启用任务错误传播
CELERY_TASK_EAGER_PROPAGATES = True
# 禁止Celery劫持根日志记录器
CELERYD_HIJACK_ROOT_LOGGER = False
# Celery消息代理URL
# CELERY_BROKER_URL = env("CELERY_BROKER_URL")
# CELERY_BROKER_URL = f"redis://:{REDIS_PASSWD}@{REDIS_HOST}:{REDIS_PORT}/{BROKER_DB}"
CELERY_BROKER_URL = "redis://:Affect_PySuper@1.13.0.17:16379/13"
# Celery结果后端设置
# CELERY_RESULT_BACKEND = f"redis://:{REDIS_PASSWD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
CELERY_RESULT_BACKEND = "redis://:Affect_PySuper@1.13.0.17:16379/14"

print(CELERY_BROKER_URL)
print(CELERY_RESULT_BACKEND)

# 指定接受的内容类型
CELERY_ACCEPT_CONTENT = ["json"]
# 任务序列化器
CELERY_TASK_SERIALIZER = "json"
# 结果序列化器
CELERY_RESULT_SERIALIZER = "json"
# 任务执行时间限制(5分钟)
CELERY_TASK_TIME_LIMIT = 5 * 60
# 任务软时间限制(1分钟)
CELERY_TASK_SOFT_TIME_LIMIT = 60
# 使用数据库作为任务调度器
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
# 定时任务配置
CELERY_ENABLE_UTC = False
CELERY_BEAT_SCHEDULE = {
    # 检查任务完成情况的定时任务
    "check-task-completion": {
        # 任务路径
        "task": "apps.common.tasks.check_task_completion",
        # 每天凌晨1点执行
        "schedule": crontab(hour=1, minute=0),
    },
}
# 指定导入的任务模块,可以指定多个
CELERY_IMPORTS = (
    "apps.task.tasks",
    "apps.order.tasks",
    "apps.settle.tasks",
    "apps.wechat.tasks",
    "apps.oms.tasks",
    "apps.track.tasks",
)
# 定义Celery任务队列配置
CELERY_TASK_QUEUES = {
    Queue("high_priority", Exchange("high_priority"), routing_key="high_priority"),
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("low_priority", Exchange("low_priority"), routing_key="low_priority"),
}
# 设置默认队列名称
CELERY_TASK_DEFAULT_QUEUE = "default"
# 设置默认交换机名称
CELERY_TASK_DEFAULT_EXCHANGE = "default"
# 设置默认路由键
CELERY_TASK_DEFAULT_ROUTING_KEY = "default"
# 设置任务最大重试次数
CELERY_TASK_MAX_RETRIES = 3
# 设置任务重试延迟时间(5分钟)
CELERY_TASK_DEFAULT_RETRY_DELAY = 5 * 60  # 5 minutes
# 是否立即执行任务(测试用)
CELERY_TASK_ALWAYS_EAGER = False
# 是否禁用工作进程的速率限制
CELERY_WORKER_DISABLE_RATE_LIMITS = True
# 设置工作进程的并发数
CELERY_WORKER_CONCURRENCY = 4
# 设置工作进程的预取倍数
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
# 设置工作进程的日志格式
CELERY_WORKER_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s"
# 设置工作进程的任务日志格式
CELERY_WORKER_TASK_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s"
# 设置标准输出重定向的日志级别
CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO"
# 设置结果后端的数据库连接
# CELERY_RESULT_BACKEND = "db+pool://..."

3、 创建 Celery 任务

在任何一个 Django app 下创建任务。例如,在 app/tasks.py 中定义一个简单的任务:

# your_project/app/tasks.py

from your_project.celery import app

@app.task
def add(x, y):
    return x + y

4、 启动 Celery Worker

在您的 Django 项目目录下,运行以下命令来启动 Celery worker:

celery -A your_project worker --loglevel=info

这条命令会启动一个 Celery worker,它会监听并执行定义的任务。

5、 调用任务

您可以在 Django 视图或模型中调用这些任务。例如,在视图中:

# your_project/app/views.py

from django.http import HttpResponse
from .tasks import add

def calculate(request):
    result = add.delay(4, 5)
    return HttpResponse(f"Task initiated, result will be {result.get(timeout=10)}")

6、注意事项

  • 确保 Celery 配置与您的消息代理设置相匹配。

  • 使用 delay() 方法异步执行任务。

  • 使用 get() 方法谨慎,因为它会阻塞请求直到任务完成。在生产环境中,通常不直接在请求响应中等待任务结果。

定时任务

在 Celery 中实现定时任务通常涉及使用 Celery 的定时任务功能,这是通过设置周期性任务(periodic tasks)来完成的。Celery 使用名为 Celery Beat 的调度器来运行定时任务。以下是如何设置和使用 Celery Beat 来运行周期性任务的步骤:

1、 安装 Celery 和消息代理

确保您已经安装了 Celery 和一个消息代理(如 Redis 或 RabbitMQ)。如果还没有安装,可以使用以下命令安装 Celery 和 Redis:

pip install celery redis

2、 配置 Celery

在您的项目中配置 Celery,如在一个 Python 文件中(例如 celery.py):

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')
app.conf.timezone = 'UTC'

3、 创建周期性任务

您可以定义一些周期性执行的任务。例如,创建一个简单的任务来打印当前时间:

from datetime import datetime
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def print_time():
    print("Current time:", datetime.utcnow().isoformat())

4、 设置 Celery Beat 调度

您需要设置 Celery Beat 的调度来指定任务执行的时间。这可以通过在 Celery 的配置中设置 beat_schedule 来完成:

from datetime import timedelta
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

app.conf.beat_schedule = {
    'print-time-every-30-seconds': {
        'task': 'your_module.print_time',
        'schedule': timedelta(seconds=30),
    },
}

在这个例子中,print_time 任务将每 30 秒执行一次。

5、 启动 Celery Worker 和 Celery Beat

要运行定时任务,您需要同时启动 Celery Worker 和 Celery Beat。可以在命令行中使用以下命令启动它们:

# 启动 Worker
celery -A your_module worker --loglevel=info

# 在另一个终端或后台任务中启动 Beat
celery -A your_module beat --loglevel=info

6、 可选:使用 Celery Beat 的持久化

默认情况下,Celery Beat 的调度是在内存中的,这意味着所有调度信息在重启后会丢失。为了持久化调度信息,您可以配置一个定时任务数据库:

app.conf.beat_schedule_filename = '/path/to/your/schedule.db'

这将使 Celery Beat 使用指定的数据库文件来存储调度信息,从而在重启后保持调度状态。

拓展

自定义模板

from celery import Task
from celery.utils.log import get_task_logger

class BaseTask(Task):
    # 设置重试次数和重试间隔
    max_retries = 3
    default_retry_delay = 5  # 5秒

    def __init__(self):
        self.logger = get_task_logger(__name__)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """任务失败时执行的操作"""
        self.logger.error(f'Task {self.name} [{task_id}] raised error: {exc!r}. Traceback: {einfo}')

    def on_success(self, retval, task_id, args, kwargs):
        """任务成功完成时执行的操作"""
        self.logger.info(f'Task {self.name} [{task_id}] completed successfully')

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """任务重试时执行的操作"""
        self.logger.warning(f'Task {self.name} [{task_id}] will be retried: {exc!r}')
from celery import Celery
from your_module.base_task import BaseTask

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(base=BaseTask)
def add(x, y):
    return x + y
celery -A your_module worker --loglevel=info

高可用

在 Celery 中实现高可用(High Availability, HA)主要涉及确保消息代理和 Celery worker 都能在出现故障时继续运行或快速恢复。这通常需要配置冗余、负载均衡和故障转移机制。以下是一些关键的策略和步骤:

1. 选择支持高可用的消息代理

Celery 依赖于消息代理来传递消息。选择一个支持高可用配置的消息代理是非常重要的。常用的消息代理如 RabbitMQ 和 Redis 都支持高可用配置。

  • RabbitMQ:可以通过设置 RabbitMQ 集群来实现高可用。集群中的节点可以复制队列,确保在一个节点失败时,其他节点可以接管任务。

  • Redis:可以使用 Redis 的 Sentinel 系统来监控 Redis 实例的健康状况,并自动进行故障转移。此外,Redis 集群提供了数据分片和自动分区,增强了数据的可用性和分布。

2. 配置 Celery Worker 的冗余

部署多个 Celery worker 实例可以提高系统的冗余度和容错能力。如果一个 worker 实例失败,其他实例可以继续处理任务。这可以通过简单地在多个服务器或容器中运行相同的 Celery worker 命令来实现。

3. 使用心跳和监控来检测故障

配置 Celery 和消息代理的心跳功能,以便能够检测到故障和不活跃的节点。此外,使用监控工具(如 Flower、Prometheus 或 Grafana)来实时监控 Celery 的健康状况和性能。

4. 自动扩展和负载均衡

  • 自动扩展:根据工作负载自动增加或减少 worker 数量。这可以通过 Kubernetes 或其他容器编排工具来实现。

  • 负载均衡:确保任务在多个 worker 之间均匀分配。在 Celery 中,可以通过正确配置任务路由和队列来实现。

5. 故障恢复策略

  • 任务持久化:配置 Celery 使得任务消息被持久化到磁盘,而不是仅存储在内存中。这样在进程或服务器崩溃后,未完成的任务可以被恢复和重新执行。

  • 自动重试机制:为任务配置自动重试,特别是在遇到可预见的临时错误时(如网络问题或依赖服务的暂时不可用)。

6. 数据备份和恢复

定期备份关键数据,包括任务结果和应用状态。确保在发生数据丢失时,可以从备份中恢复。

7. 文档和流程

确保有详细的文档和流程来处理故障情况,包括如何快速恢复服务和如何手动干预。

通过实施这些策略,您可以为您的 Celery 环境构建一个高可用的系统,从而提高系统的稳定性和可靠性,减少因单点故障导致的服务中断。

@shared_task 和 @app.task

在 Celery 中,@shared_task@app.task 是两种常用的方式来定义任务,它们各有特点和适用场景:

1、@app.task

这是最常见的方式来定义 Celery 任务。它将任务绑定到特定的 Celery 应用实例。这意味着,当你使用 @app.task 装饰器时,你需要先有一个 Celery 应用实例(通常是一个 Celery() 对象),然后使用这个实例来创建任务。

示例

from celery import Celery

app = Celery('my_app', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

优点

  • 明确指定了哪个 Celery 应用配置用于该任务。

  • 适用于大多数情况,特别是当你的项目中只有一个 Celery 应用时。

缺点

  • 如果你的项目中需要多个 Celery 应用实例,或者你想在多个应用中重用同一个任务,这种方式可能不够灵活。

2、@shared_task

@shared_task 装饰器用于创建一个可以在多个应用中共享的任务。这种方式不需要预先定义一个 Celery 应用实例,而是在运行时动态地附加到当前可用的 Celery 应用上。

示例

from celery import shared_task

@shared_task
def multiply(x, y):
    return x * y

优点

  • 提供了更高的灵活性,允许在不同的项目或应用中重用同一个任务。

  • 适合于库和框架开发,开发者可以定义任务而不必关心最终用户将使用什么 Celery 应用配置。

缺点

  • 由于它不绑定到特定的 Celery 应用,某些应用特定的配置可能不会自动应用到这些任务上,除非在任务执行时已经有一个 Celery 应用实例被创建和配置。

3、 总结

选择使用 @app.task 还是 @shared_task 主要取决于你的项目结构和需求:

  • 如果你的项目结构简单,只需要一个 Celery 应用,或者你需要对任务进行特定的应用配置,使用 @app.task 是合适的。

  • 如果你需要在多个项目或应用之间共享任务,或者你正在开发可重用的库,那么 @shared_task 提供了更好的灵活性和便利性。

启动命令

这里是对您提供的 Celery 命令行中每个参数的含义的解释:

Celery Beat 命令

celery beat --workdir=/home/celery/affect -A affect -l info -S django -f /celery_log/celery_beat.out
  • --workdir=/home/celery/affect: 设置工作目录,Celery 将在此目录下运行。

  • -A affect--app=affect: 指定 Celery 应用的名称。

  • -l info--loglevel=info: 设置日志级别为信息级别,这意味着将记录一般的操作信息。

  • -S django--scheduler=django: 使用 Django 的数据库调度器,这允许 Celery 使用 Django 数据库来存储定时任务的信息。

  • -f /celery_log/celery_beat.out--logfile=/celery_log/celery_beat.out: 指定日志文件的路径,Celery beat 的日志将被写入这个文件。

Celery Worker 命令

celery worker --workdir=/home/celery/affect -A affect -l debug -P threads -c 10 -f /celery_log/celery_worker.log
  • --workdir=/home/celery/affect: 设置工作目录,与 beat 命令相同。

  • -A affect--app=affect: 指定 Celery 应用的名称,与 beat 命令相同。

  • -l debug--loglevel=debug: 设置日志级别为调试级别,这将记录更详细的信息,帮助开发者调试问题。

  • -P threads--pool=threads: 使用线程池来执行任务,这是处理 I/O 密集型任务的一种常见方式。

  • -c 10--concurrency=10: 设置并发的 worker 数量为 10,这意味着可以同时执行 10 个任务。

  • -f /celery_log/celery_worker.log--logfile=/celery_log/celery_worker.log: 指定日志文件的路径,与 beat 命令相同,但是文件名不同,用于记录 worker 的日志。

这些参数配置了 Celery 的运行环境、日志记录方式以及任务执行的并发处理方式。

0

评论区