agentic_huge_data_base / wiki
页面 Cognee · 12.5 自定义管线 Tutorial·DeepWiki 中文全文译文

12.5 · 自定义管线 Tutorial(Custom Pipeline Tutorial)

记忆管道与知识图谱构建 · 本章是 Cognee DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Cognee 章节12.5 状态全文译文 模块入库与解析、图谱与关系、文档对象与元数据、工作流与编排
源码线索
  • cognee/modules/pipelines/models/PipelineContext.py
  • cognee/modules/pipelines/operations/run_pipeline.py
  • cognee/modules/pipelines/tasks/task.py
  • cognee/pipelines/__init__.py
  • cognee/pipelines/types.py
  • cognee/tests/unit/infrastructure/engine/test_identity_fields.py
  • cognee/tests/unit/pipelines/test_simplified_pipelines.py
  • examples/custom_pipelines/agentic_reasoning_procurement_example.py
  • examples/custom_pipelines/memify_coding_agent_rule_extraction_example.py
  • examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py
模块标签
  • 入库与解析
  • 图谱与关系
  • 文档对象与元数据
  • 工作流与编排
  • 界面与交互

中文译文

自定义管线 Tutorial(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/topoteretes/cognee/12.5-custom-pipeline-tutorial
翻译时间:2026-05-27T08:45:17.818Z
翻译模型:deepseek-chat
原文字符数:10968
项目:Cognee (cognee)

---

自定义管线教程

相关源文件

以下文件为本 Wiki 页面生成时使用的上下文:

  • cognee/modules/pipelines/models/PipelineContext.py
  • cognee/modules/pipelines/operations/run_pipeline.py
  • cognee/modules/pipelines/tasks/task.py
  • cognee/pipelines/__init__.py
  • cognee/pipelines/types.py
  • cognee/tests/unit/infrastructure/engine/test_identity_fields.py
  • cognee/tests/unit/pipelines/test_simplified_pipelines.py
  • examples/custom_pipelines/agentic_reasoning_procurement_example.py
  • examples/custom_pipelines/memify_coding_agent_rule_extraction_example.py
  • examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py
  • examples/demos/dynamic_multiple_weighted_edges_example.py
  • examples/demos/web_url_content_ingestion_example.py
  • examples/guides/consolidate_entity_descriptions_example.py
  • examples/guides/custom_data_models.py
  • examples/guides/custom_graph_model.py
  • examples/guides/custom_prompts.py
  • examples/guides/custom_tasks_and_pipelines.py
  • examples/guides/graph_visualization.py
  • examples/guides/importance_weight.py
  • examples/guides/improve_quickstart.py
  • examples/guides/ontology_quickstart.py

本教程演示如何在 Cognee 中通过组合 Task 对象并执行它们来创建自定义数据处理管线。自定义管线支持超越默认 addcognify 操作的领域特定处理工作流,例如提取特定组织层级或执行自定义实体解析。

关于内置管线任务和执行框架的信息,请参见管线任务与执行

理解任务抽象

Cognee 管线由包装 Python 函数的 Task 对象组成。每个任务按顺序处理数据,前一个任务的输出会传入下一个任务。

任务函数签名模式
flowchart LR
    INPUT["数据输入<br/>(来自前一个任务<br/>或管线数据)"]
    FUNCTION["任务函数<br/>async def task_fn(data, **kwargs)"]
    CONTEXT["PipelineContext<br/>user, dataset,<br/>pipeline_name, extras"]
    OUTPUT["处理后的数据<br/>(输入到下一个任务)"]

    INPUT --> FUNCTION
    CONTEXT --> FUNCTION
    FUNCTION --> OUTPUT

任务函数要求:

  • 必须是 async 函数、生成器或协程。cognee/modules/pipelines/tasks/task.py:173-178
  • 第一个参数接收来自前一个任务或初始管线数据的数据。
  • 可以接受 ctx 参数(一个 PipelineContext 实例),其中包含 userdataset_idpipeline_namecognee/modules/pipelines/models/PipelineContext.py:8-14
  • handle_task 中的编排逻辑会在传递 ctx 之前检查函数签名是否接受该参数。cognee/modules/pipelines/operations/run_tasks_base.py:145-149

来源: cognee/modules/pipelines/tasks/task.py:173-206cognee/modules/pipelines/operations/run_tasks_base.py:123-166cognee/modules/pipelines/models/PipelineContext.py:8-14

任务包装器配置

Task 类提供了批处理和参数传递的配置选项。现代 Cognee 用法还支持 @task 装饰器和 TaskSpec 用于延迟执行。cognee/modules/pipelines/tasks/task.py:34-62

参数类型用途示例
executable可调用对象要执行的异步函数extract_people
*args任意类型传递给函数的位置参数max_chunk_size=512
task_config字典配置项,如 batch_size{"batch_size": 100}
**kwargs任意类型额外的关键字参数graph_model=CustomModel
graph TB
    TASK["Task 包装器 (cognee.modules.pipelines.tasks.task.Task)"]
    FUNC["包装的函数<br/>例如 ingest_files"]
    ARGS["函数参数<br/>例如 include_subdirectories=True"]
    BATCH["task_config<br/>{batch_size: 100}"]
    KWARGS["额外参数<br/>例如 custom_prompt='...'"]

    TASK --> FUNC
    TASK --> ARGS
    TASK --> BATCH
    TASK --> KWARGS

    FUNC -.接收.-> ARGS
    FUNC -.接收.-> KWARGS
    BATCH -.控制.-> FUNC

来源: cognee/modules/pipelines/tasks/task.py:187-206cognee/modules/pipelines/operations/run_tasks_base.py:166-180

创建自定义任务函数

自定义任务函数根据其在管线中的角色遵循特定模式。

数据转换任务模式

转换数据的任务函数通常接收一个数据项列表或单个数据项,并返回转换后的数据。

来自自定义组织层级管线的示例: examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py:60-94

def ingest_files(data: List[Any]) -> List[Company]:
    all_companies: List[Company] = []
    # 将原始数据转换为 Company DataPoint 的逻辑
    for data_item in data:
        # ... 处理 ...
        all_companies.append(Company(name=company["name"], ...))
    return all_companies
存储任务模式

写入数据库的任务通常接收处理后的 DataPoint 对象并将其存储到配置的数据库中。add_data_points 任务通常用于自定义管线的末尾以持久化结果。

cognee/tasks/storage/__init__.py:1-5

from cognee.tasks.storage import add_data_points

# 这个内置任务接收 DataPoint 并将其保存到向量和图存储中
Task(add_data_points)

来源: examples/guides/custom_tasks_and_pipelines.py:76-79cognee/tasks/storage/__init__.py:1-5

组装和运行自定义管线

管线执行与来源追踪

当管线运行时,Cognee 会自动为 DataPoint 对象打上来源信息,包括 source_pipelinesource_tasksource_user。这由任务执行期间的 _stamp_provenance 函数处理。cognee/modules/pipelines/operations/run_tasks_base.py:33-92

sequenceDiagram
    participant Client as "自定义代码"
    participant RTB as "run_tasks_base"
    participant HT as "handle_task"
    participant Tasks as "任务函数"
    participant SP as "_stamp_provenance"

    Client->>RTB: run_tasks_base(tasks, data, user, ctx)
    RTB->>HT: handle_task(running_task, args, leftover_tasks, ...)
    HT->>Tasks: await running_task.execute(args, kwargs)
    Tasks-->>HT: result_data
    HT->>SP: _stamp_provenance(result_data, pipe_name, task_name, ...)
    SP->>SP: 递归设置 source_pipeline、source_task、source_user
    HT->>RTB: yield result

来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-92cognee/modules/pipelines/operations/run_tasks_base.py:123-183

run_custom_pipeline 接口

用户可以使用高级 run_custom_pipeline API 运行自定义任务列表。

examples/guides/custom_tasks_and_pipelines.py:81-83

await cognee.run_custom_pipeline(
    tasks=[
        Task(extract_people),
        Task(add_data_points),
    ],
    data=build_lightweight_data_object(text_data),
    dataset="people_demo"
)
代码到实体的映射

此图将高级管线概念桥接到实现中使用的具体代码实体。

graph TD
    subgraph "自然语言/概念空间"
        P_CONCEPT["管线定义"]
        T_CONCEPT["处理任务"]
        D_CONCEPT["领域数据"]
        V_CONCEPT["图可视化"]
    end

    subgraph "代码实体空间 (cognee)"
        RTB_FUNC["run_tasks_base() 在 cognee/modules/pipelines/operations/run_tasks_base.py"]
        T_CLASS["Task 类在 cognee/modules/pipelines/tasks/task.py"]
        DP_CLASS["DataPoint 类在 cognee/infrastructure/engine/models/DataPoint.py"]
        VIS_FUNC["visualize_graph() 在 cognee/api/v1/visualize/visualize.py"]
    end

    P_CONCEPT --> RTB_FUNC
    T_CONCEPT --> T_CLASS
    D_CONCEPT --> DP_CLASS
    V_CONCEPT --> VIS_FUNC
    RTB_FUNC --> T_CLASS

来源: cognee/modules/pipelines/operations/run_tasks_base.py:182-194cognee/modules/pipelines/tasks/task.py:172-186cognee/infrastructure/engine/models/DataPoint.py:27-55cognee/api/v1/visualize/visualize.py:12-20

实现示例:组织层级

要实现领域特定的管线,请定义您的 DataPoint 模型和转换任务。在 metadata 中使用 identity_fields 可以实现确定性 ID 生成和自动去重。cognee/tests/unit/infrastructure/engine/test_identity_fields.py:11-20

examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py:18-28

class Person(DataPoint):
    name: str
    metadata: dict = {"index_fields": ["name"], "identity_fields": ["name"]}

class Department(DataPoint):
    name: str
    employees: list[Person]
    metadata: dict = {"index_fields": ["name"], "identity_fields": ["name"]}
自定义管线的可视化

自定义管线通常会产生独特的图结构。您可以使用 visualize_graph 来可视化这些结构。可视化系统使用来源数据(source_tasksource_pipeline)为节点着色。

cognee/modules/visualization/cognee_network_visualization.py:85-88

task_color_map = _generate_provenance_colors([n.get("source_task") for n in nodes_list])
pipeline_color_map = _generate_provenance_colors([n.get("source_pipeline") for n in nodes_list])

来源: examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py:18-94cognee/modules/visualization/cognee_network_visualization.py:22-112cognee/infrastructure/engine/models/DataPoint.py:105-131