agentic_huge_data_base / wiki
页面 Cognee · 10.7 Distributed 模式·DeepWiki 中文全文译文

10.7 · Distributed 模式(Distributed Mode)

记忆管道与知识图谱构建 · 本章是 Cognee DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Cognee 章节10.7 状态全文译文 模块安装与启动、工作流与编排、配置治理、图谱与关系
源码线索
  • .devcontainer/devcontainer.json
  • .github/workflows/distributed_test.yml
  • .github/workflows/temporal_graph_tests.yml
  • cognee/infrastructure/utils/run_sync.py
  • cognee/modules/data/methods/get_authorized_existing_datasets.py
  • cognee/modules/data/methods/load_or_create_datasets.py
  • cognee/modules/pipelines/layers/pipeline_execution_mode.py
  • cognee/modules/pipelines/methods/__init__.py
  • cognee/modules/pipelines/operations/run_tasks_distributed.py
  • cognee/modules/users/permissions/methods/give_permission_on_dataset.py
模块标签
  • 安装与启动
  • 工作流与编排
  • 配置治理
  • 图谱与关系
  • 系统架构

中文译文

Distributed 模式(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/topoteretes/cognee/10.7-distributed-mode
翻译时间:2026-05-27T08:45:18.135Z
翻译模型:deepseek-chat
原文字符数:11130
项目:Cognee (cognee)

---

分布式模式

相关源文件

以下文件为本 Wiki 页面的生成提供了上下文:

  • .devcontainer/devcontainer.json
  • .github/workflows/distributed_test.yml
  • .github/workflows/temporal_graph_tests.yml
  • cognee/infrastructure/utils/run_sync.py
  • cognee/modules/data/methods/get_authorized_existing_datasets.py
  • cognee/modules/data/methods/load_or_create_datasets.py
  • cognee/modules/pipelines/layers/pipeline_execution_mode.py
  • cognee/modules/pipelines/methods/__init__.py
  • cognee/modules/pipelines/operations/run_tasks_distributed.py
  • cognee/modules/users/permissions/methods/give_permission_on_dataset.py
  • cognee/tests/e2e/dataset_queue/test_queue_serialization_e2e.py
  • cognee/tests/test_temporal_graph.py
  • deployment/helm/Dockerfile
  • deployment/setup_ubuntu_instance.sh
  • distributed/Dockerfile
  • distributed/deploy/README.md
  • distributed/deploy/daytona.yaml
  • distributed/deploy/daytona_sandbox.py
  • distributed/deploy/devcontainer.json
  • distributed/deploy/fly-deploy.sh
  • distributed/deploy/fly.toml
  • distributed/deploy/modal-deploy.sh
  • distributed/deploy/modal_app.py
  • distributed/deploy/railway-template.json
  • distributed/deploy/railway.toml
  • distributed/deploy/render.yaml
  • distributed/entrypoint.py
  • distributed/tasks/queued_add_data_points.py
  • distributed/tasks/queued_add_edges.py
  • distributed/tasks/queued_add_nodes.py
  • distributed/workers/data_point_saving_worker.py
  • distributed/workers/graph_saving_worker.py
  • tools/check-lockfile.py

Cognee 的分布式模式通过利用 Modal 实现无服务器执行,从而支持高性能、并行化的数据处理。该架构使 Cognee 能够跨多个容器扩展知识图谱构建和嵌入向量任务,突破了单机处理大数据集的限制。

架构总览

分布式执行模型将管线任务(提取、片段切分和大语言模型操作)的重负载处理与最终的数据库持久化分离。这是通过使用 Modal 的分布式队列,采用生产者-消费者模式实现的。

核心组件
  1. run_tasks_distributed:分布式管线执行的入口点。它负责将数据项映射到远程 Modal 函数 cognee/modules/pipelines/operations/run_tasks_distributed.py:84-92
  2. run_tasks_on_modal:一个 Modal 装饰的函数,在远程容器内执行 run_tasks_data_item cognee/modules/pipelines/operations/run_tasks_distributed.py:40-57
  3. 分布式队列:Modal 支持的队列(add_nodes_and_edges_queueadd_data_points_queue),用于将处理后的图谱元素和向量数据从工作节点传输到专门的保存工作节点 distributed/entrypoint.py:10-12
  4. 保存工作节点:专门的消费者,负责批量处理并将数据持久化到图谱数据库和向量数据库 distributed/workers/graph_saving_worker.py:51-53distributed/workers/data_point_saving_worker.py:53-55
分布式数据流

下图展示了数据集如何从本地入库流转到分布式处理,最终到达数据库持久化。

图表:分布式管线执行流程

graph TD
    subgraph "本地环境"
        ["cognee.cognify()"] --> ["run_tasks_distributed"]
        ["run_tasks_distributed"] --> ["Modal_Client"]
    end

    subgraph "Modal_Cloud_Parallel_Workers"
        ["Modal_Client"] -- "map.aio()" --> D1["run_tasks_on_modal_Worker_1"]
        ["Modal_Client"] -- "map.aio()" --> D2["run_tasks_on_modal_Worker_2"]
        ["Modal_Client"] -- "map.aio()" --> DN["run_tasks_on_modal_Worker_N"]

        D1 -- "节点/边" --> Q1["add_nodes_and_edges_queue"]
        D1 -- "向量" --> Q2["add_data_points_queue"]
    end

    subgraph "持久化工作节点"
        Q1 --> GSW["graph_saving_worker"]
        Q2 --> DSW["data_point_saving_worker"]

        GSW --> GDB[("图谱数据库_Neo4j_Kuzu")]
        DSW --> VDB[("向量数据库_PGVector_LanceDB")]
    end

来源:cognee/modules/pipelines/operations/run_tasks_distributed.py:133-137distributed/entrypoint.py:41-48distributed/workers/graph_saving_worker.py:69-71

任务编排

run_tasks_distributed

该函数启动分布式流程。它会解析数据目录,通过 log_pipeline_run_start 记录管线启动,并使用 run_tasks_on_modal.map.aio 将工作分发到各个数据项 cognee/modules/pipelines/operations/run_tasks_distributed.py:101-134。它会使用 PipelineRunStartedPipelineRunCompletedPipelineRunErrored 模型生成状态更新 cognee/modules/pipelines/operations/run_tasks_distributed.py:105-175

run_tasks_on_modal

这是在 Modal 上运行的执行包装器。其配置如下:

  • 重试次数:3 次 cognee/modules/pipelines/operations/run_tasks_distributed.py:41
  • 最大容器数:最多 50 个并发容器 cognee/modules/pipelines/operations/run_tasks_distributed.py:44
  • 超时时间:24 小时(86400 秒)cognee/modules/pipelines/operations/run_tasks_distributed.py:43

来源:cognee/modules/pipelines/operations/run_tasks_distributed.py:40-57

工作节点架构

Cognee 使用专用工作节点来处理数据持久化,以应对数据库特定的约束,如死锁和连接开销。

图谱保存工作节点(graph_saving_worker

该工作节点从 add_nodes_and_edges_queue 消费数据。它会将传入的数据批量处理(默认 BATCH_SIZE = 25),然后调用 graph_engine.add_nodesgraph_engine.add_edges distributed/workers/graph_saving_worker.py:51-70

  • 并发性:限制为 max_containers=1,以防止在 Neo4j 或 Kuzu 等图谱数据库中出现写入冲突和死锁 distributed/workers/graph_saving_worker.py:48
  • 死锁处理:针对 GraphDatabaseDeadlockError 实现了带有指数退避的重试机制 distributed/workers/graph_saving_worker.py:22-38
数据点保存工作节点(data_point_saving_worker

该工作节点通过 add_data_points_queue 处理向量数据和元数据的持久化。

  • 并发性:可以扩展到最多 max_containers=10 distributed/workers/data_point_saving_worker.py:50
  • 批量处理:将请求收集到 batched_points 字典中,按集合名称分组,以优化 vector_engine.create_data_points 的调用 distributed/workers/data_point_saving_worker.py:71-106

来源:distributed/workers/graph_saving_worker.py:44-55distributed/workers/data_point_saving_worker.py:46-57

代码实体映射

下图将高层分布式概念与代码库中的具体类和函数联系起来。

图表:分布式模式代码实体

classDiagram
    class ModalApp {
        <<distributed/app.py>>
        +app
        +image
    }
    class DistributedOperations {
        <<cognee/modules/pipelines/operations/run_tasks_distributed.py>>
        +run_tasks_distributed()
        +run_tasks_on_modal()
    }
    class SavingWorkers {
        <<distributed/workers/>>
        +graph_saving_worker()
        +data_point_saving_worker()
    }
    class Queues {
        <<distributed/queues.py>>
        +add_nodes_and_edges_queue
        +add_data_points_queue
    }
    class PipelineExecution {
        <<cognee/modules/pipelines/layers/pipeline_execution_mode.py>>
        +run_pipeline_blocking()
        +run_pipeline_as_background_process()
    }

    DistributedOperations ..> ModalApp : 使用
    DistributedOperations ..> Queues : 推送到
    Queues ..> SavingWorkers : 被消费
    PipelineExecution ..> DistributedOperations : 编排

来源:cognee/modules/pipelines/operations/run_tasks_distributed.py:33-47distributed/entrypoint.py:9-12cognee/modules/pipelines/layers/pipeline_execution_mode.py:48-51

部署与配置

环境变量

要启用分布式模式,需要设置以下环境变量:

  • COGNEE_DISTRIBUTED:设置为 True distributed/entrypoint.py:18
  • MODAL_SECRET_NAME:包含数据库凭据的 Modal 密钥名称(默认为 distributed_cogneecognee/modules/pipelines/operations/run_tasks_distributed.py:38
  • RUN_MODE:在分布式 Dockerfile 中设置为 modal distributed/Dockerfile:5
Daytona 集成

Cognee 可以使用 Daytona 部署在隔离的云沙箱中。distributed/deploy/daytona_sandbox.py 中的 deploy_cognee 函数会自动创建基于 Debian 的沙箱,通过 pip install 'cognee[api]' 安装 Cognee,并启动 Uvicorn 服务器 distributed/deploy/daytona_sandbox.py:57-92

多云部署选项

Cognee 为多个平台提供了一键部署配置:

平台部署机制配置文件
Modal无服务器 ASGI 应用distributed/deploy/modal_app.py distributed/deploy/modal_app.py:20-22
Railway原生 Postgres + APIdistributed/deploy/railway.toml distributed/deploy/README.md:43-46
Fly.io带卷的边缘虚拟机distributed/deploy/fly.toml distributed/deploy/README.md:68-71
Render蓝图(API + 数据库)distributed/deploy/render.yaml distributed/deploy/README.md:90-95

来源:distributed/deploy/daytona_sandbox.py:57-118distributed/deploy/modal_app.py:1-18distributed/deploy/README.md:1-15