agentic_huge_data_base / wiki
页面 Onyx · 7.5 动态任务 Scheduling·DeepWiki 中文全文译文

7.5 · 动态任务 Scheduling(Dynamic Task Scheduling)

企业连接器与统一搜索 · 本章是 Onyx DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Onyx 章节7.5 状态全文译文 模块测试、发布与运维、工作流与编排、配置治理、文档对象与元数据
源码线索
  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/tasks/shared/tasks.py
模块标签
  • 测试、发布与运维
  • 工作流与编排
  • 配置治理
  • 文档对象与元数据
  • 系统架构

中文译文

动态任务 Scheduling(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/onyx-dot-app/onyx/7.5-dynamic-task-scheduling
翻译时间:2026-05-27T08:44:53.635Z
翻译模型:deepseek-chat
原文字符数:13043
项目:Onyx (onyx)

---

动态任务调度

相关源文件

以下文件为本 Wiki 页面的生成提供了上下文:

  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/tasks/shared/tasks.py
  • backend/onyx/background/celery/tasks/vespa/tasks.py
  • backend/onyx/configs/app_configs.py
  • backend/onyx/configs/constants.py
  • backend/onyx/document_index/factory.py
  • backend/onyx/redis/redis_pool.py
  • backend/scripts/dev_run_background_jobs.py
  • backend/supervisord.conf
  • backend/tests/external_dependency_unit/redis/test_tenant_redis.py

本文档介绍 Onyx 中的动态任务调度系统,重点说明 Celery Beat 如何配置以生成和管理周期性后台任务。关于任务本身及其执行的信息,请参见后台处理与协调。关于工作进程类型和队列分配的详细信息,请参见Celery 工作进程架构

目的与架构

Onyx 后台处理系统使用自定义的 DynamicTenantScheduler,该类扩展了 Celery 的 PersistentScheduler,支持运行时配置的任务调度。该调度器同时支持单租户(自托管)和多租户(云)部署,可根据租户可用性和可调节的速率限制动态生成任务调度。

关键能力:

  • 无需重启进程即可动态重新生成调度 backend/onyx/background/celery/apps/beat.py:155-156
  • 多租户模式下按租户生成任务 backend/onyx/background/celery/apps/beat.py:123-151
  • 通过 Redis 中存储的 beat_multiplier 实现运行时可调节的任务生成速率 backend/onyx/background/celery/apps/beat.py:169-171
  • 每 60 秒自动发现租户并更新调度 backend/onyx/background/celery/apps/beat.py:31-41

来源:backend/onyx/background/celery/apps/beat.py:27-41, backend/onyx/background/celery/apps/beat.py:123-151, shared_configs/configs.py:56-56

DynamicTenantScheduler 类

DynamicTenantScheduler 定义在 backend/onyx/background/celery/apps/beat.py:27-235 中,提供了核心调度逻辑。它重写了 tick() 方法,以定期检查调度更新。

自然语言到代码实体的调度器架构

下图将高级调度概念映射到实现它们的特定代码实体。

graph TB
    subgraph "Celery Beat 进程 [backend/onyx/background/celery/apps/beat.py]"
        BeatApp["celery_app<br/>(Celery 实例)"]
        Scheduler["DynamicTenantScheduler<br/>(PersistentScheduler)"]
    end

    subgraph "调度蓝图 [backend/onyx/background/celery/tasks/beat_schedule.py]"
        Templates["beat_task_templates<br/>(list[dict])"]
        CloudTasks["get_cloud_tasks_to_schedule<br/>(函数)"]
    end

    subgraph "运行时与数据 [backend/onyx/db/engine/tenant_utils.py]"
        Redis[(Redis 存储)]
        Runtime["OnyxRuntime<br/>.get_beat_multiplier()"]
        TenantDB["get_all_tenant_ids()<br/>(PostgreSQL)"]
    end

    subgraph "输出调度"
        TenantTasks["按租户的任务<br/>{name}-{tenant_id}"]
        SystemTasks["系统任务<br/>cloud_*"]
    end

    BeatApp -->|"配置"| Scheduler
    Templates --> Scheduler
    CloudTasks --> Scheduler

    Scheduler -->|"每 60 秒重新加载"| Runtime
    Scheduler --> TenantDB
    Runtime --> Redis

    Scheduler -->|"_generate_schedule()"| TenantTasks
    Scheduler -->|"_generate_schedule()"| SystemTasks

    TenantTasks -->|"入队到"| Workers["Celery 工作进程"]
    SystemTasks -->|"入队到"| Workers

来源:backend/onyx/background/celery/apps/beat.py:27-145, backend/onyx/background/celery/tasks/beat_schedule.py:37-179, backend/onyx/db/engine/tenant_utils.py:15-15

关键方法:

  • tick():由 Celery Beat 定期调用;每 RELOAD_INTERVAL(60 秒)检查调度是否需要更新 backend/onyx/background/celery/apps/beat.py:61-80
  • _try_updating_schedule():获取当前租户和 beat 乘数,如果检测到任何变化则重新生成调度 backend/onyx/background/celery/apps/beat.py:155-190
  • _generate_schedule():根据 MULTI_TENANT 标志将任务模板转换为按租户或系统范围的任务条目 backend/onyx/background/celery/apps/beat.py:82-153

来源:backend/onyx/background/celery/apps/beat.py:27-235

Beat 任务模板

Beat 任务模板作为周期性任务的蓝图。每个模板指定了任务名称、Celery 任务标识符、调度间隔和执行选项。

模板结构
字段类型描述
namestr人类可读的任务标识符 backend/onyx/background/celery/tasks/beat_schedule.py:39-39
taskstrCelery 任务名称(来自 OnyxCeleryTask 枚举) backend/onyx/background/celery/tasks/beat_schedule.py:40-40
scheduletimedelta执行频率 backend/onyx/background/celery/tasks/beat_schedule.py:41-41
optionsdict优先级、过期时间和队列分配 backend/onyx/background/celery/tasks/beat_schedule.py:42-45

来源:backend/onyx/background/celery/tasks/beat_schedule.py:37-46, backend/onyx/configs/constants.py:18-18

核心任务模板

以下核心模板配置在 backend/onyx/background/celery/tasks/beat_schedule.py:37-179 中:

任务名称任务枚举频率优先级描述
check-for-user-file-processingCHECK_FOR_USER_FILE_PROCESSING20 秒处理用户上传的文件 backend/onyx/background/celery/tasks/beat_schedule.py:39-46
check-for-indexingCHECK_FOR_INDEXING15 秒触发连接器索引 backend/onyx/background/celery/tasks/beat_schedule.py:66-74
check-for-connector-deletionCHECK_FOR_CONNECTOR_DELETION20 秒处理后台清理 backend/onyx/background/celery/tasks/beat_schedule.py:100-110
check-for-vespa-syncCHECK_FOR_VESPA_SYNC_TASK20 秒将元数据同步到文档索引 backend/onyx/background/celery/tasks/beat_schedule.py:112-120
check-for-pruningCHECK_FOR_PRUNING20 秒移除过期文档 backend/onyx/background/celery/tasks/beat_schedule.py:122-130
monitor-background-processesMONITOR_BACKGROUND_PROCESSES5 分钟工作进程健康检查 backend/onyx/background/celery/tasks/beat_schedule.py:141-149

来源:backend/onyx/background/celery/tasks/beat_schedule.py:37-179

多租户任务生成

在多租户部署(MULTI_TENANT=true)中,调度器使用两层方法来管理跨多个租户的负载。

系统范围的云任务

云任务生成一次,处理跨所有租户的系统级操作。它们通过 get_cloud_tasks_to_schedule(beat_multiplier) 获取,并为整个集群仅调度一次 backend/onyx/background/celery/apps/beat.py:91-111

按租户的任务扩展

对于通过 get_all_tenant_ids() 发现的每个租户,调度器会生成单独的任务实例。这样可以实现按租户的细粒度控制和后台工作隔离。

graph TB
    subgraph "租户发现 [backend/onyx/db/engine/tenant_utils.py]"
        GetTenants["get_all_tenant_ids()"]
        TenantList["['tenant_a', 'tenant_b']"]
    end

    subgraph "任务蓝图 [backend/onyx/background/celery/tasks/beat_schedule.py]"
        Template["name: check-for-indexing<br/>task: CHECK_FOR_INDEXING"]
    end

    subgraph "扩展后的调度 [backend/onyx/background/celery/apps/beat.py]"
        T1["check-for-indexing-tenant_a<br/>kwargs: {tenant_id: 'tenant_a'}"]
        T2["check-for-indexing-tenant_b<br/>kwargs: {tenant_id: 'tenant_b'}"]
    end

    GetTenants --> TenantList
    TenantList --> Template
    Template --> T1
    Template --> T2

    T1 -->|"入队"| Queue["OnyxCeleryQueues"]

来源:backend/onyx/background/celery/apps/beat.py:123-151, backend/onyx/db/engine/tenant_utils.py:15-15

每个生成的任务在其 kwargs 中包含 tenant_id,该值通过 TenantAwareTask 基类传播到任务执行中 backend/onyx/background/celery/apps/app_base.py:80-99

Beat 乘数配置

beat_multiplier 是一个运行时可调节的浮点值,用于缩放任务生成速率。这在云部署中至关重要,可以防止工作进程过载。

配置项默认值用途
CLOUD_BEAT_MULTIPLIER_DEFAULT8.0缩放任务间隔(例如,15 秒变为 120 秒) backend/onyx/background/celery/tasks/beat_schedule.py:33-33
RELOAD_INTERVAL60调度器检查乘数变化的频率(秒) backend/onyx/background/celery/apps/beat.py:31-31

乘数通过 OnyxRuntime.get_beat_multiplier() 获取 backend/onyx/background/celery/apps/beat.py:169-169。如果乘数或租户列表发生变化,DynamicTenantScheduler 会重建其内部调度 backend/onyx/background/celery/apps/beat.py:177-189

来源:backend/onyx/background/celery/apps/beat.py:36-41, backend/onyx/background/celery/tasks/beat_schedule.py:33-33

存活探针与监控

调度器实现了健康监控机制:

存活探针文件

每次重新加载间隔完成时,调度器会更新一个存活探针文件,该文件路径由 make_probe_path("liveness", "beat@hostname") 生成 backend/onyx/background/celery/apps/beat.py:50-69。外部监控系统可以检查该文件的修改时间。

Redis 协调与栅栏

后台任务使用"栅栏"模式防止重叠执行。例如,check_for_vespa_sync_task 会获取一个超时时间为 CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT(120 秒)的 OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCKbackend/onyx/background/celery/tasks/vespa/tasks.py:95-102。这确保了即使有多个 beat 实例运行,每个租户也只有一个实例会启动同步逻辑。

来源:backend/onyx/background/celery/apps/beat.py:50-69, backend/onyx/background/celery/tasks/vespa/tasks.py:95-102, backend/onyx/configs/constants.py:130-130

初始化与启动

Beat 调度器通过 Celery 的 beat_init 信号进行初始化。

sequenceDiagram
    participant Process as Celery Beat 进程
    participant Signal as beat_init 信号
    participant Handler as on_beat_init [beat.py]
    participant Engine as SqlEngine
    participant Sched as DynamicTenantScheduler

    Process->>Signal: 启动
    Signal->>Handler: 触发

    Handler->>Engine: set_app_name("celery_beat")
    Handler->>Engine: init_engine(pool_size=2)

    Handler->>Handler: wait_for_redis()

    Handler->>Sched: _try_updating_schedule()
    Note over Sched: 获取租户和乘数

    Sched-->>Handler: 调度已重建
    Handler-->>Process: 就绪

来源:backend/onyx/background/celery/apps/beat.py:237-253, backend/onyx/configs/constants.py:85-85

beat_init 信号处理器使用较小的连接池大小(2)执行数据库初始化,因为调度器只读取租户元数据,不执行繁重的数据库操作 backend/onyx/background/celery/apps/beat.py:238-241

来源:backend/onyx/background/celery/apps/beat.py:237-253