agentic_huge_data_base / wiki
页面 Onyx · 7 后台处理与协作·DeepWiki 中文全文译文

7 · 后台处理与协作(Background Processing and Coordination)

企业连接器与统一搜索 · 本章是 Onyx DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Onyx 章节7 状态全文译文 模块文档对象与元数据、工作流与编排、系统架构、测试、发布与运维
源码线索
  • backend/alembic/versions/14162713706c_add_index_attempt_stage_metric_table.py
  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
模块标签
  • 文档对象与元数据
  • 工作流与编排
  • 系统架构
  • 测试、发布与运维
  • 存储与持久化

中文译文

后台处理与协作(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/onyx-dot-app/onyx/7-background-processing-and-coordination
翻译时间:2026-05-27T08:44:56.322Z
翻译模型:deepseek-chat
原文字符数:14807
项目:Onyx (onyx)

---

后台处理与协调

相关源文件

生成此维基页面所使用的上下文文件如下:

  • backend/alembic/versions/14162713706c_add_index_attempt_stage_metric_table.py
  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/tasks/docprocessing/tasks.py
  • backend/onyx/background/celery/tasks/shared/tasks.py
  • backend/onyx/background/celery/tasks/vespa/tasks.py
  • backend/onyx/background/indexing/run_docfetching.py
  • backend/onyx/configs/app_configs.py
  • backend/onyx/configs/constants.py
  • backend/onyx/db/file_record.py
  • backend/onyx/db/index_attempt_metrics.py
  • backend/onyx/db/index_attempt_metrics_models.py
  • backend/onyx/document_index/factory.py
  • backend/onyx/file_store/staging.py
  • backend/onyx/redis/redis_pool.py
  • backend/onyx/utils/postgres_sanitization.py
  • backend/scripts/dev_run_background_jobs.py
  • backend/supervisord.conf
  • backend/tests/external_dependency_unit/celery/test_docprocessing_priority.py
  • backend/tests/external_dependency_unit/db/test_index_attempt_stage_metrics.py
  • backend/tests/external_dependency_unit/file_store/test_staging_concurrent_attempt_skip.py
  • backend/tests/external_dependency_unit/indexing/__init__.py
  • backend/tests/external_dependency_unit/indexing/test_docfetching_orphan_cleanup.py
  • backend/tests/external_dependency_unit/indexing/test_document_deletion_file_cleanup.py
  • backend/tests/external_dependency_unit/indexing/test_index_doc_batch_prepare.py
  • backend/tests/external_dependency_unit/indexing_helpers.py
  • backend/tests/external_dependency_unit/redis/test_tenant_redis.py
  • backend/tests/external_dependency_unit/search_settings/test_index_swap_workflow.py
  • backend/tests/unit/onyx/db/test_tools.py

本文档描述了 Onyx 基于 Celery 的后台处理基础设施,包括任务编排、基于 Redis 的协调模式、工作线程池架构以及动态任务调度。该系统负责处理异步操作,例如文档索引、连接器清理、权限同步以及向量数据库中的元数据更新。

关于文档索引管线的详细信息,请参阅文档索引管线。关于连接器配置和管理,请参阅连接器框架概述

系统概述

Onyx 使用 Celery 作为分布式任务队列系统,并以 Redis 作为消息代理和协调层。该架构将关注点分离到专门的工作线程池中,每个池针对不同的工作负载特性(I/O 密集型获取、CPU 密集型处理、轻量级元数据更新)进行了优化。

系统组件图

标题:后台处理架构

graph TB
    subgraph "调度层"
        Beat["Celery Beat<br/>(DynamicTenantScheduler)"]
    end

    subgraph "工作线程池"
        PrimaryWorker["主工作线程<br/>(celery 队列)"]
        LightWorker["轻量工作线程<br/>(vespa_metadata_sync,<br/>doc_permissions_upsert)"]
        HeavyWorker["重量工作线程<br/>(connector_pruning,<br/>connector_doc_permissions_sync)"]
        DocFetchWorker["文档获取工作线程<br/>(connector_doc_fetching)"]
        DocProcessWorker["文档处理工作线程<br/>(docprocessing)"]
    end

    subgraph "协调层"
        RedisLocks["Redis 锁<br/>(OnyxRedisLocks)"]
        RedisFences["Redis 栅栏<br/>(ACTIVE_FENCES)"]
        RedisSignals["Redis 信号<br/>(BLOCK_*)"]
    end

    subgraph "存储层"
        Postgres[("PostgreSQL<br/>(SyncRecord,<br/>IndexAttempt)")]
        Vespa[("Vespa<br/>(文档索引)")]
        RedisDB[("Redis<br/>(tasksets, payloads)")]
    end

    Beat -->|"调度 beat 任务"| PrimaryWorker
    Beat -->|"调度 beat 任务"| LightWorker
    Beat -->|"调度 beat 任务"| HeavyWorker

    PrimaryWorker -->|"创建子任务"| LightWorker
    PrimaryWorker -->|"创建子任务"| HeavyWorker
    PrimaryWorker -->|"创建子任务"| DocFetchWorker

    DocFetchWorker -->|"入队"| DocProcessWorker

    PrimaryWorker -.->|"获取"| RedisLocks
    LightWorker -.->|"使用"| RedisFences
    HeavyWorker -.->|"使用"| RedisFences

    LightWorker --> Postgres
    LightWorker --> Vespa
    DocProcessWorker --> Postgres
    DocProcessWorker --> Vespa

    PrimaryWorker --> RedisDB
    LightWorker --> RedisDB
    HeavyWorker --> RedisDB

来源: backend/onyx/background/celery/apps/beat.py:27-58, backend/onyx/configs/constants.py:83-93, backend/onyx/background/celery/apps/primary.py:115-130

Celery 工作线程架构

Onyx 采用多池工作线程策略,以防止长时间运行的任务(如深度索引)阻塞关键维护任务(如权限同步或文档删除)。配置通过 supervisord.confdev_run_background_jobs.py 进行管理。

工作线程池配置

系统定义了专门的工作线程池,每个池分配给特定的 OnyxCeleryQueues

工作线程池应用名称主要队列用途
主工作线程celery_worker_primarycelery高层协调和单例清理任务。
轻量工作线程celery_worker_lightvespa_metadata_sync, connector_deletion, doc_permissions_upsert快速元数据更新和数据库维护。
重量工作线程celery_worker_heavyconnector_pruning, connector_doc_permissions_sync资源密集型同步任务。
文档获取工作线程celery_worker_docfetchingconnector_doc_fetchingI/O 密集型任务,从外部 API 拉取数据。
文档处理工作线程celery_worker_docprocessingdocprocessingCPU 密集型解析和嵌入向量生成。

并发与生命周期:

  • 工作线程在初始化时分配特定的队列,通常通过 supervisord 进行 backend/supervisord.conf:30-116
  • 主工作线程 执行单例初始化,例如在启动时清除 OnyxRedisLocks.PRIMARY_WORKERbackend/onyx/background/celery/apps/primary.py:156-170
  • 在多租户环境中,TenantAwareTask 基类确保在执行前正确设置 CURRENT_TENANT_ID_CONTEXTVAR backend/onyx/background/celery/apps/app_base.py:80-99
  • 详细信息请参阅 Celery 工作线程架构

来源: backend/onyx/configs/constants.py:83-96, backend/onyx/background/celery/apps/primary.py:115-130, backend/onyx/background/celery/apps/app_base.py:80-99, backend/supervisord.conf:30-116

Redis 协调模式

栅栏与任务集

Onyx 使用"栅栏"模式来管理分布式操作的生命周期。这可以防止多个工作线程执行相同的任务,并提供一种机制来跟踪一组子任务(称为"任务集")何时完成。

关键组件:

  • TenantRedis:一个自定义的 Redis 客户端,会自动为键添加 tenant_id 前缀,以确保缓存层的多租户隔离 backend/onyx/redis/redis_pool.py:65-92
  • 栅栏键:Redis 键(通常带有 TTL),用于指示特定实体的活动操作 backend/onyx/background/celery/tasks/vespa/document_sync.py:23-27
  • 任务集键:Redis Set 结构,包含由生成器任务产生的所有子任务的 UUID backend/onyx/background/celery/apps/app_base.py:140-166
  • ACTIVE_FENCES:一个全局的 Redis 集合,在 OnyxRedisConstants.ACTIVE_FENCES 中跟踪,由监控 beat 用于验证和清理过期的栅栏 backend/onyx/background/celery/tasks/vespa/tasks.py:163-172
锁注册表(OnyxRedisLocks
锁名称超时时间文件引用
CHECK_VESPA_SYNC_BEAT_LOCK120秒backend/onyx/configs/constants.py:130
PRIMARY_WORKER120秒backend/onyx/configs/constants.py:133
CELERY_INDEXING_LOCK3小时15分钟backend/onyx/configs/constants.py:150

详细信息请参阅 Redis 协调模式

来源: backend/onyx/configs/constants.py:128-168, backend/onyx/background/celery/apps/app_base.py:140-182, backend/onyx/redis/redis_pool.py:65-175

文档索引管线

索引管线是一个多阶段过程,由生成器任务进行编排。

  1. 文档获取:工作线程通过 instantiate_connector 从源中拉取 Document 对象 backend/onyx/background/indexing/run_docfetching.py:117-124
  2. 批处理:文档被分批存储到 DocumentBatchStoragebackend/onyx/file_store/document_batch_storage.py:103-104
  3. 文档处理:子任务获取批次,执行解析、片段切分和嵌入向量生成 backend/onyx/background/celery/tasks/docprocessing/tasks.py:107-111
  4. Vespa 更新:最终的向量和元数据通过 run_indexing_pipeline backend/onyx/background/celery/tasks/docprocessing/tasks.py:111VespaDocumentFields backend/onyx/document_index/interfaces.py:53 推送到文档索引。

详细信息请参阅 文档索引管线

来源: backend/onyx/background/celery/tasks/docprocessing/tasks.py:107-111, backend/onyx/background/indexing/run_docfetching.py:117-124

Vespa 同步与维护任务

维护任务确保搜索索引与主 PostgreSQL 数据库保持一致。

  • 元数据同步check_for_vespa_sync_task 识别已更改的文档、DocumentSet 成员资格或 UserGroup 权限,并触发同步 backend/onyx/background/celery/tasks/vespa/tasks.py:75-109
  • 同步记录:系统使用 SyncRecord 模型和 SyncStatus 枚举来跟踪后台更新的进度 backend/onyx/db/enums.py:44-45

详细信息请参阅 Vespa 同步与维护任务

来源: backend/onyx/background/celery/tasks/vespa/tasks.py:75-158, backend/onyx/db/enums.py:44-45

动态任务调度

Onyx 使用自定义调度器来处理多租户任务生成和速率限制。

调度逻辑
  • DynamicTenantScheduler:一个自定义的 Celery 调度器,会定期重新加载其调度,并可以适应租户的添加/移除 backend/onyx/background/celery/apps/beat.py:27-41
  • Beat 乘数:使用 beat_multiplier(云环境默认值为 8.0)来缩放任务间隔,在工作线程过载时提供背压 backend/onyx/background/celery/tasks/beat_schedule.py:33
  • 任务模板beat_task_templates 列表定义了索引检查和清理等周期性任务的调度和优先级 backend/onyx/background/celery/tasks/beat_schedule.py:37-179

详细信息请参阅 动态任务调度

来源: backend/onyx/background/celery/tasks/beat_schedule.py:28-179, backend/onyx/background/celery/apps/beat.py:27-80

连接器生命周期操作

后台任务管理连接器生命周期的"清理"阶段:

  • 清理check_for_pruning 识别索引中不再存在于源中的文档并将其移除 backend/onyx/background/celery/tasks/beat_schedule.py:122-130
  • 删除check_for_connector_deletion 处理 ConnectorCredentialPair 的完全移除,包括撤销任何活动的索引或清理任务 backend/onyx/background/celery/tasks/beat_schedule.py:100-110
代码实体关联

标题:连接器生命周期任务映射

graph LR
    subgraph "Beat 调度模板"
        T1["check-for-indexing"]
        T2["check-for-pruning"]
        T3["check-for-connector-deletion"]
    end

    subgraph "Celery 任务"
        Task1["OnyxCeleryTask.CHECK_FOR_INDEXING"]
        Task2["OnyxCeleryTask.CHECK_FOR_PRUNING"]
        Task3["OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION"]
    end

    subgraph "Redis 协调"
        Lock1["OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK"]
        Lock2["OnyxRedisLocks.PRIMARY_WORKER"]
    end

    T1 --> Task1
    T2 --> Task2
    T3 --> Task3
    Task1 -.-> Lock1
    Task3 -.-> Lock2

来源: backend/onyx/background/celery/tasks/beat_schedule.py:66-110, backend/onyx/configs/constants.py:130-133

详细信息请参阅 连接器生命周期操作