数据 Provenance 与 Lineage(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/topoteretes/cognee/9.3-data-provenance-and-lineage
翻译时间:2026-05-27T08:45:21.288Z
翻译模型:deepseek-chat
原文字符数:13426
项目:Cognee (cognee)
---
数据溯源与血缘
相关源文件
以下文件被用作生成此维基页面的上下文:
cognee/infrastructure/engine/models/DataPoint.pycognee/infrastructure/utils/run_sync.pycognee/modules/chunking/models/DocumentChunk.pycognee/modules/data/methods/get_authorized_existing_datasets.pycognee/modules/data/methods/load_or_create_datasets.pycognee/modules/engine/models/Entity.pycognee/modules/engine/models/EntityType.pycognee/modules/pipelines/layers/pipeline_execution_mode.pycognee/modules/pipelines/methods/__init__.pycognee/modules/pipelines/operations/run_tasks_base.pycognee/modules/pipelines/operations/run_tasks_distributed.pycognee/modules/pipelines/operations/run_tasks_with_telemetry.pycognee/modules/users/permissions/methods/give_permission_on_dataset.pycognee/modules/visualization/cognee_network_visualization.pycognee/shared/CodeGraphEntities.pycognee/tasks/summarization/models.pycognee/tests/e2e/dataset_queue/test_queue_serialization_e2e.pycognee/tests/unit/modules/visualization/visualization_test.pydistributed/entrypoint.pydistributed/tasks/queued_add_data_points.pydistributed/tasks/queued_add_edges.pydistributed/tasks/queued_add_nodes.pydistributed/workers/data_point_saving_worker.pydistributed/workers/graph_saving_worker.py
目的与范围
本文档描述了 Cognee 的数据溯源与血缘追踪系统,该系统记录了数据点在处理管线中流转时的来源和转换历史。溯源元数据使用户能够追踪数据的来源、创建或修改数据的任务和管线,以及数据所属的用户或数据集。
关于基于 OpenTelemetry 的执行追踪(追踪*代码如何执行*而非*数据来自何处*),请参阅 9.2 OpenTelemetry 追踪。
关于核心数据模型结构,请参阅 8.1 核心数据模型。
关于管线执行,请参阅 3.2 管线任务与执行。
---
DataPoint 模型中的溯源字段
DataPoint 类是 Cognee 中所有数据实体的基础模型,包括 Entity、DocumentChunk、TextSummary 和 CodeFile。它包含在整个系统中追踪数据血缘的核心溯源字段。
字段定义
classDiagram
class DataPoint {
+UUID id
+int created_at
+int updated_at
+int version
+Optional[str] source_pipeline
+Optional[str] source_task
+Optional[str] source_node_set
+Optional[str] source_user
+Optional[str] source_content_hash
+float feedback_weight
}
DataPoint <|-- Entity
DataPoint <|-- DocumentChunk
DataPoint <|-- TextSummary
DataPoint <|-- CodeFile
溯源字段说明:
| 字段 | 类型 | 用途 | |
|---|---|---|---|
source_pipeline | `str \ | None` | 创建或最后修改此数据点的管线名称 cognee/infrastructure/engine/models/DataPoint.py:57 |
source_task | `str \ | None` | 创建或最后修改此数据点的任务函数名称 cognee/infrastructure/engine/models/DataPoint.py:58 |
source_node_set | `str \ | None` | 此数据点所属的逻辑节点集标识符 cognee/infrastructure/engine/models/DataPoint.py:59 |
source_user | `str \ | None` | 拥有此数据点的用户的电子邮件或 ID cognee/infrastructure/engine/models/DataPoint.py:60 |
source_content_hash | `str \ | None` | 用于派生此数据点的原始内容的哈希值 cognee/infrastructure/engine/models/DataPoint.py:61 |
version | int | 版本号,在更新时递增 cognee/infrastructure/engine/models/DataPoint.py:52 |
所有溯源字段都是可选的,默认值为 None。version 字段默认值为 1 cognee/infrastructure/engine/models/DataPoint.py:52。
来源: cognee/infrastructure/engine/models/DataPoint.py:44-64、cognee/modules/engine/models/Entity.py:7、cognee/modules/chunking/models/DocumentChunk.py:10、cognee/tasks/summarization/models.py:9、cognee/shared/CodeGraphEntities.py:35
---
管线执行期间的溯源标记
当 DataPoint 实例被管线任务创建或转换时,溯源元数据会自动标记到这些实例上。此标记操作发生在管线执行系统中的 _stamp_provenance 函数中。
标记流程
flowchart TB
start["任务执行开始 (handle_task)"]
extract["提取输入 node_set 和 content_hash"]
execute["任务可执行体运行"]
result["生成结果数据"]
stamp["调用 _stamp_provenance"]
check_dp{"结果是否为 DataPoint?"}
check_visited{"是否已访问?"}
set_fields["如果字段为 None 则设置:<br/>- source_pipeline<br/>- source_task<br/>- source_user<br/>- source_node_set<br/>- source_content_hash"]
recurse["递归处理<br/>模型字段"]
check_list{"结果是否为列表/元组?"}
recurse_list["递归处理<br/>每个元素"]
yield_result["将结果传递给<br/>下一个任务"]
start --> extract
extract --> execute
execute --> result
result --> stamp
stamp --> check_dp
check_dp -->|是| check_visited
check_visited -->|否| set_fields
set_fields --> recurse
recurse --> yield_result
check_visited -->|是| yield_result
check_dp -->|否| check_list
check_list -->|是| recurse_list
recurse_list --> yield_result
check_list -->|否| yield_result
标记实现
_stamp_provenance 函数 cognee/modules/pipelines/operations/run_tasks_base.py:33-85 执行以下操作:
- 维护一个已访问集合
cognee/modules/pipelines/operations/run_tasks_base.py:46-49,以防止循环引用导致的无限递归。该集合通过PipelineContext在任务调用之间持久化cognee/modules/pipelines/operations/run_tasks_base.py:38-41。 - 仅在字段当前为
None时进行标记cognee/modules/pipelines/operations/run_tasks_base.py:51-56——通常保留现有的溯源信息。 - 从父 DataPoint 传播
node_set和content_hashcognee/modules/pipelines/operations/run_tasks_base.py:58-71。 - 递归处理嵌套的 DataPoint
cognee/modules/pipelines/operations/run_tasks_base.py:72-84,通过遍历所有model_fields实现。
标记操作从 handle_task cognee/modules/pipelines/operations/run_tasks_base.py:123-180 在任务执行期间调用:
# 提取上下文和输入哈希值
pipe_name = ctx.pipeline_name if ctx else None
input_node_set = _extract_node_set(args)
input_content_hash = _extract_content_hash(args)
user_label = getattr(user, "email", None) or (str(user.id) if user else None)
async for result_data in running_task.execute(args, kwargs, next_task_batch_size):
_stamp_provenance(
result_data,
pipe_name,
task_name,
visited=provenance_visited,
node_set=input_node_set,
user_label=user_label,
content_hash=input_content_hash,
)
来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-180
---
溯源传播模式
溯源元数据通过以下几种方式在系统中传播:
跨任务传播
flowchart LR
task1["任务:classify_documents"]
dp1["文档 (DataPoint)<br/>source_task='classify_documents'<br/>source_node_set='research_nodes'"]
task2["任务:extract_chunks_from_documents"]
dp2["DocumentChunk (DataPoint)<br/>source_task='extract_chunks_from_documents'<br/>source_node_set='research_nodes'"]
task1 --> dp1
dp1 -->|"作为下一个任务的输入"| task2
task2 --> dp2
节点集传播: _extract_node_set 函数 cognee/modules/pipelines/operations/run_tasks_base.py:93-102 扫描输入参数以查找现有的 source_node_set 值。 内容哈希传播: _extract_content_hash 函数 cognee/modules/pipelines/operations/run_tasks_base.py:105-121 从输入的 Data 或 DataPoint 对象中提取内容哈希,以确保派生实体与源材料保持关联。
分布式传播
在分布式执行环境(例如使用 Modal)中,溯源上下文通过 PipelineContext 传递 cognee/modules/pipelines/operations/run_tasks_distributed.py:71-76。run_tasks_on_modal 包装器确保当任务在远程工作节点上执行时,它们会接收到正确的 user、dataset 和 pipeline_name,以维护准确的血缘关系 cognee/modules/pipelines/operations/run_tasks_distributed.py:47-81。
来源: cognee/modules/pipelines/operations/run_tasks_base.py:93-121、cognee/modules/pipelines/operations/run_tasks_distributed.py:47-81
---
使用溯源数据进行可视化
图可视化系统利用溯源字段,使用户能够根据节点的来源对节点进行颜色编码和过滤。
基于溯源的颜色编码
可视化系统使用 _generate_provenance_colors 辅助函数 cognee/modules/visualization/cognee_network_visualization.py:11-19 为各种溯源维度生成确定性颜色映射:
| 维度 | 生成函数 | 描述 |
|---|---|---|
| 任务 | task_color_map | 根据创建节点的任务对节点着色 cognee/modules/visualization/cognee_network_visualization.py:85 |
| 管线 | pipeline_color_map | 根据管线名称对节点着色 cognee/modules/visualization/cognee_network_visualization.py:86 |
| 节点集 | node_set_color_map | 根据节点的逻辑分组对节点着色 cognee/modules/visualization/cognee_network_visualization.py:87 |
| 用户 | user_color_map | 根据所有者对节点着色 cognee/modules/visualization/cognee_network_visualization.py:88 |
多用户图聚合
aggregate_multi_user_graphs 函数 cognee/modules/visualization/cognee_network_visualization.py:115-157 在保留或注入溯源信息的同时,合并来自多个用户的图数据:
flowchart TB
input["(用户, 数据集) 对列表"]
loop_start["对于每一对:"]
set_context["set_database_global_context_variables"]
get_graph["graph_engine.get_graph_data()"]
tag_nodes["为没有 source_user 的节点<br/>标记 user_label(电子邮件/ID)"]
merge["合并到聚合图中"]
output["返回 (nodes_data, edges_data)"]
input --> loop_start
loop_start --> set_context
set_context --> get_graph
get_graph --> tag_nodes
tag_nodes --> merge
merge --> output
来源: cognee/modules/visualization/cognee_network_visualization.py:11-157、cognee/tests/unit/modules/visualization/visualization_test.py:79-92
---
总结:溯源数据流
flowchart TB
ingest["cognee.add(data, dataset_name='X')"]
pipeline["run_tasks_base() 或 run_tasks_distributed()"]
stamp["_stamp_provenance() 递归标记:<br/>source_pipeline、source_task、source_user"]
db_graph["持久化到图数据库<br/>节点包含溯源字段"]
viz["cognee.visualize_graph() 检索数据"]
recolor["UI:根据 source_task 或 source_user 重新着色节点"]
ingest --> pipeline
pipeline --> stamp
stamp --> db_graph
db_graph --> viz
viz --> recolor
溯源系统确保了从初始入库 cognee/modules/pipelines/operations/run_tasks_distributed.py:84-110 到 cognify 中的转换任务 cognee/modules/pipelines/operations/run_tasks_base.py:123-180,再到最终可视化 cognee/modules/visualization/cognee_network_visualization.py:22-112 的完整可追溯性。
来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-180、cognee/modules/pipelines/operations/run_tasks_distributed.py:47-110、cognee/modules/visualization/cognee_network_visualization.py:11-157