7.4 · Vespa Sync and Maintenance Tasks

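Enterprise connectors and unified search · This chapter is a standalone page of the translated Onyx DeepWiki; it retains the original links, source anchors, module tags, and chapter hierarchy.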

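Project: Onyx · Section: 7.4 · Status: full translation · Modules: Testing, release, and operations; Document objects and metadata; System architecture; Workflow and orchestration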
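Module tags
  • Testing, release, and operations
  • Document objects and metadata
  • System architecture
  • Workflow and orchestration
  • Interfaces and service contracts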

Original DeepWiki page: https://deepwiki.com/onyx-dot-app/onyx/7.4-vespa-sync-and-maintenance-tasks
Translated at: 2026-05-27T08:45:09.060Z
Translation model: deepseek-chat
Source character count: 20698
Project: Onyx (onyx)

---

Vespa Sync and Maintenance Tasks

Relevant source files

The following files were used as context for generating this wiki page:

  • backend/ee/onyx/background/celery/apps/primary.py
  • backend/ee/onyx/background/celery/tasks/beat_schedule.py
  • backend/ee/onyx/background/celery_utils.py
  • backend/ee/onyx/background/task_name_builders.py
  • backend/onyx/background/celery/apps/app_base.py
  • backend/onyx/background/celery/apps/beat.py
  • backend/onyx/background/celery/apps/heavy.py
  • backend/onyx/background/celery/apps/light.py
  • backend/onyx/background/celery/apps/monitoring.py
  • backend/onyx/background/celery/apps/primary.py
  • backend/onyx/background/celery/tasks/beat_schedule.py
  • backend/onyx/background/celery/tasks/monitoring/tasks.py
  • backend/onyx/background/celery/tasks/shared/tasks.py
  • backend/onyx/background/celery/tasks/vespa/tasks.py
  • backend/onyx/background/task_utils.py
  • backend/onyx/chat/chat_processing_checker.py
  • backend/onyx/chat/stop_signal_checker.py
  • backend/onyx/configs/app_configs.py
  • backend/onyx/configs/constants.py
  • backend/onyx/connectors/google_site/connector.py
  • backend/onyx/document_index/factory.py
  • backend/onyx/federated_connectors/oauth_utils.py
  • backend/onyx/key_value_store/factory.py
  • backend/onyx/key_value_store/store.py
  • backend/onyx/redis/redis_pool.py
  • backend/onyx/server/features/release_notes/utils.py
  • backend/onyx/utils/platform.py
  • backend/onyx/utils/telemetry.py
  • backend/scripts/dev_run_background_jobs.py
  • backend/supervisord.conf
  • backend/tests/external_dependency_unit/redis/test_tenant_redis.py

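Purpose and Scope

This document describes the background maintenance tasks that keep the Vespa document index in sync with PostgreSQL, along with the document lifecycle management operations. These tasks maintain data consistency, access control, and cleanup across system components.

For information about initial document indexing and content processing, see Document Indexing Pipeline.

The system handles five main maintenance operations: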

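Operation | Purpose | Trigger
--- | --- | ---
Document metadata sync | Updates access control lists, document sets, boost, and hidden status in Vespa | Periodic check (every 20 seconds) backend/onyx/background/celery/tasks/beat_schedule.py:114
Document set sync | Propagates document set membership changes to all affected documents | When a document set is modified backend/onyx/background/celery/tasks/vespa/tasks.py:120
User group sync | Propagates user group membership changes to all affected documents (Enterprise Edition only) | When a user group is modified backend/onyx/background/celery/tasks/vespa/tasks.py:156
Connector pruning | Removes documents from Vespa that no longer exist in the source system | Based on prune_freq backend/onyx/background/celery/tasks/beat_schedule.py:124
Connector deletion | Completely removes all documents and metadata of a deleted connector | When a connector is marked DELETING backend/onyx/background/celery/tasks/beat_schedule.py:101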

Sources: backend/onyx/background/celery/tasks/vespa/tasks.py:82-158, backend/onyx/background/celery/tasks/beat_schedule.py:109-117, backend/onyx/configs/constants.py:130

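Architecture Overview

All maintenance tasks follow a coordinator-worker pattern and rely on Redis-based fencing to prevent duplicate work and track progress. The DynamicTenantScheduler in Celery Beat schedules the periodic check tasks that act as coordinators.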

Figure 1: Complete maintenance task ecosystem
graph TB
    subgraph "Celery Beat Scheduler"
        Beat["DynamicTenantScheduler"]
        VespaCheck["check_for_vespa_sync_task<br/>(every 20 seconds)"]
        PruningCheck["check_for_pruning<br/>(every 20 seconds)"]
        DeletionCheck["check_for_connector_deletion<br/>(every 20 seconds)"]
    end

    subgraph "Vespa Sync Operations"
        VespaSyncWorker["vespa_metadata_sync_task<br/>VESPA_METADATA_SYNC queue"]
        DocSetSync["Document set sync<br/>RedisDocumentSet"]
        UserGroupSync["User group sync<br/>RedisUserGroup"]
        DocSync["Document sync<br/>DOCUMENT_SYNC_FENCE_KEY"]
    end

    subgraph "Pruning Operations"
        PruneGen["connector_pruning_generator_task<br/>CONNECTOR_PRUNING queue"]
        PruneWorker["document_by_cc_pair_cleanup_task"]
        RedisConnectorPrune["RedisConnectorPrune"]
    end

    subgraph "Deletion Operations"
        DeleteGen["check_for_connector_deletion"]
        DeleteWorker["document_by_cc_pair_cleanup_task"]
        RedisConnectorDelete["RedisConnectorDelete"]
    end

    subgraph "Data Stores"
        Postgres["PostgreSQL<br/>Document, DocumentSet,<br/>UserGroup, CCPair"]
        Vespa["Vespa index<br/>Chunks with metadata"]
        Redis["Redis<br/>TenantRedis prefix"]
    end

    Beat --> VespaCheck
    Beat --> PruningCheck
    Beat --> DeletionCheck

    VespaCheck --> DocSetSync
    VespaCheck --> UserGroupSync
    VespaCheck --> DocSync

    DocSetSync --> VespaSyncWorker
    UserGroupSync --> VespaSyncWorker
    DocSync --> VespaSyncWorker

    PruningCheck --> PruneGen
    PruneGen --> PruneWorker
    PruneGen --> RedisConnectorPrune

    DeletionCheck --> DeleteGen
    DeleteGen --> DeleteWorker
    DeleteGen --> RedisConnectorDelete

    VespaSyncWorker --> Vespa
    PruneWorker --> Vespa
    DeleteWorker --> Vespa

    RedisConnectorPrune --> Redis
    RedisConnectorDelete --> Redis
    DocSetSync --> Redis
    UserGroupSync --> Redis

Sources: backend/onyx/background/celery/tasks/vespa/tasks.py:75-207, backend/onyx/background/celery/tasks/beat_schedule.py:37-179, backend/onyx/redis/redis_pool.py:65-175

Vespa Sync Check Task

check_for_vespa_sync_task is the main coordinator task. Celery Beat runs it every 20 seconds, and it orchestrates all Vespa sync operations.

Task Flow
sequenceDiagram
    participant Beat as Celery Beat
    participant CheckTask as check_for_vespa_sync_task
    participant Redis as Redis (TenantRedis)
    participant DB as PostgreSQL
    participant Generator as Sync generator
    participant Worker as vespa_metadata_sync_task

    Beat->>CheckTask: Execute (every 20 seconds)
    CheckTask->>Redis: Acquire CHECK_VESPA_SYNC_BEAT_LOCK

    Note over CheckTask: Phase 1 (kickoff)
    CheckTask->>DB: Query stale documents
    CheckTask->>Generator: try_generate_stale_document_sync_tasks
    Generator->>Redis: Create fence and taskset
    Generator->>Worker: Dispatch N tasks

    CheckTask->>DB: Query document sets
    loop For each document set
        CheckTask->>Generator: try_generate_document_set_sync_tasks
        Generator->>Redis: Create fence and taskset
        Generator->>Worker: Dispatch N tasks
    end

    CheckTask->>DB: Query user groups (Enterprise Edition only)
    loop For each user group
        CheckTask->>Generator: try_generate_user_group_sync_tasks
        Generator->>Redis: Create fence and taskset
        Generator->>Worker: Dispatch N tasks
    end

    Note over CheckTask: Phase 3 (finalization)
    CheckTask->>Redis: Read ACTIVE_FENCES
    loop For each active fence
        CheckTask->>Redis: Check remaining count in taskset
        alt count == 0
            CheckTask->>DB: Mark entity as synced
            CheckTask->>Redis: Remove fence
        else count > 0
            CheckTask->>Redis: Update sync progress
        end
    end

    CheckTask->>Redis: Release lock

Sources: backend/onyx/background/celery/tasks/vespa/tasks.py:82-207, backend/onyx/background/celery/tasks/beat_schedule.py:112-120

Task Execution Details

The task follows a three-phase process:

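Phase | Description | Key operations
--- | --- | ---
Kickoff | Generates sync tasks for entities that need updating | - try_generate_stale_document_sync_tasks backend/onyx/background/celery/tasks/vespa/tasks.py:107<br/>- try_generate_document_set_sync_tasks backend/onyx/background/celery/tasks/vespa/tasks.py:126<br/>- try_generate_user_group_sync_tasks backend/onyx/background/celery/tasks/vespa/tasks.py:155
Validation | *(currently a TODO)* | Future: validate fence consistency
Finalization | Monitors active fences and completes sync operations | - Checks OnyxRedisConstants.ACTIVE_FENCES backend/onyx/background/celery/tasks/vespa/tasks.py:163<br/>- Monitors the remaining tasks in each taskset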

The task takes a distributed lock (OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK) with a 120-second timeout so that executions cannot overlap:

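# r is the Redis client obtained earlier in the task (not shown in this excerpt).
lock_beat: RedisLock = r.lock(
    OnyxRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK,
    timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

# Non-blocking acquire: if another run still holds the lock, skip this cycle.
if not lock_beat.acquire(blocking=False):
    return None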

Sources: backend/onyx/background/celery/tasks/vespa/tasks.py:95-102, backend/onyx/configs/constants.py:130
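
The skip-if-locked guard can be tried out without a Redis server. The following is a minimal sketch that substitutes `threading.Lock` for the Redis lock purely so that it runs standalone; `run_beat_cycle` and `outer` are hypothetical names, and the lock timeout used by the real task is not modeled. With both lock types, `acquire(blocking=False)` returns `False` immediately when the lock is already held.

```python
import threading

# Stand-in for the Redis beat lock (assumption: the real task uses r.lock(...)).
beat_lock = threading.Lock()


def run_beat_cycle(work):
    """Run `work` only if no other cycle holds the lock; otherwise skip."""
    if not beat_lock.acquire(blocking=False):
        return None  # another run is in progress, so this tick does nothing
    try:
        return work()
    finally:
        beat_lock.release()


def outer():
    # Simulates a second beat tick firing while the first still holds the lock.
    inner = run_beat_cycle(lambda: "inner ran")
    return ("outer ran", inner)


result = run_beat_cycle(outer)
print(result)  # ('outer ran', None)
```

The inner call stands in for an overlapping beat tick: it returns `None` and performs no work, while the outer cycle completes normally and releases the lock.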

Document Metadata Sync Task

vespa_metadata_sync_task is the worker task that applies the metadata update for a single document to Vespa.

Code Entity Mapping
graph LR
    Task["vespa_metadata_sync_task<br/>(OnyxCeleryTask.VESPA_METADATA_SYNC_TASK)"]
    Queue["OnyxCeleryQueues.VESPA_METADATA_SYNC"]
    DocIndex["RetryDocumentIndex"]
    VespaFields["VespaDocumentFields"]

    subgraph "Database Operations"
        GetDoc["get_document(document_id)"]
        FetchSets["fetch_document_sets_for_document"]
        GetAccess["get_access_for_document"]
        MarkSynced["mark_document_as_synced"]
    end

    subgraph "Vespa Operations"
        UpdateSingle["retry_index.update_single()"]
    end

    Task -->|"queued to"| Queue
    Task -->|"uses"| DocIndex
    Task -->|"builds"| VespaFields

    Task --> GetDoc
    Task --> FetchSets
    Task --> GetAccess
    Task --> UpdateSingle
    Task --> MarkSynced

    VespaFields -->|"passed to"| UpdateSingle

Sources: backend/onyx/background/celery/tasks/vespa/tasks.py:453-577, backend/onyx/document_index/factory.py:1-50, backend/onyx/background/celery/tasks/shared/tasks.py:155-160

Sync Operation Flow

The sync task updates metadata fields in Vespa, primarily access control and document set membership:

graph TD
    Start["vespa_metadata_sync_task<br/>Input: document_id"]
    LoadDoc["Load document from PostgreSQL"]
    CheckExists{"Document<br/>exists?"}

    FetchSets["fetch_document_sets_for_document<br/>Returns: set[str]"]
    GetAccess["get_access_for_document<br/>Returns: DocumentAccess"]

    BuildFields["VespaDocumentFields:<br/>- document_sets: set[str]<br/>- access: DocumentAccess<br/>- boost: int<br/>- hidden: bool"]

    UpdateVespa["retry_index.update_single<br/>tenant_id, chunk_count, fields"]
    MarkDB["mark_document_as_synced"]

    RemoveTask["Remove task_id from Redis taskset"]

    Start --> LoadDoc
    LoadDoc --> CheckExists
    CheckExists -->|"No"| RemoveTask
    CheckExists -->|"Yes"| FetchSets
    FetchSets --> GetAccess
    GetAccess --> BuildFields
    BuildFields --> UpdateVespa
    UpdateVespa --> MarkDB
    MarkDB --> RemoveTask

Sources: backend/onyx/background/celery/tasks/vespa/tasks.py:460-577, backend/onyx/background/celery/tasks/shared/tasks.py:142-173
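
The flow above can be sketched with in-memory stand-ins. This is an illustration only, not the Onyx implementation: `FieldsSketch`, `FakeStores`, and `sync_one_document` are hypothetical names, plain dictionaries and a set replace PostgreSQL, Vespa, and the Redis taskset, and access is reduced to a string.

```python
from dataclasses import dataclass, field


@dataclass
class FieldsSketch:
    """Hypothetical stand-in for VespaDocumentFields."""
    document_sets: set
    access: str
    boost: int
    hidden: bool


@dataclass
class FakeStores:
    """In-memory stand-ins for PostgreSQL, Vespa, and the Redis taskset."""
    db: dict = field(default_factory=dict)       # document_id -> row
    vespa: dict = field(default_factory=dict)    # document_id -> FieldsSketch
    taskset: set = field(default_factory=set)    # outstanding task IDs


def sync_one_document(stores: FakeStores, task_id: str, document_id: str) -> bool:
    """Follow the flowchart: load, build fields, update, mark synced, remove task."""
    row = stores.db.get(document_id)
    if row is None:
        # Document no longer exists: nothing to update, just drop the task ID.
        stores.taskset.discard(task_id)
        return False
    fields = FieldsSketch(
        document_sets=set(row["document_sets"]),
        access=row["access"],
        boost=row["boost"],
        hidden=row["hidden"],
    )
    stores.vespa[document_id] = fields   # stands in for update_single
    row["synced"] = True                 # stands in for mark_document_as_synced
    stores.taskset.discard(task_id)
    return True


stores = FakeStores(
    db={"doc-1": {"document_sets": ["hr"], "access": "public", "boost": 0, "hidden": False}},
    taskset={"task-a", "task-b"},
)
updated = sync_one_document(stores, "task-a", "doc-1")
missing = sync_one_document(stores, "task-b", "doc-gone")
```

Both branches end by removing the task ID, which is what lets the coordinator see the taskset drain to zero.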

Retry Logic and Error Handling

The task retries on transient Vespa or database failures:

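Error type | Behavior | Max retries
--- | --- | ---
SoftTimeLimitExceeded | Logs and exits gracefully | N/A backend/onyx/background/celery/tasks/vespa/tasks.py:535
RetryError | Extracts the inner exception and retries | 3 backend/onyx/background/celery/tasks/vespa/tasks.py:560
Other exceptions | Retries with exponential backoff | 3 backend/onyx/background/celery/tasks/vespa/tasks.py:569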

Exponential backoff formula: countdown = 2 ** (self.request.retries + 4), which starts at 16 seconds.

Sources: backend/onyx/background/celery/tasks/vespa/tasks.py:535-576
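
As a worked example of the backoff formula, the snippet below evaluates it for each retry allowed by the maximum of 3 listed in the table. `retry_countdown` is a hypothetical helper, and the sketch assumes the retry counter starts at 0, as it does for Celery's `self.request.retries`.

```python
def retry_countdown(retries: int) -> int:
    """Seconds to wait before the next attempt: 2 ** (retries + 4)."""
    return 2 ** (retries + 4)


MAX_RETRIES = 3  # maximum retries listed in the table

schedule = [retry_countdown(n) for n in range(MAX_RETRIES)]
print(schedule)       # [16, 32, 64]
print(sum(schedule))  # 112
```

Under these assumptions a document that keeps failing is retried after 16, 32, and 64 seconds, for 112 seconds of total delay before the task gives up.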

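Document Set Sync

Document sets are collections of documents grouped so that users can select them in search and chat. When the membership of a document set changes (marked by is_up_to_date=False), every affected document must be updated in Vespa.

Document Set Sync Coordinator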
graph TB
    CheckTask["check_for_vespa_sync_task"]
    FetchSets["fetch_document_sets<br/>(include_outdated=True)"]

    subgraph "For each document set"
        TryGen["try_generate_document_set_sync_tasks"]
        CheckFence{"RedisDocumentSet<br/>.fenced?"}
        CheckUpToDate{"document_set<br/>.is_up_to_date?"}

        ClearTaskset["r.delete(rds.taskset_key)"]
        GenerateTasks["rds.generate_tasks"]
        InsertRecord["insert_sync_record<br/>(SyncType.DOCUMENT_SET)"]
        SetFence["rds.set_fence(tasks_generated)"]
    end

    CheckTask --> FetchSets
    FetchSets -->|"for each document_set"| TryGen

    TryGen --> CheckFence
    CheckFence -->|"Yes"| End["Return None"]
    CheckFence -->|"No"| CheckUpToDate

    CheckUpToDate -->|"Yes"| Cleanup["cleanup_sync_records"]
    CheckUpToDate -->|"No"| ClearTaskset

    ClearTaskset --> GenerateTasks
    GenerateTasks --> InsertRecord
    InsertRecord --> SetFence

    Cleanup --> End
    SetFence --> End

Sources: backend/onyx/background/celery/tasks/vespa/tasks.py:210-284, backend/onyx/redis/redis_document_set.py:45-47
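
The decision logic in the coordinator diagram can be approximated as follows. This is a simplified sketch under stated assumptions: two plain dictionaries stand in for Redis keys and sets, the task ID format is invented for illustration, and the database steps (insert_sync_record, cleanup_sync_records) are omitted.

```python
from typing import Optional


def try_generate_sync_tasks(
    kv: dict, sets: dict, set_id: int, is_up_to_date: bool, document_ids: list
) -> Optional[int]:
    """Return the number of tasks generated, or None if nothing was generated."""
    fence_key = f"documentset_fence_{set_id}"
    taskset_key = f"documentset_taskset_{set_id}"

    if fence_key in kv:
        return None  # fenced: a sync for this set is already in progress
    if is_up_to_date:
        return None  # nothing to sync (the real code cleans up sync records here)

    sets.pop(taskset_key, None)  # clear any stale taskset before generating
    # Hypothetical task ID format, one task per affected document.
    sets[taskset_key] = {f"sync:{set_id}:{doc_id}" for doc_id in document_ids}
    kv[fence_key] = len(document_ids)  # set the fence with the task count
    return len(document_ids)


kv, sets = {}, {}
first = try_generate_sync_tasks(kv, sets, 1, False, ["a", "b", "c"])
second = try_generate_sync_tasks(kv, sets, 1, False, ["a", "b", "c"])
clean = try_generate_sync_tasks(kv, sets, 2, True, ["d"])
print(first, second, clean)  # 3 None None
```

The second call returns `None` because the fence from the first call is still in place, which is how repeated beat ticks avoid generating duplicate work.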

RedisDocumentSet Class

The RedisDocumentSet class manages Redis coordination for document set sync. It uses TenantRedis so that keys are prefixed with the tenant_id:

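Property/Method | Purpose | Redis key pattern or behavior
--- | --- | ---
fence_key | Tracks the state of the sync operation | documentset_fence_{id}
taskset_key | Tracks individual task IDs | documentset_taskset_{id}
fenced | Checks whether a sync is in progress | Redis EXISTS check
generate_tasks() | Creates sync tasks for all documents | Queries the database and dispatches via send_task
reset() | Cleans up after completion | Deletes all Redis keys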

Sources: backend/onyx/redis/redis_document_set.py:45-47, backend/onyx/redis/redis_pool.py:65-175
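
To make the key patterns concrete, here is a small sketch that builds the two keys for a document set and applies a tenant prefix. The helper names are hypothetical, and the `:` separator is an assumption; the source states only that TenantRedis prefixes keys with the tenant_id, not the exact format.

```python
def tenant_key(tenant_id: str, key: str, sep: str = ":") -> str:
    """Prefix a key with its tenant ID (the separator is an assumption)."""
    return f"{tenant_id}{sep}{key}"


def document_set_keys(tenant_id: str, document_set_id: int) -> dict:
    """Build the fence and taskset keys using the patterns from the table."""
    return {
        "fence_key": tenant_key(tenant_id, f"documentset_fence_{document_set_id}"),
        "taskset_key": tenant_key(tenant_id, f"documentset_taskset_{document_set_id}"),
    }


keys_a = document_set_keys("tenant_a", 7)
keys_b = document_set_keys("tenant_b", 7)
print(keys_a["fence_key"])  # tenant_a:documentset_fence_7
```

Because the prefix differs per tenant, document set 7 in one tenant never collides with document set 7 in another.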

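User Group Sync

User groups control document access in the Enterprise Edition. When the membership of a user group changes, every affected document must be updated in Vespa to reflect the new access control list.

User Group Sync Flow

User group sync follows the same pattern as document set sync, with Enterprise Edition-specific logic that is enabled through global_version.is_ee_version():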

graph TB
    CheckVersion{"global_version<br/>.is_ee_version()?"}
    FetchFunc["fetch_versioned_implementation<br/>('onyx.db.user_group', 'fetch_user_groups')"]
    FetchGroups["fetch_user_groups(only_up_to_date=False)"]

    subgraph "For each user group"
        TryGen["try_generate_user_group_sync_tasks"]
        RUG["RedisUserGroup(tenant_id, usergroup_id)"]
        CheckFenced{"rug.fenced?"}
        CheckUpToDate{"usergroup<br/>.is_up_to_date?"}

        ClearTaskset["r.delete(rug.taskset_key)"]
        GenerateTasks["rug.generate_tasks()"]
        InsertRecord["insert_sync_record<br/>(SyncType.USER_GROUP)"]
        SetFence["rug.set_fence(tasks_generated)"]
    end

    CheckVersion -->|"Yes"| FetchFunc
    CheckVersion -->|"No"| End["Skip"]

    FetchFunc --> FetchGroups
    FetchGroups -->|"for each usergroup"| TryGen

    TryGen --> RUG
    RUG --> CheckFenced
    CheckFenced -->|"Yes"| End
    CheckFenced -->|"No"| CheckUpToDate

    CheckUpToDate -->|"Yes"| Cleanup["cleanup_sync_records"]
    CheckUpToDate -->|"No"| ClearTaskset

    ClearTaskset --> GenerateTasks
    GenerateTasks --> InsertRecord
    InsertRecord --> SetFence

    Cleanup --> End
    SetFence --> End

Sources: backend/onyx/background/celery/tasks/vespa/tasks.py:133-158, backend/onyx/background/celery/tasks/vespa/tasks.py:287-361

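Fencing and Coordination

The Vespa sync system uses Redis-based fencing to coordinate distributed operations and prevent duplicate work. The TenantRedis wrapper automatically prefixes these keys with the tenant_id, which keeps tenants isolated in multi-tenant deployments.

Fence Lifecycle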
stateDiagram-v2
    [*] --> NoFence: Initial state

    NoFence --> FenceCreating: Generator task starts
    FenceCreating --> FenceActive: set_fence(payload)

    FenceActive --> TasksDispatching: Create taskset
    TasksDispatching --> TasksActive: Workers executing

    TasksActive --> TasksActive: Worker removes task from taskset (on_task_postrun)
    TasksActive --> Monitoring: Beat checks progress

    Monitoring --> TasksActive: count > 0
    Monitoring --> Finalizing: count == 0

    Finalizing --> MarkSynced: Mark entity as synced
    MarkSynced --> CleanupFence: Reset fence and taskset
    CleanupFence --> [*]

    FenceActive --> CleanupFence: Error/timeout

Sources: backend/onyx/background/celery/tasks/vespa/tasks.py:163-207, backend/onyx/background/celery/apps/app_base.py:130-181
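
The Monitoring and Finalizing states can be illustrated with one pass over the active fences. This is a sketch only: `monitor_fences` is a hypothetical function, in-memory sets and dictionaries replace Redis, and the taskset key is derived from the fence key by string substitution, which is an assumption about naming rather than the actual lookup.

```python
def monitor_fences(active_fences: set, fences: dict, tasksets: dict, mark_synced) -> dict:
    """Finalize drained fences; return remaining task counts for the others."""
    progress = {}
    for fence_key in sorted(active_fences):  # sorted() copies, so discard is safe
        taskset_key = fence_key.replace("_fence_", "_taskset_")
        remaining = len(tasksets.get(taskset_key, set()))
        if remaining > 0:
            progress[fence_key] = remaining  # still running: report progress
            continue
        mark_synced(fence_key)               # count == 0: mark entity as synced
        fences.pop(fence_key, None)          # then reset fence and taskset
        tasksets.pop(taskset_key, None)
        active_fences.discard(fence_key)
    return progress


active = {"documentset_fence_1", "documentset_fence_2"}
fences = {"documentset_fence_1": 3, "documentset_fence_2": 2}
tasksets = {"documentset_taskset_1": {"t1", "t2"}, "documentset_taskset_2": set()}
synced = []

progress = monitor_fences(active, fences, tasksets, synced.append)
print(progress)  # {'documentset_fence_1': 2}
print(synced)    # ['documentset_fence_2']
```

Fence 2 has an empty taskset, so it is finalized and removed; fence 1 still has two outstanding tasks and stays active until a later pass.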

Redis Key Structure

The fencing system uses several Redis keys for each sync operation:

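Key type | Pattern | Purpose | Value type
--- | --- | --- | ---
Fence | {PREFIX}_fence_{id} | Primary sync state | Integer (task count)
Taskset | {PREFIX}_taskset_{id} | Tracks active task IDs | Redis set
Active fences | ACTIVE_FENCES | Global lookup table | Redis set of fence keys backend/onyx/background/celery/tasks/vespa/tasks.py:163
Lock | CHECK_VESPA_SYNC_BEAT_LOCK | Prevents concurrent Beat runs | RedisLock backend/onyx/configs/constants.py:35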

Sources: backend/onyx/redis/redis_document_set.py:21-30, backend/onyx/redis/redis_pool.py:65-175, backend/onyx/configs/constants.py:35

Configuration and Tuning

Key Configuration Values
Constant | Value | Purpose
--- | --- | ---
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT | 120 seconds | Lock timeout for the check task backend/onyx/configs/constants.py:130
VESPA_SYNC_MAX_TASKS | 8192 | Maximum number of tasks per sync batch backend/onyx/configs/app_configs.py:152
LIGHT_SOFT_TIME_LIMIT | 105 seconds | Soft time limit for sync tasks backend/onyx/background/celery/tasks/shared/tasks.py:38
LIGHT_TIME_LIMIT | 120 seconds | Hard time limit for sync tasks backend/onyx/background/celery/tasks/shared/tasks.py:39
BEAT_EXPIRES_DEFAULT | 900 seconds | Expiry for tasks that are never dequeued backend/onyx/background/celery/tasks/beat_schedule.py:28

Sources: backend/onyx/configs/constants.py:130, backend/onyx/background/celery/tasks/shared/tasks.py:38-39, backend/onyx/background/celery/tasks/beat_schedule.py:27-28
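
A quick calculation shows how these values interact. The sketch below assumes that VESPA_SYNC_MAX_TASKS caps the number of tasks generated in a single pass, as its description suggests; `batch_sizes` is a hypothetical helper and the figure of 20000 pending documents is made up for illustration.

```python
VESPA_SYNC_MAX_TASKS = 8192
LIGHT_SOFT_TIME_LIMIT = 105  # seconds
LIGHT_TIME_LIMIT = 120       # seconds


def batch_sizes(pending: int, max_tasks: int = VESPA_SYNC_MAX_TASKS) -> list:
    """Split a backlog of pending documents into capped batches."""
    sizes = []
    while pending > 0:
        size = min(pending, max_tasks)
        sizes.append(size)
        pending -= size
    return sizes


# Time a task has between the soft limit firing and the hard kill.
grace_seconds = LIGHT_TIME_LIMIT - LIGHT_SOFT_TIME_LIMIT

print(batch_sizes(20000))  # [8192, 8192, 3616]
print(grace_seconds)       # 15
```

Under that assumption a backlog of 20000 documents needs three passes, and a sync task that hits the soft limit has 15 seconds to log and exit before the hard limit applies.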

Worker Assignment

vespa_metadata_sync tasks are handled by the celery_worker_light pool (backend/supervisord.conf:49). Workers use TenantAwareTask to set CURRENT_TENANT_ID_CONTEXTVAR automatically from the task arguments (backend/onyx/background/celery/apps/app_base.py:80-98).

Sources: backend/onyx/background/celery/apps/app_base.py:82-101, backend/supervisord.conf:45-55
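
The idea behind setting a tenant context variable per task can be shown with the standard `contextvars` module. This is not the TenantAwareTask implementation: `run_with_tenant` is a hypothetical wrapper, the `tenant_id` keyword argument and the `"public"` default are assumptions, and no Celery machinery is involved.

```python
import contextvars

# Stand-in for CURRENT_TENANT_ID_CONTEXTVAR; the default value is an assumption.
CURRENT_TENANT_ID = contextvars.ContextVar("current_tenant_id", default="public")


def run_with_tenant(func, *args, tenant_id=None, **kwargs):
    """Set the tenant for the duration of one task body, then restore it."""
    token = CURRENT_TENANT_ID.set(tenant_id) if tenant_id is not None else None
    try:
        return func(*args, **kwargs)
    finally:
        if token is not None:
            CURRENT_TENANT_ID.reset(token)


def task_body() -> str:
    # Code deep inside the task can read the tenant without receiving it as a parameter.
    return CURRENT_TENANT_ID.get()


inside = run_with_tenant(task_body, tenant_id="tenant_a")
after = CURRENT_TENANT_ID.get()
print(inside, after)  # tenant_a public
```

Resetting the variable in `finally` keeps one task's tenant from leaking into the next task that runs on the same worker.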