动态任务 Scheduling(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/onyx-dot-app/onyx/7.5-dynamic-task-scheduling
翻译时间:2026-05-27T08:44:53.635Z
翻译模型:deepseek-chat
原文字符数:13043
项目:Onyx (onyx)
---
动态任务调度
相关源文件
以下文件为本 Wiki 页面的生成提供了上下文:
backend/ee/onyx/background/celery/apps/primary.pybackend/ee/onyx/background/celery/tasks/beat_schedule.pybackend/onyx/background/celery/apps/app_base.pybackend/onyx/background/celery/apps/beat.pybackend/onyx/background/celery/apps/heavy.pybackend/onyx/background/celery/apps/light.pybackend/onyx/background/celery/apps/monitoring.pybackend/onyx/background/celery/apps/primary.pybackend/onyx/background/celery/tasks/beat_schedule.pybackend/onyx/background/celery/tasks/shared/tasks.pybackend/onyx/background/celery/tasks/vespa/tasks.pybackend/onyx/configs/app_configs.pybackend/onyx/configs/constants.pybackend/onyx/document_index/factory.pybackend/onyx/redis/redis_pool.pybackend/scripts/dev_run_background_jobs.pybackend/supervisord.confbackend/tests/external_dependency_unit/redis/test_tenant_redis.py
本文档介绍 Onyx 中的动态任务调度系统,重点说明 Celery Beat 如何配置以生成和管理周期性后台任务。关于任务本身及其执行的信息,请参见后台处理与协调。关于工作进程类型和队列分配的详细信息,请参见Celery 工作进程架构。
目的与架构
Onyx 后台处理系统使用自定义的 DynamicTenantScheduler,该类扩展了 Celery 的 PersistentScheduler,支持运行时配置的任务调度。该调度器同时支持单租户(自托管)和多租户(云)部署,可根据租户可用性和可调节的速率限制动态生成任务调度。
关键能力:
- 无需重启进程即可动态重新生成调度
backend/onyx/background/celery/apps/beat.py:155-156 - 多租户模式下按租户生成任务
backend/onyx/background/celery/apps/beat.py:123-151 - 通过 Redis 中存储的
beat_multiplier实现运行时可调节的任务生成速率backend/onyx/background/celery/apps/beat.py:169-171 - 每 60 秒自动发现租户并更新调度
backend/onyx/background/celery/apps/beat.py:31-41
来源:backend/onyx/background/celery/apps/beat.py:27-41, backend/onyx/background/celery/apps/beat.py:123-151, shared_configs/configs.py:56-56
DynamicTenantScheduler 类
DynamicTenantScheduler 定义在 backend/onyx/background/celery/apps/beat.py:27-235 中,提供了核心调度逻辑。它重写了 tick() 方法,以定期检查调度更新。
自然语言到代码实体的调度器架构
下图将高级调度概念映射到实现它们的特定代码实体。
graph TB
subgraph "Celery Beat 进程 [backend/onyx/background/celery/apps/beat.py]"
BeatApp["celery_app<br/>(Celery 实例)"]
Scheduler["DynamicTenantScheduler<br/>(PersistentScheduler)"]
end
subgraph "调度蓝图 [backend/onyx/background/celery/tasks/beat_schedule.py]"
Templates["beat_task_templates<br/>(list[dict])"]
CloudTasks["get_cloud_tasks_to_schedule<br/>(函数)"]
end
subgraph "运行时与数据 [backend/onyx/db/engine/tenant_utils.py]"
Redis[(Redis 存储)]
Runtime["OnyxRuntime<br/>.get_beat_multiplier()"]
TenantDB["get_all_tenant_ids()<br/>(PostgreSQL)"]
end
subgraph "输出调度"
TenantTasks["按租户的任务<br/>{name}-{tenant_id}"]
SystemTasks["系统任务<br/>cloud_*"]
end
BeatApp -->|"配置"| Scheduler
Templates --> Scheduler
CloudTasks --> Scheduler
Scheduler -->|"每 60 秒重新加载"| Runtime
Scheduler --> TenantDB
Runtime --> Redis
Scheduler -->|"_generate_schedule()"| TenantTasks
Scheduler -->|"_generate_schedule()"| SystemTasks
TenantTasks -->|"入队到"| Workers["Celery 工作进程"]
SystemTasks -->|"入队到"| Workers
来源:backend/onyx/background/celery/apps/beat.py:27-145, backend/onyx/background/celery/tasks/beat_schedule.py:37-179, backend/onyx/db/engine/tenant_utils.py:15-15
关键方法:
tick():由 Celery Beat 定期调用;每RELOAD_INTERVAL(60 秒)检查调度是否需要更新backend/onyx/background/celery/apps/beat.py:61-80_try_updating_schedule():获取当前租户和 beat 乘数,如果检测到任何变化则重新生成调度backend/onyx/background/celery/apps/beat.py:155-190_generate_schedule():根据MULTI_TENANT标志将任务模板转换为按租户或系统范围的任务条目backend/onyx/background/celery/apps/beat.py:82-153
来源:backend/onyx/background/celery/apps/beat.py:27-235
Beat 任务模板
Beat 任务模板作为周期性任务的蓝图。每个模板指定了任务名称、Celery 任务标识符、调度间隔和执行选项。
模板结构
| 字段 | 类型 | 描述 |
|---|---|---|
name | str | 人类可读的任务标识符 backend/onyx/background/celery/tasks/beat_schedule.py:39-39 |
task | str | Celery 任务名称(来自 OnyxCeleryTask 枚举) backend/onyx/background/celery/tasks/beat_schedule.py:40-40 |
schedule | timedelta | 执行频率 backend/onyx/background/celery/tasks/beat_schedule.py:41-41 |
options | dict | 优先级、过期时间和队列分配 backend/onyx/background/celery/tasks/beat_schedule.py:42-45 |
来源:backend/onyx/background/celery/tasks/beat_schedule.py:37-46, backend/onyx/configs/constants.py:18-18
核心任务模板
以下核心模板配置在 backend/onyx/background/celery/tasks/beat_schedule.py:37-179 中:
| 任务名称 | 任务枚举 | 频率 | 优先级 | 描述 |
|---|---|---|---|---|
check-for-user-file-processing | CHECK_FOR_USER_FILE_PROCESSING | 20 秒 | 中 | 处理用户上传的文件 backend/onyx/background/celery/tasks/beat_schedule.py:39-46 |
check-for-indexing | CHECK_FOR_INDEXING | 15 秒 | 中 | 触发连接器索引 backend/onyx/background/celery/tasks/beat_schedule.py:66-74 |
check-for-connector-deletion | CHECK_FOR_CONNECTOR_DELETION | 20 秒 | 中 | 处理后台清理 backend/onyx/background/celery/tasks/beat_schedule.py:100-110 |
check-for-vespa-sync | CHECK_FOR_VESPA_SYNC_TASK | 20 秒 | 中 | 将元数据同步到文档索引 backend/onyx/background/celery/tasks/beat_schedule.py:112-120 |
check-for-pruning | CHECK_FOR_PRUNING | 20 秒 | 中 | 移除过期文档 backend/onyx/background/celery/tasks/beat_schedule.py:122-130 |
monitor-background-processes | MONITOR_BACKGROUND_PROCESSES | 5 分钟 | 低 | 工作进程健康检查 backend/onyx/background/celery/tasks/beat_schedule.py:141-149 |
来源:backend/onyx/background/celery/tasks/beat_schedule.py:37-179
多租户任务生成
在多租户部署(MULTI_TENANT=true)中,调度器使用两层方法来管理跨多个租户的负载。
系统范围的云任务
云任务生成一次,处理跨所有租户的系统级操作。它们通过 get_cloud_tasks_to_schedule(beat_multiplier) 获取,并为整个集群仅调度一次 backend/onyx/background/celery/apps/beat.py:91-111。
按租户的任务扩展
对于通过 get_all_tenant_ids() 发现的每个租户,调度器会生成单独的任务实例。这样可以实现按租户的细粒度控制和后台工作隔离。
graph TB
subgraph "租户发现 [backend/onyx/db/engine/tenant_utils.py]"
GetTenants["get_all_tenant_ids()"]
TenantList["['tenant_a', 'tenant_b']"]
end
subgraph "任务蓝图 [backend/onyx/background/celery/tasks/beat_schedule.py]"
Template["name: check-for-indexing<br/>task: CHECK_FOR_INDEXING"]
end
subgraph "扩展后的调度 [backend/onyx/background/celery/apps/beat.py]"
T1["check-for-indexing-tenant_a<br/>kwargs: {tenant_id: 'tenant_a'}"]
T2["check-for-indexing-tenant_b<br/>kwargs: {tenant_id: 'tenant_b'}"]
end
GetTenants --> TenantList
TenantList --> Template
Template --> T1
Template --> T2
T1 -->|"入队"| Queue["OnyxCeleryQueues"]
来源:backend/onyx/background/celery/apps/beat.py:123-151, backend/onyx/db/engine/tenant_utils.py:15-15
每个生成的任务在其 kwargs 中包含 tenant_id,该值通过 TenantAwareTask 基类传播到任务执行中 backend/onyx/background/celery/apps/app_base.py:80-99。
Beat 乘数配置
beat_multiplier 是一个运行时可调节的浮点值,用于缩放任务生成速率。这在云部署中至关重要,可以防止工作进程过载。
| 配置项 | 默认值 | 用途 |
|---|---|---|
CLOUD_BEAT_MULTIPLIER_DEFAULT | 8.0 | 缩放任务间隔(例如,15 秒变为 120 秒) backend/onyx/background/celery/tasks/beat_schedule.py:33-33 |
RELOAD_INTERVAL | 60 | 调度器检查乘数变化的频率(秒) backend/onyx/background/celery/apps/beat.py:31-31 |
乘数通过 OnyxRuntime.get_beat_multiplier() 获取 backend/onyx/background/celery/apps/beat.py:169-169。如果乘数或租户列表发生变化,DynamicTenantScheduler 会重建其内部调度 backend/onyx/background/celery/apps/beat.py:177-189。
来源:backend/onyx/background/celery/apps/beat.py:36-41, backend/onyx/background/celery/tasks/beat_schedule.py:33-33
存活探针与监控
调度器实现了健康监控机制:
存活探针文件
每次重新加载间隔完成时,调度器会更新一个存活探针文件,该文件路径由 make_probe_path("liveness", "beat@hostname") 生成 backend/onyx/background/celery/apps/beat.py:50-69。外部监控系统可以检查该文件的修改时间。
Redis 协调与栅栏
后台任务使用"栅栏"模式防止重叠执行。例如,check_for_vespa_sync_task 会获取一个超时时间为 CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT(120 秒)的 OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK 锁 backend/onyx/background/celery/tasks/vespa/tasks.py:95-102。这确保了即使有多个 beat 实例运行,每个租户也只有一个实例会启动同步逻辑。
来源:backend/onyx/background/celery/apps/beat.py:50-69, backend/onyx/background/celery/tasks/vespa/tasks.py:95-102, backend/onyx/configs/constants.py:130-130
初始化与启动
Beat 调度器通过 Celery 的 beat_init 信号进行初始化。
sequenceDiagram
participant Process as Celery Beat 进程
participant Signal as beat_init 信号
participant Handler as on_beat_init [beat.py]
participant Engine as SqlEngine
participant Sched as DynamicTenantScheduler
Process->>Signal: 启动
Signal->>Handler: 触发
Handler->>Engine: set_app_name("celery_beat")
Handler->>Engine: init_engine(pool_size=2)
Handler->>Handler: wait_for_redis()
Handler->>Sched: _try_updating_schedule()
Note over Sched: 获取租户和乘数
Sched-->>Handler: 调度已重建
Handler-->>Process: 就绪
来源:backend/onyx/background/celery/apps/beat.py:237-253, backend/onyx/configs/constants.py:85-85
beat_init 信号处理器使用较小的连接池大小(2)执行数据库初始化,因为调度器只读取租户元数据,不执行繁重的数据库操作 backend/onyx/background/celery/apps/beat.py:238-241。
来源:backend/onyx/background/celery/apps/beat.py:237-253