agentic_huge_data_base / wiki
页面 Cognee · 9.3 数据 Provenance 与 Lineage·DeepWiki 中文全文译文

9.3 · 数据 Provenance 与 Lineage(Data Provenance and Lineage)

记忆管道与知识图谱构建 · 本章是 Cognee DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Cognee 章节9.3 状态全文译文 模块文档对象与元数据、图谱与关系、工作流与编排、测试、发布与运维
源码线索
  • cognee/infrastructure/engine/models/DataPoint.py
  • cognee/infrastructure/utils/run_sync.py
  • cognee/modules/chunking/models/DocumentChunk.py
  • cognee/modules/data/methods/get_authorized_existing_datasets.py
  • cognee/modules/data/methods/load_or_create_datasets.py
  • cognee/modules/engine/models/Entity.py
  • cognee/modules/engine/models/EntityType.py
  • cognee/modules/pipelines/layers/pipeline_execution_mode.py
  • cognee/modules/pipelines/methods/__init__.py
  • cognee/modules/pipelines/operations/run_tasks_base.py
模块标签
  • 文档对象与元数据
  • 图谱与关系
  • 工作流与编排
  • 测试、发布与运维
  • 认证、权限与安全

中文译文

数据 Provenance 与 Lineage(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/topoteretes/cognee/9.3-data-provenance-and-lineage
翻译时间:2026-05-27T08:45:21.288Z
翻译模型:deepseek-chat
原文字符数:13426
项目:Cognee (cognee)

---

数据溯源与血缘

相关源文件

以下文件被用作生成此维基页面的上下文:

  • cognee/infrastructure/engine/models/DataPoint.py
  • cognee/infrastructure/utils/run_sync.py
  • cognee/modules/chunking/models/DocumentChunk.py
  • cognee/modules/data/methods/get_authorized_existing_datasets.py
  • cognee/modules/data/methods/load_or_create_datasets.py
  • cognee/modules/engine/models/Entity.py
  • cognee/modules/engine/models/EntityType.py
  • cognee/modules/pipelines/layers/pipeline_execution_mode.py
  • cognee/modules/pipelines/methods/__init__.py
  • cognee/modules/pipelines/operations/run_tasks_base.py
  • cognee/modules/pipelines/operations/run_tasks_distributed.py
  • cognee/modules/pipelines/operations/run_tasks_with_telemetry.py
  • cognee/modules/users/permissions/methods/give_permission_on_dataset.py
  • cognee/modules/visualization/cognee_network_visualization.py
  • cognee/shared/CodeGraphEntities.py
  • cognee/tasks/summarization/models.py
  • cognee/tests/e2e/dataset_queue/test_queue_serialization_e2e.py
  • cognee/tests/unit/modules/visualization/visualization_test.py
  • distributed/entrypoint.py
  • distributed/tasks/queued_add_data_points.py
  • distributed/tasks/queued_add_edges.py
  • distributed/tasks/queued_add_nodes.py
  • distributed/workers/data_point_saving_worker.py
  • distributed/workers/graph_saving_worker.py

目的与范围

本文档描述了 Cognee 的数据溯源与血缘追踪系统,该系统记录了数据点在处理管线中流转时的来源和转换历史。溯源元数据使用户能够追踪数据的来源、创建或修改数据的任务和管线,以及数据所属的用户或数据集。

关于基于 OpenTelemetry 的执行追踪(追踪*代码如何执行*而非*数据来自何处*),请参阅 9.2 OpenTelemetry 追踪

关于核心数据模型结构,请参阅 8.1 核心数据模型

关于管线执行,请参阅 3.2 管线任务与执行

---

DataPoint 模型中的溯源字段

DataPoint 类是 Cognee 中所有数据实体的基础模型,包括 EntityDocumentChunkTextSummaryCodeFile。它包含在整个系统中追踪数据血缘的核心溯源字段。

字段定义
classDiagram
    class DataPoint {
        +UUID id
        +int created_at
        +int updated_at
        +int version
        +Optional[str] source_pipeline
        +Optional[str] source_task
        +Optional[str] source_node_set
        +Optional[str] source_user
        +Optional[str] source_content_hash
        +float feedback_weight
    }
    DataPoint <|-- Entity
    DataPoint <|-- DocumentChunk
    DataPoint <|-- TextSummary
    DataPoint <|-- CodeFile

溯源字段说明:

字段类型用途
source_pipeline`str \None`创建或最后修改此数据点的管线名称 cognee/infrastructure/engine/models/DataPoint.py:57
source_task`str \None`创建或最后修改此数据点的任务函数名称 cognee/infrastructure/engine/models/DataPoint.py:58
source_node_set`str \None`此数据点所属的逻辑节点集标识符 cognee/infrastructure/engine/models/DataPoint.py:59
source_user`str \None`拥有此数据点的用户的电子邮件或 ID cognee/infrastructure/engine/models/DataPoint.py:60
source_content_hash`str \None`用于派生此数据点的原始内容的哈希值 cognee/infrastructure/engine/models/DataPoint.py:61
versionint版本号,在更新时递增 cognee/infrastructure/engine/models/DataPoint.py:52

所有溯源字段都是可选的,默认值为 Noneversion 字段默认值为 1 cognee/infrastructure/engine/models/DataPoint.py:52

来源: cognee/infrastructure/engine/models/DataPoint.py:44-64cognee/modules/engine/models/Entity.py:7cognee/modules/chunking/models/DocumentChunk.py:10cognee/tasks/summarization/models.py:9cognee/shared/CodeGraphEntities.py:35

---

管线执行期间的溯源标记

DataPoint 实例被管线任务创建或转换时,溯源元数据会自动标记到这些实例上。此标记操作发生在管线执行系统中的 _stamp_provenance 函数中。

标记流程
flowchart TB
    start["任务执行开始 (handle_task)"]
    extract["提取输入 node_set 和 content_hash"]
    execute["任务可执行体运行"]
    result["生成结果数据"]
    stamp["调用 _stamp_provenance"]
    check_dp{"结果是否为 DataPoint?"}
    check_visited{"是否已访问?"}
    set_fields["如果字段为 None 则设置:<br/>- source_pipeline<br/>- source_task<br/>- source_user<br/>- source_node_set<br/>- source_content_hash"]
    recurse["递归处理<br/>模型字段"]
    check_list{"结果是否为列表/元组?"}
    recurse_list["递归处理<br/>每个元素"]
    yield_result["将结果传递给<br/>下一个任务"]

    start --> extract
    extract --> execute
    execute --> result
    result --> stamp
    stamp --> check_dp
    check_dp -->|是| check_visited
    check_visited -->|否| set_fields
    set_fields --> recurse
    recurse --> yield_result
    check_visited -->|是| yield_result
    check_dp -->|否| check_list
    check_list -->|是| recurse_list
    recurse_list --> yield_result
    check_list -->|否| yield_result
标记实现

_stamp_provenance 函数 cognee/modules/pipelines/operations/run_tasks_base.py:33-85 执行以下操作:

  1. 维护一个已访问集合 cognee/modules/pipelines/operations/run_tasks_base.py:46-49,以防止循环引用导致的无限递归。该集合通过 PipelineContext 在任务调用之间持久化 cognee/modules/pipelines/operations/run_tasks_base.py:38-41
  2. 仅在字段当前为 None 时进行标记 cognee/modules/pipelines/operations/run_tasks_base.py:51-56——通常保留现有的溯源信息。
  3. 从父 DataPoint 传播 node_setcontent_hash cognee/modules/pipelines/operations/run_tasks_base.py:58-71
  4. 递归处理嵌套的 DataPoint cognee/modules/pipelines/operations/run_tasks_base.py:72-84,通过遍历所有 model_fields 实现。

标记操作从 handle_task cognee/modules/pipelines/operations/run_tasks_base.py:123-180 在任务执行期间调用:

# 提取上下文和输入哈希值
pipe_name = ctx.pipeline_name if ctx else None
input_node_set = _extract_node_set(args)
input_content_hash = _extract_content_hash(args)
user_label = getattr(user, "email", None) or (str(user.id) if user else None)

async for result_data in running_task.execute(args, kwargs, next_task_batch_size):
    _stamp_provenance(
        result_data,
        pipe_name,
        task_name,
        visited=provenance_visited,
        node_set=input_node_set,
        user_label=user_label,
        content_hash=input_content_hash,
    )

来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-180

---

溯源传播模式

溯源元数据通过以下几种方式在系统中传播:

跨任务传播
flowchart LR
    task1["任务:classify_documents"]
    dp1["文档 (DataPoint)<br/>source_task='classify_documents'<br/>source_node_set='research_nodes'"]
    task2["任务:extract_chunks_from_documents"]
    dp2["DocumentChunk (DataPoint)<br/>source_task='extract_chunks_from_documents'<br/>source_node_set='research_nodes'"]

    task1 --> dp1
    dp1 -->|"作为下一个任务的输入"| task2
    task2 --> dp2

节点集传播: _extract_node_set 函数 cognee/modules/pipelines/operations/run_tasks_base.py:93-102 扫描输入参数以查找现有的 source_node_set 值。 内容哈希传播: _extract_content_hash 函数 cognee/modules/pipelines/operations/run_tasks_base.py:105-121 从输入的 DataDataPoint 对象中提取内容哈希,以确保派生实体与源材料保持关联。

分布式传播

在分布式执行环境(例如使用 Modal)中,溯源上下文通过 PipelineContext 传递 cognee/modules/pipelines/operations/run_tasks_distributed.py:71-76run_tasks_on_modal 包装器确保当任务在远程工作节点上执行时,它们会接收到正确的 userdatasetpipeline_name,以维护准确的血缘关系 cognee/modules/pipelines/operations/run_tasks_distributed.py:47-81

来源: cognee/modules/pipelines/operations/run_tasks_base.py:93-121cognee/modules/pipelines/operations/run_tasks_distributed.py:47-81

---

使用溯源数据进行可视化

图可视化系统利用溯源字段,使用户能够根据节点的来源对节点进行颜色编码和过滤。

基于溯源的颜色编码

可视化系统使用 _generate_provenance_colors 辅助函数 cognee/modules/visualization/cognee_network_visualization.py:11-19 为各种溯源维度生成确定性颜色映射:

维度生成函数描述
任务task_color_map根据创建节点的任务对节点着色 cognee/modules/visualization/cognee_network_visualization.py:85
管线pipeline_color_map根据管线名称对节点着色 cognee/modules/visualization/cognee_network_visualization.py:86
节点集node_set_color_map根据节点的逻辑分组对节点着色 cognee/modules/visualization/cognee_network_visualization.py:87
用户user_color_map根据所有者对节点着色 cognee/modules/visualization/cognee_network_visualization.py:88
多用户图聚合

aggregate_multi_user_graphs 函数 cognee/modules/visualization/cognee_network_visualization.py:115-157 在保留或注入溯源信息的同时,合并来自多个用户的图数据:

flowchart TB
    input["(用户, 数据集) 对列表"]
    loop_start["对于每一对:"]
    set_context["set_database_global_context_variables"]
    get_graph["graph_engine.get_graph_data()"]
    tag_nodes["为没有 source_user 的节点<br/>标记 user_label(电子邮件/ID)"]
    merge["合并到聚合图中"]
    output["返回 (nodes_data, edges_data)"]

    input --> loop_start
    loop_start --> set_context
    set_context --> get_graph
    get_graph --> tag_nodes
    tag_nodes --> merge
    merge --> output

来源: cognee/modules/visualization/cognee_network_visualization.py:11-157cognee/tests/unit/modules/visualization/visualization_test.py:79-92

---

总结:溯源数据流

flowchart TB
    ingest["cognee.add(data, dataset_name='X')"]
    pipeline["run_tasks_base() 或 run_tasks_distributed()"]
    stamp["_stamp_provenance() 递归标记:<br/>source_pipeline、source_task、source_user"]
    db_graph["持久化到图数据库<br/>节点包含溯源字段"]
    viz["cognee.visualize_graph() 检索数据"]
    recolor["UI:根据 source_task 或 source_user 重新着色节点"]

    ingest --> pipeline
    pipeline --> stamp
    stamp --> db_graph
    db_graph --> viz
    viz --> recolor

溯源系统确保了从初始入库 cognee/modules/pipelines/operations/run_tasks_distributed.py:84-110cognify 中的转换任务 cognee/modules/pipelines/operations/run_tasks_base.py:123-180,再到最终可视化 cognee/modules/visualization/cognee_network_visualization.py:22-112 的完整可追溯性。

来源: cognee/modules/pipelines/operations/run_tasks_base.py:33-180cognee/modules/pipelines/operations/run_tasks_distributed.py:47-110cognee/modules/visualization/cognee_network_visualization.py:11-157