agentic_huge_data_base / wiki
页面 Cognee · 3.2 管线任务与执行·DeepWiki 中文全文译文

3.2 · 管线任务与执行(Pipeline Tasks and Execution)

记忆管道与知识图谱构建 · 本章是 Cognee DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Cognee 章节3.2 状态全文译文 模块工作流与编排、图谱与关系、测试、发布与运维、入库与解析
源码线索
  • cognee/api/v1/add/add.py
  • cognee/api/v1/cognify/cognify.py
  • cognee/infrastructure/engine/models/DataPoint.py
  • cognee/infrastructure/utils/run_sync.py
  • cognee/modules/chunking/models/DocumentChunk.py
  • cognee/modules/data/methods/get_authorized_existing_datasets.py
  • cognee/modules/data/methods/load_or_create_datasets.py
  • cognee/modules/data/models/Data.py
  • cognee/modules/engine/models/Entity.py
  • cognee/modules/engine/models/EntityType.py
模块标签
  • 工作流与编排
  • 图谱与关系
  • 测试、发布与运维
  • 入库与解析
  • 认证、权限与安全

中文译文

管线任务与执行(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/topoteretes/cognee/3.2-pipeline-tasks-and-execution
翻译时间:2026-05-27T08:45:23.287Z
翻译模型:deepseek-chat
原文字符数:15350
项目:Cognee (cognee)

---

管线任务与执行

相关源文件

以下文件为本 Wiki 页面的生成提供了上下文:

  • cognee/api/v1/add/add.py
  • cognee/api/v1/cognify/cognify.py
  • cognee/infrastructure/engine/models/DataPoint.py
  • cognee/infrastructure/utils/run_sync.py
  • cognee/modules/chunking/models/DocumentChunk.py
  • cognee/modules/data/methods/get_authorized_existing_datasets.py
  • cognee/modules/data/methods/load_or_create_datasets.py
  • cognee/modules/data/models/Data.py
  • cognee/modules/engine/models/Entity.py
  • cognee/modules/engine/models/EntityType.py
  • cognee/modules/pipelines/layers/pipeline_execution_mode.py
  • cognee/modules/pipelines/methods/__init__.py
  • cognee/modules/pipelines/models/PipelineContext.py
  • cognee/modules/pipelines/operations/pipeline.py
  • cognee/modules/pipelines/operations/run_pipeline.py
  • cognee/modules/pipelines/operations/run_tasks.py
  • cognee/modules/pipelines/operations/run_tasks_base.py
  • cognee/modules/pipelines/operations/run_tasks_distributed.py
  • cognee/modules/pipelines/operations/run_tasks_with_telemetry.py
  • cognee/modules/pipelines/tasks/task.py
  • cognee/modules/users/permissions/methods/give_permission_on_dataset.py
  • cognee/modules/visualization/cognee_network_visualization.py
  • cognee/pipelines/__init__.py
  • cognee/pipelines/types.py
  • cognee/shared/CodeGraphEntities.py
  • cognee/tasks/ingestion/ingest_data.py
  • cognee/tasks/ingestion/save_data_item_to_storage.py
  • cognee/tasks/summarization/models.py
  • cognee/tests/e2e/dataset_queue/test_queue_serialization_e2e.py
  • cognee/tests/unit/modules/visualization/visualization_test.py
  • cognee/tests/unit/pipelines/test_simplified_pipelines.py
  • distributed/entrypoint.py
  • distributed/tasks/queued_add_data_points.py
  • distributed/tasks/queued_add_edges.py
  • distributed/tasks/queued_add_nodes.py
  • distributed/workers/data_point_saving_worker.py
  • distributed/workers/graph_saving_worker.py
  • poetry.lock
  • pyproject.toml
  • uv.lock

本文档解释了驱动 Cognee 数据处理管线的任务执行系统。任务是工作流中的基本单元,负责在各个阶段(分类、片段切分、提取、图谱构建)对数据进行转换。本文涵盖任务的定义方式、如何组装成管线、如何通过批处理和错误处理执行,以及如何进行监控。

有关具体处理阶段(文档分类、实体提取等)的信息,请参见知识图谱生成(3.3)。有关任务执行前的数据入库详情,请参见数据入库(3.1)

任务定义与结构

Cognee 中的任务是围绕处理函数的包装对象,用于定义管线中的离散操作。Task 类(定义在 cognee/modules/pipelines/tasks/task.py 中)封装了一个可调用函数及其配置,并提供了一个 execute() 方法,该方法以异步方式生成结果。

graph TB
    TaskClass["Task 类 (cognee/modules/pipelines/tasks/task.py)"]
    executable["executable: Callable<br/>(classify_documents,<br/>extract_chunks_from_documents,<br/>extract_graph_and_summarize 等)"]
    task_config["task_config: dict<br/>{'batch_size': int}"]
    args_kwargs["*args, **kwargs<br/>(传递给 executable)"]
    execute_method["execute(args, kwargs,<br/>next_task_batch_size)<br/>→ AsyncGenerator"]

    TaskClass --> executable
    TaskClass --> task_config
    TaskClass --> args_kwargs
    TaskClass --> execute_method

    executable --> classify_documents["classify_documents (cognee/tasks/documents/classify_documents.py)"]
    executable --> extract_chunks["extract_chunks_from_documents (cognee/tasks/documents/extract_chunks.py)"]
    executable --> extract_graph["extract_graph_and_summarize (cognee/tasks/graph/extract_graph_and_summarize.py)"]
    executable --> add_data_points["add_data_points (cognee/tasks/storage/add_data_points.py)"]

来源: cognee/modules/pipelines/tasks/task.py:1-25cognee/api/v1/cognify/cognify.py:13cognee/api/v1/cognify/cognify.py:22-33

任务组件

一个 Task 对象包含以下部分:

  • executable:实际的处理函数(例如 classify_documentsextract_graph_and_summarize)。cognee/modules/pipelines/tasks/task.py:10
  • task_config:配置字典,其中 batch_size 控制输出批处理。cognee/modules/pipelines/tasks/task.py:11
  • execute():异步生成器方法,以批次方式生成处理后的结果。cognee/modules/pipelines/tasks/task.py:16-25
  • accepts_ctx:在构造时确定的标志,指示 executable 是否接受 PipelineContext 参数。cognee/modules/pipelines/operations/run_tasks_base.py:148-149

cognify 管线中使用的示例任务函数:

任务函数用途文件引用
classify_documents识别文档类型和结构cognee/tasks/documents/__init__.py
extract_chunks_from_documents将内容拆分为语义片段cognee/tasks/documents/__init__.py
extract_graph_and_summarize提取实体、关系并创建摘要cognee/tasks/graph/extract_graph_and_summarize.py
add_data_points将嵌入向量和图数据存储到数据库中cognee/tasks/storage/add_data_points.py
extract_events_and_timestamps时间实体和事件提取cognee/tasks/temporal_graph/extract_events_and_entities.py

来源: cognee/api/v1/cognify/cognify.py:22-33cognee/modules/pipelines/operations/run_tasks_base.py:145-151

管线组装

管线通过创建按顺序执行的 Task 对象列表来组装。cognify 函数是主要的编排器,负责根据用户配置构建这些任务列表。

graph LR
    subgraph "Cognify 管线 (cognee/api/v1/cognify/cognify.py)"
        CognifyTask1["Task(classify_documents)"]
        CognifyTask2["Task(extract_chunks_from_documents)"]
        CognifyTask3["Task(extract_graph_and_summarize)"]
        CognifyTask4["Task(add_data_points)"]

        CognifyTask1 --> CognifyTask2
        CognifyTask2 --> CognifyTask3
        CognifyTask3 --> CognifyTask4
    end

    subgraph "时间管线 (cognee/api/v1/cognify/cognify.py)"
        TempTask1["Task(classify_documents)"]
        TempTask2["Task(extract_chunks_from_documents)"]
        TempTask3["Task(extract_events_and_timestamps)"]
        TempTask4["Task(extract_knowledge_graph_from_events)"]
        TempTask5["Task(add_data_points)"]

        TempTask1 --> TempTask2
        TempTask2 --> TempTask3
        TempTask3 --> TempTask4
        TempTask4 --> TempTask5
    end

来源: cognee/api/v1/cognify/cognify.py:248-307cognee/api/v1/cognify/cognify.py:310-347

管线执行流程

管线执行遵循层次化流程:run_pipeline()run_pipeline_per_dataset()run_tasks()run_tasks_data_item()run_tasks_base()handle_task()

graph TB
    run_pipeline["run_pipeline() (cognee/modules/pipelines/operations/pipeline.py)"]
    run_pipeline_per_dataset["run_pipeline_per_dataset()"]
    run_tasks["run_tasks() (cognee/modules/pipelines/operations/run_tasks.py)"]
    run_tasks_data_item["run_tasks_data_item()"]
    run_tasks_base["run_tasks_base() (cognee/modules/pipelines/operations/run_tasks_base.py)"]
    handle_task["handle_task()"]
    task_execute["task.execute()"]

    run_pipeline --> run_pipeline_per_dataset
    run_pipeline_per_dataset --> run_tasks
    run_tasks --> run_tasks_data_item
    run_tasks_data_item --> run_tasks_base
    run_tasks_base --> handle_task
    handle_task --> task_execute

来源: cognee/modules/pipelines/operations/pipeline.py:33-63cognee/modules/pipelines/operations/run_tasks.py:56-122cognee/modules/pipelines/operations/run_tasks_base.py:123-183

执行层次

第 1 层:run_pipeline() 顶层入口点。它负责校验任务、设置数据库环境,并解析用户已授权的数据集。cognee/modules/pipelines/operations/pipeline.py:33-49

第 2 层:run_pipeline_per_dataset() 针对特定 Dataset 执行管线。它会检查管线是否已经运行过(资格校验逻辑),然后调用 run_tasks()cognee/modules/pipelines/operations/pipeline.py:66-107

第 3 层:run_tasks() 编排数据集中所有数据项的执行。它会生成一个 pipeline_id,记录开始日志,并使用 asyncio.Semaphore 根据 data_per_batch 控制并发度。cognee/modules/pipelines/operations/run_tasks.py:74-117

第 4 层:run_tasks_base() 驱动顺序任务执行的递归引擎。它决定下一个任务的批次大小,并调用 handle_task()cognee/modules/pipelines/operations/run_tasks_base.py:182-197

第 5 层:handle_task() 为任务执行添加可观测性(OpenTelemetry 跨度)、遥测和来源标记。它执行 running_task,然后递归调用 run_tasks_base 处理管线中的剩余任务。cognee/modules/pipelines/operations/run_tasks_base.py:123-183

批处理与并行化

Cognee 使用两种类型的批处理来优化性能:

  1. 数据级并发(data_per_batch:在 run_tasks() 中,使用信号量限制同时处理的 Data 项数量(默认值为 20)。cognee/modules/pipelines/operations/run_tasks.py:97-117
  2. 任务级批处理(batch_size:在 Task 内部,来自上一阶段的多个结果会先被合并成一个批次,然后再传递给当前任务的 executable。这对于基于大语言模型(LLM)的提取至关重要,可以减少 API 开销。cognee/modules/pipelines/operations/run_tasks_base.py:166cognee/modules/pipelines/tasks/task.py:11

来源: cognee/modules/pipelines/operations/run_tasks.py:97-117cognee/modules/pipelines/operations/run_tasks_base.py:166

数据来源与血缘

管线生成的每个 DataPoint 都会被打上元数据标记,以追踪其来源。该逻辑在 _stamp_provenance() 中实现。cognee/modules/pipelines/operations/run_tasks_base.py:33-91

来源标记逻辑

当任务生成数据时,handle_task() 会调用 _stamp_provenance(),该方法会递归遍历 DataPoint 及其字段:

  • source_pipeline:管线的名称(例如 "cognify")。cognee/modules/pipelines/operations/run_tasks_base.py:51-52
  • source_task:生成数据的 executable 函数名称。cognee/modules/pipelines/operations/run_tasks_base.py:53-54
  • source_user:运行管线的用户的邮箱或 ID。cognee/modules/pipelines/operations/run_tasks_base.py:55-56
  • source_node_set:从输入数据传播而来,用于对相关图元素进行分组。cognee/modules/pipelines/operations/run_tasks_base.py:59-63
  • source_content_hash:追踪原始数据内容的哈希值。cognee/modules/pipelines/operations/run_tasks_base.py:65-70
来源可视化

cognee/modules/visualization/cognee_network_visualization.py 中的可视化系统使用这些字段生成确定性的颜色映射,使用户能够直观地区分哪些任务或用户创建了知识图谱的特定部分。cognee/modules/visualization/cognee_network_visualization.py:11-19cognee/modules/visualization/cognee_network_visualization.py:85-88

来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-91cognee/modules/visualization/cognee_network_visualization.py:85-96

错误处理

Cognee 在多个层级处理错误,以确保管线的健壮性:

  • 数据项级错误:在 run_tasks() 中,asyncio.gather 调用时设置了 return_exceptions=True。异常会被捕获、记录,并转换为针对该特定数据项的 PipelineRunErrored 对象。cognee/modules/pipelines/operations/run_tasks.py:119-141
  • 管线级错误:如果任何数据项失败,会抛出 PipelineRunFailedError 以指示整个管线运行失败。cognee/modules/pipelines/operations/run_tasks.py:146-150
  • 日志记录:所有开始、完成和错误信息都会通过 log_pipeline_run_startlog_pipeline_run_completelog_pipeline_run_error 持久化记录。cognee/modules/pipelines/operations/run_tasks.py:75cognee/modules/pipelines/operations/run_tasks.py:151-153cognee/modules/pipelines/operations/run_tasks.py:171-173

来源: cognee/modules/pipelines/operations/run_tasks.py:119-184

分布式执行

Cognee 支持使用 Modal 进行分布式执行。这是通过在 run_tasks 函数上添加 @override_run_tasks(run_tasks_distributed) 装饰器来实现的。cognee/modules/pipelines/operations/run_tasks.py:55

如果环境变量 COGNEE_DISTRIBUTED 设置为 True,执行流程会路由到 run_tasks_distributed(),该函数通过 graph_saving_workerdata_point_saving_worker 等工作进程处理任务的远程执行。cognee/modules/pipelines/operations/run_tasks.py:40-48

来源: cognee/modules/pipelines/operations/run_tasks.py:36-55cognee/modules/pipelines/operations/run_tasks_distributed.py