agentic_huge_data_base / wiki
页面 Graphiti · 5.4 批量操作与优化·DeepWiki 中文全文译文

5.4 · 批量操作与优化(Bulk Operations and Optimization)

时序知识图谱与动态事实记忆 · 本章是 Graphiti DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Graphiti 章节5.4 状态全文译文 模块图谱与关系、界面与交互、测试、发布与运维、模型调用与提供方适配
源码线索
  • graphiti_core/graphiti.py
  • graphiti_core/utils/bulk_utils.py
  • tests/utils/maintenance/test_bulk_utils.py
模块标签
  • 图谱与关系
  • 界面与交互
  • 测试、发布与运维
  • 模型调用与提供方适配
  • 入库与解析

中文译文

批量操作与优化(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/getzep/graphiti/5.4-bulk-operations-and-optimization
翻译时间:2026-05-27T08:45:07.913Z
翻译模型:deepseek-chat
原文字符数:15762
项目:Graphiti (graphiti)

--- 好的,作为一名资深技术文档翻译专家,我将严格遵循您的要求,对这份 DeepWiki 技术文档进行全文翻译和润色。

---

批量操作与优化

相关源文件

以下文件被用作生成此 Wiki 页面的上下文:

  • graphiti_core/graphiti.py
  • graphiti_core/utils/bulk_utils.py
  • tests/utils/maintenance/test_bulk_utils.py

本文档记录了批量剧集入库系统:包括 RawEpisode 输入类型、add_episode_bulk 方法、graphiti_core/utils/bulk_utils.py 的内部实现、批处理与顺序单剧集处理的区别,以及相关的性能权衡。

关于单剧集处理的文档,请参见剧集处理工作流。关于两个流程共用的节点和边提取内部机制,请参见节点提取与解析边提取与解析

---

概述

Graphiti 的主要入库路径是逐条处理剧集:每次调用 add_episode 都是完全顺序执行的,在持久化之前,它会根据实时图谱解析实体和边。对于大规模入库(例如加载转录文本、文档语料库或历史数据集),这种方式会很慢,因为每个剧集都需要多次往返调用大语言模型(LLM)和图数据库。

add_episode_bulk 通过在一个剧集窗口内批量执行提取和解析工作、在内存中进行跨批次去重,并将所有结果以每个块(chunk)一个事务的方式写入图谱,来解决这个问题。

关键约束:批处理以牺牲跨剧集时间失效的完整性来换取吞吐量。边的失效和去重在批次内按剧集进行,但在解析过程中,同一批次内的剧集无法看到彼此新添加的边。

来源:graphiti_core/graphiti.py:123-130graphiti_core/utils/bulk_utils.py:1-66

---

RawEpisode 输入类型

RawEpisode 是一个轻量级的 Pydantic 模型,定义在 bulk_utils.py 中,它表示一个尚未转换为图节点的剧集。它是 add_episode_bulk 的输入元素。

字段类型描述
namestr人类可读的剧集名称
uuid`str \None`可选的稳定 UUID;如果省略则自动生成
contentstr原始剧集文本或结构化内容
source_descriptionstr描述内容来源
sourceEpisodeTypemessagetextjson
reference_timedatetime所描述事件的发生时间(valid_at

来源:graphiti_core/utils/bulk_utils.py:101-107

---

add_episode_bulk 接口

add_episode_bulkGraphiti 类的一个方法。其返回类型是 AddBulkEpisodeResults,它聚合了批次中所有剧集的结果。

AddBulkEpisodeResults 字段:

字段类型
episodeslist[EpisodicNode]
episodic_edgeslist[EpisodicEdge]
nodeslist[EntityNode]
edgeslist[EntityEdge]
communitieslist[CommunityNode]
community_edgeslist[CommunityEdge]

add_episode_bulk 接受的关键参数:

参数类型说明
episodeslist[RawEpisode]输入剧集
group_id`str \None`图谱分区标识符
entity_types`dict[str, type[BaseModel]] \None`自定义实体类型
excluded_entity_types`list[str] \None`要跳过的实体类型
edge_types`dict[str, type[BaseModel]] \None`自定义边类型
edge_type_map`dict[tuple[str, str], list[str]] \None`每对节点允许的边类型
custom_extraction_instructions`str \None`给大语言模型(LLM)提取的额外指令
saga`str \SagaNode \None`可选的 Saga 关联

完整的剧集列表会按 CHUNK_SIZE = 10 的块大小进行处理,该常量定义在 bulk_utils.py 中。

来源:graphiti_core/graphiti.py:123-130graphiti_core/utils/bulk_utils.py:66

---

批量处理管线

图表:“add_episode_bulk” 高级管线

flowchart TD
    A["list[RawEpisode]"] --> B["转换为 list[EpisodicNode]"]
    B --> C["按 CHUNK_SIZE=10 分块"]
    C --> D["retrieve_previous_episodes_bulk\n(每个剧集并行)"]
    D --> E["_extract_and_dedupe_nodes_bulk"]
    E --> E1["extract_nodes_and_edges_bulk\n(每个剧集并行)"]
    E1 --> E2["dedupe_nodes_bulk\n(两轮)"]
    E2 --> F["_resolve_nodes_and_edges_bulk"]
    F --> F1["resolve_extracted_nodes\n(每个剧集并行)"]
    F1 --> F2["extract_attributes_from_nodes\n(每个剧集并行)"]
    F2 --> F3["resolve_extracted_edges\n(每个剧集并行)"]
    F3 --> G["add_nodes_and_edges_bulk\n(单个事务)"]
    G --> H["AddBulkEpisodeResults"]

来源:graphiti_core/graphiti.py:591-732graphiti_core/utils/bulk_utils.py:110-293

---

bulk_utils.py 内部机制

retrieve_previous_episodes_bulk

graphiti_core/utils/bulk_utils.py:110-125

使用 semaphore_gather 并发地为批次中的每个剧集调用 retrieve_episodes。返回一个 (EpisodicNode, list[EpisodicNode]) 元组列表,每个元组将一个剧集与其前序剧集配对,以提供上下文。上下文窗口由 EPISODE_WINDOW_LEN = 3 定义。

retrieve_previous_episodes_bulk(driver, episodes)
  -> list[tuple[EpisodicNode, list[EpisodicNode]]]

来源:graphiti_core/utils/bulk_utils.py:110-125graphiti_core/utils/maintenance/graph_data_operations.py:29

extract_nodes_and_edges_bulk

graphiti_core/utils/bulk_utils.py:254-293

运行两轮 semaphore_gather

  1. 对所有 (episode, previous_episodes) 元组并行调用 extract_nodes
  2. 使用第一步中提取的对应节点作为节点上下文,并行调用 extract_edges

返回 (list[list[EntityNode]], list[list[EntityEdge]]) —— 每个剧集对应一个内部列表。

来源:graphiti_core/utils/bulk_utils.py:254-293

dedupe_nodes_bulk

graphiti_core/utils/bulk_utils.py:296-408

这是批量管线中最复杂的函数。它执行两轮去重策略:

第一轮 —— 图谱协调(并行): 为每个剧集并发调用 resolve_extracted_nodes。这与单剧集流程相同,使用嵌入向量相似度和 LLM 解析将提取的节点与现有图谱进行匹配。

第二轮 —— 跨批次内存去重: 遍历所有剧集中所有已解析的节点,构建一个 canonical_nodes 池。对于每个新节点:

  • 使用 _normalize_string_exact 检查精确字符串匹配。
  • 如果没有精确匹配,则通过 _build_candidate_indexes 使用基于 MinHash 的 Shingle 相似度,对所有当前规范节点运行 _resolve_with_similarity
  • 记录重复对。

最后,使用 _build_directed_uuid_map 将所有每个剧集的 UUID 映射和跨批次重复对合并到一个 compressed_map 中。

图表:“dedupe_nodes_bulk” 两轮策略

flowchart TD
    subgraph "第一轮 - 图谱协调"
        P1A["resolve_extracted_nodes\nepisode_1"]
        P1B["resolve_extracted_nodes\nepisode_2"]
        P1C["resolve_extracted_nodes\nepisode_N"]
        P1A & P1B & P1C --> P1D["per_episode_uuid_maps\n+ 来自图谱的 duplicate_pairs"]
    end
    subgraph "第二轮 - 内存跨批次去重"
        P2A["累积 canonical_nodes 池"]
        P2B["_normalize_string_exact\n精确匹配检查"]
        P2C["_build_candidate_indexes\n+ _resolve_with_similarity\n(MinHash)"]
        P2A --> P2B --> P2C --> P2D["额外的 duplicate_pairs"]
    end
    P1D --> MERGE["union_pairs = per_episode_maps + 所有 duplicate_pairs"]
    P2D --> MERGE
    MERGE --> COMPRESS["_build_directed_uuid_map\n(带路径压缩的有向并查集)"]
    COMPRESS --> OUT["nodes_by_episode: dict[str, list[EntityNode]]\ncompressed_map: dict[str, str]"]

来源:graphiti_core/utils/bulk_utils.py:296-408graphiti_core/utils/maintenance/dedup_helpers.py:47-50

dedupe_edges_bulk

graphiti_core/utils/bulk_utils.py:411-503

使用内存相似度而非图谱查找来去重跨所有剧集提取的边。算法如下:

  1. 为所有边生成嵌入向量:使用 create_entity_edge_embeddings(按剧集组并行)。
  2. 查找候选边:对于每条边,从所有剧集中收集具有相同 source_node_uuidtarget_node_uuid 的边,然后通过单词重叠或余弦相似度(≥ 0.6)进行过滤。
  3. 解析:为每个 (episode, edge, candidates) 三元组并行调用 resolve_extracted_edge
  4. 压缩:收集重复对,并使用 UnionFind 通过 compress_uuid_map 进行处理,该函数为每个 UUID 组分配字典序最小的规范 UUID。

返回 edges_by_episode: dict[str, list[EntityEdge]]

来源:graphiti_core/utils/bulk_utils.py:411-503

add_nodes_and_edges_bulk

graphiti_core/utils/bulk_utils.py:128-251

通过 driver.session().execute_write(add_nodes_and_edges_bulk_tx, ...) 在单个写入事务中将所有节点和边写入图谱。

事务函数 add_nodes_and_edges_bulk_tx 序列化所有数据,并运行特定于提供商的批量查询:

提供商策略
Neo4j / FalkorDB通过 get_entity_node_save_bulk_queryget_entity_edge_save_bulk_query 使用基于 UNWIND 的批量 MERGE 查询
Kuzu单行插入(Kuzu 的 UNWIND 不支持 STRUCT[]
实现了 graph_operations_interface 的驱动程序委托给接口方法

嵌入向量的生成在事务内部处理:没有 name_embedding 的节点会即时生成;没有 fact_embedding 的边也同样处理。

来源:graphiti_core/utils/bulk_utils.py:128-251graphiti_core/models/nodes/node_db_queries.py:40-41graphiti_core/models/edges/edge_db_queries.py:35-36

辅助工具

resolve_edge_pointers graphiti_core/utils/bulk_utils.py:549-556

使用 UUID 映射(由节点去重产生)更新边上的 source_node_uuidtarget_node_uuid。这确保了在去重后,边能正确指向规范节点。

_build_directed_uuid_map graphiti_core/utils/bulk_utils.py:69-98

带迭代路径压缩的有向并查集。折叠别名 → 规范链,使得 alias_a → alias_b → canonical 能正确地将 alias_a 解析为 canonical

compress_uuid_map graphiti_core/utils/bulk_utils.py:528-543

用于边去重。使用 UnionFind,并选择每个连通分量中字典序最小的 UUID 作为规范 UUID。

来源:graphiti_core/utils/bulk_utils.py:549-556graphiti_core/utils/bulk_utils.py:69-98graphiti_core/utils/bulk_utils.py:528-543

---

_extract_and_dedupe_nodes_bulk_resolve_nodes_and_edges_bulk

这两个 Graphiti 的私有方法编排了批量管线,并从 add_episode_bulk 中调用。

_extract_and_dedupe_nodes_bulk graphiti_core/graphiti.py:591-621

封装了 extract_nodes_and_edges_bulkdedupe_nodes_bulk。返回:

  • nodes_by_episode: dict[str, list[EntityNode]]
  • uuid_map: dict[str, str]
  • extracted_edges_bulk: list[list[EntityEdge]]

_resolve_nodes_and_edges_bulk graphiti_core/graphiti.py:623-732

  1. 为所有已解析节点构建 nodes_by_uuid 查找表。
  2. 使用压缩后的 UUID 映射对每个剧集的节点进行去重。
  3. 为所有剧集并行调用 resolve_extracted_nodes
  4. 为所有剧集并行调用 extract_attributes_from_nodes(填充/摘要)。
  5. 对每个剧集的边应用 resolve_edge_pointers
  6. 为所有剧集并行调用 resolve_extracted_edges

返回 (final_hydrated_nodes, resolved_edges, invalidated_edges, uuid_map)

来源:graphiti_core/graphiti.py:591-621graphiti_core/graphiti.py:623-732

---

单剧集处理与批处理对比

图表:代码实体映射 —— 单剧集 vs. 批量

flowchart LR
    subgraph "单剧集流程 (add_episode)"
        S1["extract_nodes"]
        S2["resolve_extracted_nodes"]
        S3["extract_edges"]
        S4["resolve_extracted_edges"]
        S5["extract_attributes_from_nodes"]
        S6["add_nodes_and_edges_bulk\n(1 个剧集)"]
        S1 --> S2 --> S3 --> S4 --> S5 --> S6
    end
    subgraph "批量流程 (add_episode_bulk)"
        B1["extract_nodes_and_edges_bulk\n(每个剧集并行)"]
        B2["dedupe_nodes_bulk\n(2 轮跨批次)"]
        B3["resolve_extracted_nodes\n(每个剧集并行)"]
        B4["extract_attributes_from_nodes\n(每个剧集并行)"]
        B5["resolve_extracted_edges\n(每个剧集并行)"]
        B6["add_nodes_and_edges_bulk\n(所有剧集, 1 个事务)"]
        B1 --> B2 --> B3 --> B4 --> B5 --> B6
    end
方面add_episodeadd_episode_bulk
剧集上下文来自图谱的前 N 个剧集每个剧集的前 N 个剧集(并行查询)
节点去重仅针对图谱针对图谱 + 跨批次内存去重(2 轮)
边去重通过嵌入向量 + BM25 搜索针对图谱内存中单词重叠 + 余弦相似度(≥ 0.6)
属性提取边解析之后节点解析之后,边解析之前
图谱写入每个剧集一个事务每个块一个事务(10 个剧集)
时间失效完整,具有跨剧集感知能力每个剧集,批次内无跨可见性
块大小不适用CHUNK_SIZE = 10

来源:graphiti_core/graphiti.py:788-1100graphiti_core/graphiti.py:591-732graphiti_core/utils/bulk_utils.py:66

---

性能考量

通过 semaphore_gather 实现并行

所有按剧集执行的操作(提取、节点解析、属性提取、边解析)都使用 semaphore_gather 并带有可配置的并发限制。这允许同时进行更多的大语言模型(LLM)和嵌入向量 API 调用,但代价是更高的速率限制压力。

来源:graphiti_core/utils/bulk_utils.py:113-120graphiti_core/utils/bulk_utils.py:260-272

块大小

剧集按 10 个一组(CHUNK_SIZE)进行处理。每个块产生一个写入事务。更大的块可以提高内存去重质量(找到更多跨批次匹配),但会增加内存使用量和批次内冲突的风险。

来源:graphiti_core/utils/bulk_utils.py:66

内存边去重准确性

dedupe_edges_bulk 使用 0.6 的余弦相似度阈值和单词重叠作为候选过滤器,这比单剧集处理中 resolve_extracted_edges 使用的完整 BM25 + 嵌入向量搜索要粗略。这意味着一些跨越多个批处理块的近似重复边可能不会被捕获,直到它们稍后针对图谱进行解析。

来源:graphiti_core/utils/bulk_utils.py:411-503

事务效率

add_nodes_and_edges_bulk_tx 对 Neo4j 和 FalkorDB 使用基于 UNWIND 的批量 MERGE,这比为每个节点/边发出单独的 Cypher 写入要快得多。对于 Kuzu,由于驱动程序在 UNWIND 中不支持 STRUCT[] 的限制,因此使用单行插入。

来源:graphiti_core/utils/bulk_utils.py:128-251graphiti_core/models/nodes/node_db_queries.py:40-41

嵌入向量生成

add_nodes_and_edges_bulk_tx 中,如果嵌入向量尚不存在,则会在事务函数内部惰性生成。对于大型批次,如果嵌入器吞吐量较低,这可能会成为瓶颈。

来源:graphiti_core/utils/bulk_utils.py:168-169graphiti_core/utils/bulk_utils.py:193-194