
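5.1 · Episode Processing Workflow

Temporal knowledge graph and dynamic fact memory · This is a standalone chapter page of the translated Graphiti DeepWiki. It keeps the original links, source-code anchors, module tags, and chapter hierarchy.

Project: Graphiti · Chapter: 5.1 · Status: full translation · Modules: Graph and Relations; Testing, Release and Operations; Workflow and Orchestration; Memory and Context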
Source references
  • graphiti_core/edges.py
  • graphiti_core/graphiti.py
  • graphiti_core/nodes.py
  • graphiti_core/prompts/extract_nodes_and_edges.py
  • graphiti_core/prompts/lib.py
  • graphiti_core/utils/maintenance/combined_extraction.py
  • graphiti_core/utils/maintenance/community_operations.py
  • graphiti_core/utils/maintenance/graph_data_operations.py
  • graphiti_core/utils/text_utils.py
  • tests/utils/test_concatenate_episodes.py
Module tags
  • Graph and Relations
  • Testing, Release and Operations
  • Workflow and Orchestration
  • Memory and Context
  • Interface and Interaction

Original DeepWiki page: https://deepwiki.com/getzep/graphiti/5.1-episode-processing-workflow
Translated: 2026-05-27T08:44:52.148Z
Translation model: deepseek-chat
Source length: 11080 characters
Project: Graphiti (graphiti)

---

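The episode processing workflow is the primary ingestion pipeline for adding new information to the knowledge graph. It converts unstructured episode content (messages, text documents, or JSON) into a structured graph representation made up of entity nodes, relationship edges, and temporal metadata.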

---

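Purpose and Scope

The workflow is orchestrated by the Graphiti.add_episode() method (graphiti_core/graphiti.py:510-610) and involves the following steps:

  • Retrieve relevant historical context.
  • Extract and resolve entity nodes.
  • Extract and resolve relationship edges.
  • Persist all data with temporal metadata.
  • Optionally update the community structure.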
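
The step order above can be outlined as a small async skeleton. This is only a sketch: the stage names, `PipelineTrace`, and `process_episode` are hypothetical stand-ins, and each stage is a stub rather than Graphiti's actual logic.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class PipelineTrace:
    """Records the order in which the stubbed stages run."""
    stages: list = field(default_factory=list)


async def run_stage(trace: PipelineTrace, name: str) -> None:
    # Stand-in for real work such as database queries or LLM calls.
    await asyncio.sleep(0)
    trace.stages.append(name)


async def process_episode(update_communities: bool = False) -> PipelineTrace:
    """Hypothetical outline of the step order; not Graphiti's implementation."""
    trace = PipelineTrace()
    await run_stage(trace, "retrieve_context")
    await run_stage(trace, "extract_and_resolve_nodes")
    await run_stage(trace, "extract_and_resolve_edges")
    await run_stage(trace, "persist_with_temporal_metadata")
    if update_communities:  # the only optional step in the list
        await run_stage(trace, "update_communities")
    return trace


trace = asyncio.run(process_episode(update_communities=True))
print(trace.stages)
```

The stages are awaited one after another because each step consumes the output of the previous one; only the community update is conditional.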

---

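Workflow Overview

The diagram below shows the complete end-to-end flow, from the add_episode call through every processing stage to final persistence.

Diagram: end-to-end episode processing flow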

sequenceDiagram
    participant Client
    participant Graphiti as "Graphiti.add_episode()"
    participant Context as "retrieve_episodes()"
    participant Combined as "combined_extraction.py"
    participant NodeOps as "node_operations.py"
    participant EdgeOps as "edge_operations.py"
    participant Bulk as "add_nodes_and_edges_bulk()"
    participant Driver as "GraphDriver"

    Client->>Graphiti: "add_episode(name, episode_body, ...)"

    Note over Graphiti: "Validation and setup"
    Graphiti->>Graphiti: "validate_entity_types()"
    Graphiti->>Graphiti: "validate_group_id()"

    Note over Graphiti: "Stage 1: context retrieval"
    Graphiti->>Context: "retrieve_episodes(reference_time, last_n=3)"
    Context->>Driver: "Query recent episodes"
    Driver-->>Graphiti: "previous_episodes[]"

    Graphiti->>Graphiti: "Create EpisodicNode instance"

    Note over Graphiti: "Stages 2 and 3: combined extraction"
    Graphiti->>Combined: "extract_nodes_and_edges(episode, previous_episodes)"
    Combined-->>Graphiti: "nodes[], edges[], node_episode_index_map"

    Note over Graphiti: "Stage 4: resolution"
    Graphiti->>NodeOps: "resolve_extracted_nodes(nodes)"
    NodeOps-->>Graphiti: "resolved_nodes[], uuid_map"

    Graphiti->>EdgeOps: "resolve_extracted_edges(edges)"
    EdgeOps-->>Graphiti: "resolved_edges[], invalidated_edges[]"

    Note over Graphiti: "Stage 5: attribute and summary extraction"
    Graphiti->>NodeOps: "extract_attributes_from_nodes(nodes, edges)"
    NodeOps-->>Graphiti: "hydrated_nodes[]"

    Note over Graphiti: "Stage 6: persistence"
    Graphiti->>EdgeOps: "build_episodic_edges(nodes, episode_uuid)"
    Graphiti->>Bulk: "add_nodes_and_edges_bulk(episode, nodes, edges)"
    Bulk->>Driver: "Execute graph queries"

    opt "Update communities"
        Graphiti->>Graphiti: "update_community(node) for each node"
    end

    Graphiti-->>Client: "AddEpisodeResults"

Sources: graphiti_core/graphiti.py:510-610, graphiti_core/utils/maintenance/combined_extraction.py:41-50, graphiti_core/utils/maintenance/node_operations.py:101-105, graphiti_core/utils/maintenance/edge_operations.py:91-96

---

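Entry Point and Configuration

The add_episode method is the main entry point for episode processing. It is defined on the Graphiti class (graphiti_core/graphiti.py:510-534).

Parameter categories

| Category | Parameters | Description |
|---|---|---|
| Episode metadata | name, episode_body, source_description, reference_time, source | Core episode information and content. graphiti_core/graphiti.py:511-516 |
| Graph partitioning | group_id, uuid | Database namespace and an optional specific episode UUID. graphiti_core/graphiti.py:517-518 |
| Entity configuration | entity_types, excluded_entity_types | Custom entity type schemas and extraction exclusions. graphiti_core/graphiti.py:520-521 |
| Edge configuration | edge_types, edge_type_map | Custom relationship schemas and the signatures allowed between types. graphiti_core/graphiti.py:523-524 |
| Context control | previous_episode_uuids, custom_extraction_instructions | Override default context retrieval or supply specific guidance to the large language model (LLM). graphiti_core/graphiti.py:522-525 |
| Saga management | saga, saga_previous_episode_uuid | Link episodes into a narrative sequence through a SagaNode. graphiti_core/graphiti.py:526-527 |

Sources: graphiti_core/graphiti.py:510-550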
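
To make the parameter groups concrete, here is a minimal sketch that assembles keyword arguments for an add_episode call. The parameter names come from the table; the helper `build_add_episode_kwargs` and its timezone check are assumptions made for illustration and are not part of Graphiti.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def build_add_episode_kwargs(
    name: str,
    episode_body: str,
    source_description: str,
    reference_time: datetime,
    group_id: Optional[str] = None,
    previous_episode_uuids: Optional[list] = None,
    saga: Optional[str] = None,
) -> dict:
    """Hypothetical helper: assemble keyword arguments for add_episode()."""
    if reference_time.tzinfo is None:
        # Assumption of this sketch: timestamps are kept timezone-aware so
        # that comparisons between episodes are unambiguous.
        raise ValueError("reference_time must be timezone-aware")
    kwargs: dict = {
        "name": name,
        "episode_body": episode_body,
        "source_description": source_description,
        "reference_time": reference_time,
    }
    # Optional groups are passed only when set, so the method's defaults apply.
    optional: dict = {
        "group_id": group_id,
        "previous_episode_uuids": previous_episode_uuids,
        "saga": saga,
    }
    kwargs.update({k: v for k, v in optional.items() if v is not None})
    return kwargs


kwargs = build_add_episode_kwargs(
    name="standup-2024-01-01",
    episode_body="Alice: I moved the release to Friday.",
    source_description="team chat",
    reference_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
    saga="release-planning",
)
print(sorted(kwargs))
```

With a configured Graphiti instance, the result would be passed along the lines of `await graphiti.add_episode(**kwargs)`.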

---

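Stage 1: Context Retrieval

Before processing, the system retrieves recent historical episodes to provide context for entity extraction and disambiguation.

Diagram: context retrieval flow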

flowchart TD
    Start["Graphiti.add_episode() called"]
    CheckUUIDs{"previous_episode_uuids<br/>provided?"}
    UseProvided["Use the provided UUIDs"]
    RetrieveRecent["retrieve_episodes()<br/>(last_n=EPISODE_WINDOW_LEN)"]
    GetByUUIDs["EpisodicNode.get_by_uuids()"]
    CreateEpisode["Create EpisodicNode<br/>instance"]
    ProceedToExtraction["Proceed to<br/>combined extraction"]

    Start --> CheckUUIDs
    CheckUUIDs -->|是| UseProvided
    CheckUUIDs -->|否| RetrieveRecent
    UseProvided --> GetByUUIDs
    RetrieveRecent --> CreateEpisode
    GetByUUIDs --> CreateEpisode
    CreateEpisode --> ProceedToExtraction

Sources: graphiti_core/graphiti.py:560-580, graphiti_core/utils/maintenance/graph_data_operations.py:67-74

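Context Retrieval Logic

When no previous episodes are specified, the system uses EPISODE_WINDOW_LEN (default 3) to decide how many earlier episodes to retrieve (graphiti_core/utils/maintenance/graph_data_operations.py:29-29).

# From graphiti.py:560-569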
previous_episodes = (
    await retrieve_episodes(
        self.driver,
        reference_time,
        last_n=EPISODE_WINDOW_LEN,
        group_ids=[group_id],
        source=source,
    )
    if previous_episode_uuids is None
    else await EpisodicNode.get_by_uuids(self.driver, previous_episode_uuids)
)

Sources: graphiti_core/graphiti.py:560-569, graphiti_core/utils/maintenance/graph_data_operations.py:67-74
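
The branch between explicit UUIDs and the recency window can be illustrated with an in-memory stand-in. This is a sketch, not the real database query: the `Episode` class, the `store` list, and `select_context` are hypothetical. The filter on `valid_at <= reference_time` and the oldest-first ordering are assumptions about how the window is chosen.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

EPISODE_WINDOW_LEN = 3  # default window size cited in the text


@dataclass
class Episode:
    uuid: str
    valid_at: datetime


def select_context(
    store: list,
    reference_time: datetime,
    previous_episode_uuids: Optional[list] = None,
    last_n: int = EPISODE_WINDOW_LEN,
) -> list:
    """In-memory stand-in for the lookup; `store` replaces the graph database."""
    if previous_episode_uuids is not None:
        # Explicit UUIDs override the recency window, even if the list is empty.
        wanted = set(previous_episode_uuids)
        return [e for e in store if e.uuid in wanted]
    # Otherwise take the last_n episodes at or before reference_time,
    # returned oldest first.
    earlier = sorted(
        (e for e in store if e.valid_at <= reference_time),
        key=lambda e: e.valid_at,
    )
    return earlier[-last_n:] if last_n > 0 else []


def day(d: int) -> datetime:
    return datetime(2024, 1, d, tzinfo=timezone.utc)


store = [Episode(f"e{d}", day(d)) for d in range(1, 6)]
recent = select_context(store, day(4))
print([e.uuid for e in recent])
```

As in the excerpt, the check is `is None` rather than truthiness, so passing an empty list deliberately yields no context instead of falling back to the window.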

---

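Stages 2 and 3: Combined Extraction

Graphiti uses a combined extraction strategy that extracts nodes and edges in a single LLM call (graphiti_core/utils/maintenance/combined_extraction.py:51-56). Because the model sees the entities and the facts relating them at the same time, this approach is intended to improve extraction quality.

Combined Extraction Logic

The extract_nodes_and_edges function concatenates the current episode with the previous episodes to form the context (graphiti_core/utils/maintenance/combined_extraction.py:114-121). It uses the extract_nodes_and_edges.extract_message prompt (graphiti_core/prompts/extract_nodes_and_edges.py:91-159).

| Feature | Description |
|---|---|
| Entity extraction | Identifies speakers, named entities, and possessive entities (for example, "James's notebook"). graphiti_core/prompts/extract_nodes_and_edges.py:101-113 |
| Fact extraction | Extracts relational facts with source_entity_name, target_entity_name, and a fact description. graphiti_core/prompts/extract_nodes_and_edges.py:35-59 |
| Self-referential facts | Captures routines or states when no second entity applies (for example, "Sam feels unmotivated"). graphiti_core/prompts/extract_nodes_and_edges.py:124-129 |

Sources: graphiti_core/utils/maintenance/combined_extraction.py:41-135, graphiti_core/prompts/extract_nodes_and_edges.py:25-66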
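
The idea of getting entities and facts back from one call can be sketched as follows. Only the field names source_entity_name, target_entity_name, and fact come from the text. The JSON layout (`entities`, `facts`, `name`), the `<PREVIOUS>`/`<CURRENT>` markers, and the convention that a self-referential fact repeats the same entity on both ends are assumptions made for this example.

```python
import json
from dataclasses import dataclass


@dataclass
class ExtractedFact:
    source_entity_name: str
    target_entity_name: str
    fact: str


def build_context(previous: list, current: str) -> str:
    """Concatenate earlier episodes and the current one into one prompt context."""
    history = "\n".join(previous)
    return f"<PREVIOUS>\n{history}\n</PREVIOUS>\n<CURRENT>\n{current}\n</CURRENT>"


def parse_response(raw: str) -> tuple:
    """Parse a hypothetical JSON reply that holds entities and facts together."""
    data = json.loads(raw)
    entities = [e["name"] for e in data["entities"]]
    known = set(entities)
    facts = []
    for item in data["facts"]:
        fact = ExtractedFact(
            item["source_entity_name"], item["target_entity_name"], item["fact"]
        )
        # Keep only facts whose endpoints were extracted in the same call.
        if fact.source_entity_name in known and fact.target_entity_name in known:
            facts.append(fact)
    return entities, facts


raw = json.dumps({
    "entities": [{"name": "Sam"}, {"name": "James"}],
    "facts": [
        {"source_entity_name": "Sam", "target_entity_name": "James",
         "fact": "Sam borrowed James's notebook"},
        # Self-referential fact: the same entity on both ends (assumed convention).
        {"source_entity_name": "Sam", "target_entity_name": "Sam",
         "fact": "Sam feels unmotivated"},
        # Endpoint "Paris" was never extracted as an entity, so this is dropped.
        {"source_entity_name": "Sam", "target_entity_name": "Paris",
         "fact": "Sam visited Paris"},
    ],
})
entities, facts = parse_response(raw)
print(entities, len(facts))
```

Validating fact endpoints against the entity list is only possible because both arrive in the same response, which is one practical benefit of a combined call.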

---

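Stage 4: Resolution and Deduplication

After extraction, entities and relationships must be resolved against existing data to prevent graph bloat and contradictions.

Node Resolution

The system calls resolve_extracted_nodes (graphiti_core/utils/maintenance/node_operations.py:104-104), which performs the following:

  1. Exact match: normalizes names and merges obvious duplicates (graphiti_core/utils/maintenance/combined_extraction.py:174-176).
  2. Fuzzy/LLM resolution (see Section 5.2): resolves similar entities that may follow different naming conventions.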
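
The exact-match step can be sketched as a dictionary lookup on normalized names. The normalization rule here (lowercasing and collapsing whitespace), the `Node` class, and `resolve_exact` are assumptions for illustration. The fuzzy/LLM step is not shown.

```python
import re
from dataclasses import dataclass


@dataclass
class Node:
    uuid: str
    name: str


def normalize(name: str) -> str:
    """Lowercase and collapse whitespace; a hypothetical normalization rule."""
    return re.sub(r"\s+", " ", name.strip().lower())


def resolve_exact(extracted: list, existing: list) -> tuple:
    """Map each extracted node to the first node seen with the same normalized name."""
    by_name = {normalize(n.name): n for n in existing}
    resolved = []
    seen = set()
    uuid_map = {}
    for node in extracted:
        # setdefault registers a new name, or returns the earlier canonical node.
        canonical = by_name.setdefault(normalize(node.name), node)
        uuid_map[node.uuid] = canonical.uuid
        if canonical.uuid not in seen:
            seen.add(canonical.uuid)
            resolved.append(canonical)
    return resolved, uuid_map


existing = [Node("a", "Alice")]
extracted = [Node("x", "  alice "), Node("y", "Bob"), Node("z", "BOB")]
resolved, uuid_map = resolve_exact(extracted, existing)
print([n.uuid for n in resolved], uuid_map)
```

The returned `uuid_map` is what lets edges extracted against a duplicate node be re-pointed at the canonical one.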
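
Edge Resolution

The resolve_extracted_edges function (graphiti_core/utils/maintenance/edge_operations.py:95-95) determines whether a new fact contradicts existing facts. If a contradiction is found, the older edge is stamped with an invalid_at timestamp.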
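
A minimal sketch of the invalidation step, under stated assumptions: the contradiction judgement itself is not implemented and is supplied as the hypothetical `contradicted_uuids` set, and the older edge's invalid_at is set to the newer edge's valid_at. The `Edge` class is a stand-in, not Graphiti's EntityEdge.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Edge:
    uuid: str
    fact: str
    valid_at: datetime
    invalid_at: Optional[datetime] = None


def invalidate_contradicted(new_edge: Edge, existing: list, contradicted_uuids: set) -> list:
    """Stamp older contradicted edges with invalid_at and return those that changed.

    `contradicted_uuids` stands in for the contradiction judgement, which
    this sketch does not implement.
    """
    changed = []
    for edge in existing:
        if edge.uuid not in contradicted_uuids or edge.invalid_at is not None:
            continue  # not contradicted, or already invalidated earlier
        if edge.valid_at < new_edge.valid_at:
            # Assumption: the older fact stops being valid when the newer one starts.
            edge.invalid_at = new_edge.valid_at
            changed.append(edge)
    return changed


old = Edge("e1", "Alice works at Acme", datetime(2023, 1, 1, tzinfo=timezone.utc))
other = Edge("e2", "Alice lives in Oslo", datetime(2023, 6, 1, tzinfo=timezone.utc))
new = Edge("e3", "Alice works at Globex", datetime(2024, 1, 1, tzinfo=timezone.utc))
changed = invalidate_contradicted(new, [old, other], {"e1"})
print([e.uuid for e in changed], old.invalid_at)
```

The contradicted edge is kept and only time-bounded, so the graph can still answer questions about what was true before the new fact arrived.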

---

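Stage 5: Persistence and Saga Management

Bulk Persistence

The add_nodes_and_edges_bulk() function persists the EpisodicNode, EntityNode, and EntityEdge objects in a single database transaction (graphiti_core/utils/bulk_utils.py:78-78).

# From graphiti.py:603-610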
await add_nodes_and_edges_bulk(
    self.driver,
    [episode],
    episodic_edges,
    nodes,
    edges,
    self.embedder,
)

Sources: graphiti_core/graphiti.py:603-610

Saga Management

If a saga name is provided, Graphiti links the episode into a chronological chain:

  1. SagaNode: represents the narrative thread (graphiti_core/nodes.py:57-57).
  2. HasEpisodeEdge: links the SagaNode to the current EpisodicNode (graphiti_core/edges.py:36-37).
  3. NextEpisodeEdge: links the previous episode in the saga to the new episode (graphiti_core/edges.py:38-39).

Sources: graphiti_core/graphiti.py:632-660, graphiti_core/edges.py:32-43
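
The chain structure can be pictured with a toy in-memory store. The edge names mirror those in the list above, while `SagaGraph`, its fields, and the fallback to the most recently linked episode when no previous UUID is given are hypothetical choices made for this sketch.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SagaGraph:
    """Toy in-memory store; not Graphiti's persistence layer."""
    has_episode: list = field(default_factory=list)    # (saga, episode) pairs
    next_episode: list = field(default_factory=list)   # (previous, new) pairs
    last_episode: dict = field(default_factory=dict)   # saga -> latest episode

    def link(self, saga: str, episode_uuid: str,
             previous_uuid: Optional[str] = None) -> None:
        # HasEpisodeEdge: the saga points at every episode it contains.
        self.has_episode.append((saga, episode_uuid))
        # NextEpisodeEdge: an explicit previous episode wins; otherwise
        # chain to the latest episode linked to this saga (assumed fallback).
        prev = previous_uuid if previous_uuid is not None else self.last_episode.get(saga)
        if prev is not None:
            self.next_episode.append((prev, episode_uuid))
        self.last_episode[saga] = episode_uuid


graph = SagaGraph()
for uuid in ("e1", "e2", "e3"):
    graph.link("release-planning", uuid)
print(graph.next_episode)
```

The first episode of a saga gets a HasEpisodeEdge only, since there is no predecessor to chain from.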

---

Return Value and Telemetry

The workflow finally returns an AddEpisodeResults object (graphiti_core/graphiti.py:114-121) and captures a telemetry event (graphiti_core/graphiti.py:615-620).

Result Structure

graphiti_core/graphiti.py:114-121

class AddEpisodeResults(BaseModel):
    episode: EpisodicNode
    episodic_edges: list[EpisodicEdge]
    nodes: list[EntityNode]
    edges: list[EntityEdge]
    communities: list[CommunityNode]
    community_edges: list[CommunityEdge]

Telemetry

Events are captured with capture_event() to track processing metadata such as entity and edge counts (graphiti_core/graphiti.py:74).

Sources: graphiti_core/graphiti.py:615-620, graphiti_core/telemetry.py:74-74
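
As an illustration of count-style processing metadata, the sketch below derives an event payload from a result object. `ResultsStub` is a minimal stand-in for AddEpisodeResults, and the property names and `telemetry_properties` helper are hypothetical. The real payload passed to capture_event() is not shown in this page.

```python
from dataclasses import dataclass, field


@dataclass
class ResultsStub:
    """Minimal stand-in for AddEpisodeResults with only the lists counted below."""
    nodes: list = field(default_factory=list)
    edges: list = field(default_factory=list)
    communities: list = field(default_factory=list)


def telemetry_properties(results: ResultsStub, duration_ms: float) -> dict:
    """Build a hypothetical event payload of counts; no episode content is included."""
    return {
        "node_count": len(results.nodes),
        "edge_count": len(results.edges),
        "community_count": len(results.communities),
        "duration_ms": round(duration_ms, 1),
    }


props = telemetry_properties(ResultsStub(nodes=["n1", "n2"], edges=["e1"]), 12.34)
print(props)
```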