自定义管线 Tutorial(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/topoteretes/cognee/12.5-custom-pipeline-tutorial
翻译时间:2026-05-27T08:45:17.818Z
翻译模型:deepseek-chat
原文字符数:10968
项目:Cognee (cognee)
---
自定义管线教程
相关源文件
以下文件为本 Wiki 页面生成时使用的上下文:
cognee/modules/pipelines/models/PipelineContext.pycognee/modules/pipelines/operations/run_pipeline.pycognee/modules/pipelines/tasks/task.pycognee/pipelines/__init__.pycognee/pipelines/types.pycognee/tests/unit/infrastructure/engine/test_identity_fields.pycognee/tests/unit/pipelines/test_simplified_pipelines.pyexamples/custom_pipelines/agentic_reasoning_procurement_example.pyexamples/custom_pipelines/memify_coding_agent_rule_extraction_example.pyexamples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.pyexamples/demos/dynamic_multiple_weighted_edges_example.pyexamples/demos/web_url_content_ingestion_example.pyexamples/guides/consolidate_entity_descriptions_example.pyexamples/guides/custom_data_models.pyexamples/guides/custom_graph_model.pyexamples/guides/custom_prompts.pyexamples/guides/custom_tasks_and_pipelines.pyexamples/guides/graph_visualization.pyexamples/guides/importance_weight.pyexamples/guides/improve_quickstart.pyexamples/guides/ontology_quickstart.py
本教程演示如何在 Cognee 中通过组合 Task 对象并执行它们来创建自定义数据处理管线。自定义管线支持超越默认 add 和 cognify 操作的领域特定处理工作流,例如提取特定组织层级或执行自定义实体解析。
关于内置管线任务和执行框架的信息,请参见管线任务与执行。
理解任务抽象
Cognee 管线由包装 Python 函数的 Task 对象组成。每个任务按顺序处理数据,前一个任务的输出会传入下一个任务。
任务函数签名模式
flowchart LR
INPUT["数据输入<br/>(来自前一个任务<br/>或管线数据)"]
FUNCTION["任务函数<br/>async def task_fn(data, **kwargs)"]
CONTEXT["PipelineContext<br/>user, dataset,<br/>pipeline_name, extras"]
OUTPUT["处理后的数据<br/>(输入到下一个任务)"]
INPUT --> FUNCTION
CONTEXT --> FUNCTION
FUNCTION --> OUTPUT
任务函数要求:
- 必须是
async函数、生成器或协程。cognee/modules/pipelines/tasks/task.py:173-178 - 第一个参数接收来自前一个任务或初始管线数据的数据。
- 可以接受
ctx参数(一个PipelineContext实例),其中包含user、dataset_id和pipeline_name。cognee/modules/pipelines/models/PipelineContext.py:8-14 handle_task中的编排逻辑会在传递ctx之前检查函数签名是否接受该参数。cognee/modules/pipelines/operations/run_tasks_base.py:145-149
来源: cognee/modules/pipelines/tasks/task.py:173-206、cognee/modules/pipelines/operations/run_tasks_base.py:123-166、cognee/modules/pipelines/models/PipelineContext.py:8-14
任务包装器配置
Task 类提供了批处理和参数传递的配置选项。现代 Cognee 用法还支持 @task 装饰器和 TaskSpec 用于延迟执行。cognee/modules/pipelines/tasks/task.py:34-62
| 参数 | 类型 | 用途 | 示例 |
|---|---|---|---|
executable | 可调用对象 | 要执行的异步函数 | extract_people |
*args | 任意类型 | 传递给函数的位置参数 | max_chunk_size=512 |
task_config | 字典 | 配置项,如 batch_size | {"batch_size": 100} |
**kwargs | 任意类型 | 额外的关键字参数 | graph_model=CustomModel |
graph TB
TASK["Task 包装器 (cognee.modules.pipelines.tasks.task.Task)"]
FUNC["包装的函数<br/>例如 ingest_files"]
ARGS["函数参数<br/>例如 include_subdirectories=True"]
BATCH["task_config<br/>{batch_size: 100}"]
KWARGS["额外参数<br/>例如 custom_prompt='...'"]
TASK --> FUNC
TASK --> ARGS
TASK --> BATCH
TASK --> KWARGS
FUNC -.接收.-> ARGS
FUNC -.接收.-> KWARGS
BATCH -.控制.-> FUNC
来源: cognee/modules/pipelines/tasks/task.py:187-206、cognee/modules/pipelines/operations/run_tasks_base.py:166-180
创建自定义任务函数
自定义任务函数根据其在管线中的角色遵循特定模式。
数据转换任务模式
转换数据的任务函数通常接收一个数据项列表或单个数据项,并返回转换后的数据。
来自自定义组织层级管线的示例: examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py:60-94
def ingest_files(data: List[Any]) -> List[Company]:
all_companies: List[Company] = []
# 将原始数据转换为 Company DataPoint 的逻辑
for data_item in data:
# ... 处理 ...
all_companies.append(Company(name=company["name"], ...))
return all_companies
存储任务模式
写入数据库的任务通常接收处理后的 DataPoint 对象并将其存储到配置的数据库中。add_data_points 任务通常用于自定义管线的末尾以持久化结果。
cognee/tasks/storage/__init__.py:1-5
from cognee.tasks.storage import add_data_points
# 这个内置任务接收 DataPoint 并将其保存到向量和图存储中
Task(add_data_points)
来源: examples/guides/custom_tasks_and_pipelines.py:76-79、cognee/tasks/storage/__init__.py:1-5
组装和运行自定义管线
管线执行与来源追踪
当管线运行时,Cognee 会自动为 DataPoint 对象打上来源信息,包括 source_pipeline、source_task 和 source_user。这由任务执行期间的 _stamp_provenance 函数处理。cognee/modules/pipelines/operations/run_tasks_base.py:33-92
sequenceDiagram
participant Client as "自定义代码"
participant RTB as "run_tasks_base"
participant HT as "handle_task"
participant Tasks as "任务函数"
participant SP as "_stamp_provenance"
Client->>RTB: run_tasks_base(tasks, data, user, ctx)
RTB->>HT: handle_task(running_task, args, leftover_tasks, ...)
HT->>Tasks: await running_task.execute(args, kwargs)
Tasks-->>HT: result_data
HT->>SP: _stamp_provenance(result_data, pipe_name, task_name, ...)
SP->>SP: 递归设置 source_pipeline、source_task、source_user
HT->>RTB: yield result
来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-92、cognee/modules/pipelines/operations/run_tasks_base.py:123-183
run_custom_pipeline 接口
用户可以使用高级 run_custom_pipeline API 运行自定义任务列表。
examples/guides/custom_tasks_and_pipelines.py:81-83
await cognee.run_custom_pipeline(
tasks=[
Task(extract_people),
Task(add_data_points),
],
data=build_lightweight_data_object(text_data),
dataset="people_demo"
)
代码到实体的映射
此图将高级管线概念桥接到实现中使用的具体代码实体。
graph TD
subgraph "自然语言/概念空间"
P_CONCEPT["管线定义"]
T_CONCEPT["处理任务"]
D_CONCEPT["领域数据"]
V_CONCEPT["图可视化"]
end
subgraph "代码实体空间 (cognee)"
RTB_FUNC["run_tasks_base() 在 cognee/modules/pipelines/operations/run_tasks_base.py"]
T_CLASS["Task 类在 cognee/modules/pipelines/tasks/task.py"]
DP_CLASS["DataPoint 类在 cognee/infrastructure/engine/models/DataPoint.py"]
VIS_FUNC["visualize_graph() 在 cognee/api/v1/visualize/visualize.py"]
end
P_CONCEPT --> RTB_FUNC
T_CONCEPT --> T_CLASS
D_CONCEPT --> DP_CLASS
V_CONCEPT --> VIS_FUNC
RTB_FUNC --> T_CLASS
来源: cognee/modules/pipelines/operations/run_tasks_base.py:182-194、cognee/modules/pipelines/tasks/task.py:172-186、cognee/infrastructure/engine/models/DataPoint.py:27-55、cognee/api/v1/visualize/visualize.py:12-20
实现示例:组织层级
要实现领域特定的管线,请定义您的 DataPoint 模型和转换任务。在 metadata 中使用 identity_fields 可以实现确定性 ID 生成和自动去重。cognee/tests/unit/infrastructure/engine/test_identity_fields.py:11-20
examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py:18-28
class Person(DataPoint):
name: str
metadata: dict = {"index_fields": ["name"], "identity_fields": ["name"]}
class Department(DataPoint):
name: str
employees: list[Person]
metadata: dict = {"index_fields": ["name"], "identity_fields": ["name"]}
自定义管线的可视化
自定义管线通常会产生独特的图结构。您可以使用 visualize_graph 来可视化这些结构。可视化系统使用来源数据(source_task、source_pipeline)为节点着色。
cognee/modules/visualization/cognee_network_visualization.py:85-88
task_color_map = _generate_provenance_colors([n.get("source_task") for n in nodes_list])
pipeline_color_map = _generate_provenance_colors([n.get("source_pipeline") for n in nodes_list])
来源: examples/custom_pipelines/organizational_hierarchy/organizational_hierarchy_pipeline_example.py:18-94、cognee/modules/visualization/cognee_network_visualization.py:22-112、cognee/infrastructure/engine/models/DataPoint.py:105-131