Graphiti 核心客户端(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/getzep/graphiti/4.1-graphiti-core-client
翻译时间:2026-05-27T08:45:02.344Z
翻译模型:deepseek-chat
原文字符数:16236
项目:Graphiti (graphiti)
---
Graphiti 核心客户端
相关源文件
以下文件被用作生成此 Wiki 页面的上下文:
graphiti_core/driver/falkordb/operations/graph_ops.pygraphiti_core/driver/neo4j/operations/graph_ops.pygraphiti_core/graphiti.pygraphiti_core/namespaces/edges.pygraphiti_core/namespaces/nodes.py
本文档记录了 Graphiti 类——所有 graphiti-core 操作的主要入口点——包括其构造函数参数、公开方法、结果类型以及它所暴露的 NodeNamespace/EdgeNamespace 访问器 API。
关于 Graphiti 在更广泛系统中的架构角色,请参见系统架构。关于可插拔提供者系统和 GraphitiClients 容器的文档,请参见多提供者插件架构。关于 search 所委托的搜索子系统,请参见搜索与检索系统。关于完整的事件入库管线,请参见数据处理管线。
---
概述
Graphiti(定义于 graphiti_core/graphiti.py:137-1051)是核心编排类。它持有所有提供者客户端的引用,暴露用于入库事件和搜索知识图谱的方法,并协调完整的提取-解析-持久化管线。
类关系图:
classDiagram
class Graphiti {
+driver: GraphDriver
+llm_client: LLMClient
+embedder: EmbedderClient
+cross_encoder: CrossEncoderClient
+tracer: Tracer
+clients: GraphitiClients
+nodes: NodeNamespace
+edges: EdgeNamespace
+store_raw_episode_content: bool
+max_coroutines: int | None
+token_tracker: TokenUsageTracker
+build_indices_and_constraints()
+close()
+add_episode()
+add_episode_bulk()
+retrieve_episodes()
+search()
+add_triplet()
}
class GraphitiClients {
+driver: GraphDriver
+llm_client: LLMClient
+embedder: EmbedderClient
+cross_encoder: CrossEncoderClient
+tracer: Tracer
}
class NodeNamespace {
+entity: EntityNodeNamespace
+episode: EpisodeNodeNamespace
+community: CommunityNodeNamespace
+saga: SagaNodeNamespace
}
class EdgeNamespace {
+entity: EntityEdgeNamespace
+episodic: EpisodicEdgeNamespace
+community: CommunityEdgeNamespace
+has_episode: HasEpisodeEdgeNamespace
+next_episode: NextEpisodeEdgeNamespace
}
Graphiti --> GraphitiClients : "clients"
Graphiti --> NodeNamespace : "nodes"
Graphiti --> EdgeNamespace : "edges"
来源:graphiti_core/graphiti.py:137-246,graphiti_core/namespaces/nodes.py:29-234,graphiti_core/namespaces/edges.py:34-230
---
构造函数
graphiti_core/graphiti.py:138-246
Graphiti(
uri, user, password,
llm_client, embedder, cross_encoder,
store_raw_episode_content, graph_driver,
max_coroutines, tracer, trace_span_prefix
)
参数
| 参数 | 类型 | 默认值 | 描述 | |
|---|---|---|---|---|
uri | `str \ | None` | None | Neo4j 数据库的 URI。当未提供 graph_driver 时必填。 |
user | `str \ | None` | None | 数据库用户名。 |
password | `str \ | None` | None | 数据库密码。 |
llm_client | `LLMClient \ | None` | None | 大语言模型(LLM)提供者客户端。默认为 OpenAIClient()。 |
embedder | `EmbedderClient \ | None` | None | 嵌入向量提供者客户端。默认为 OpenAIEmbedder()。 |
cross_encoder | `CrossEncoderClient \ | None` | None | 重排序器客户端。默认为 OpenAIRerankerClient()。 |
store_raw_episode_content | bool | True | 如果为 False,则在持久化前清除事件内容以节省存储空间。 | |
graph_driver | `GraphDriver \ | None` | None | 完全构造的驱动程序。会覆盖 uri/user/password。 |
max_coroutines | `int \ | None` | None | 并发限制;会覆盖 SEMAPHORE_LIMIT 环境变量。 |
tracer | `Tracer \ | None` | None | OpenTelemetry 追踪器。如果为 None,则追踪为空操作。 |
trace_span_prefix | str | 'graphiti' | 所有跨度名称的前缀。 |
默认提供者解析
当提供者参数为 None 时,构造函数会回退到该角色的默认提供者:
flowchart TD
A["是否提供了 graph_driver?"]
A -- "是" --> B["self.driver = graph_driver"]
A -- "否" --> C["需要 uri;self.driver = Neo4jDriver(uri, user, password)"]
D["是否提供了 llm_client?"]
D -- "否" --> E["self.llm_client = OpenAIClient()"]
F["是否提供了 embedder?"]
F -- "否" --> G["self.embedder = OpenAIEmbedder()"]
H["是否提供了 cross_encoder?"]
H -- "否" --> I["self.cross_encoder = OpenAIRerankerClient()"]
来源:graphiti_core/graphiti.py:203-238
---
关键属性
构造完成后,以下公开属性可用:
| 属性 | 类型 | 描述 | |
|---|---|---|---|
driver | GraphDriver | 活跃的图数据库驱动程序。 | |
llm_client | LLMClient | 用于提取和去重的大语言模型(LLM)客户端。 | |
embedder | EmbedderClient | 用于节点/边嵌入向量生成的嵌入向量客户端。 | |
cross_encoder | CrossEncoderClient | 搜索期间使用的重排序器。 | |
tracer | Tracer | 追踪包装器;如果未配置则为空操作。 | |
clients | GraphitiClients | 打包所有提供者客户端的便捷容器。 | |
nodes | NodeNamespace | 用于低级节点操作的命名空间。 | |
edges | EdgeNamespace | 用于低级边操作的命名空间。 | |
store_raw_episode_content | bool | 是否持久化原始事件文本。 | |
max_coroutines | `int \ | None` | semaphore_gather 的并发上限。 |
来源:graphiti_core/graphiti.py:203-246
---
GraphitiClients 容器
GraphitiClients(从 graphiti_core/graphiti_types.py 导入)是一个数据类,将所有提供者引用分组到一个对象中。子系统函数(例如 utils/maintenance/ 中的函数)接受 GraphitiClients 参数而不是单独的参数,从而避免了长参数列表。
flowchart LR
subgraph "Graphiti 初始化"
G["Graphiti"]
end
subgraph "GraphitiClients [graphiti_core/graphiti_types.py]"
D["driver: GraphDriver"]
L["llm_client: LLMClient"]
EM["embedder: EmbedderClient"]
CE["cross_encoder: CrossEncoderClient"]
TR["tracer: Tracer"]
end
G --> D
G --> L
G --> EM
G --> CE
G --> TR
self.clients 在构造时组装(graphiti_core/graphiti.py:232-238),并传递给内部管线函数,如 extract_nodes、resolve_extracted_nodes、extract_edges 和 search。
来源:graphiti_core/graphiti.py:232-238
---
NodeNamespace 和 EdgeNamespace
NodeNamespace 和 EdgeNamespace 从 graphiti_core/namespaces/ 导入,并分别实例化为 self.nodes 和 self.edges(graphiti_core/graphiti.py:241-242)。
它们提供了一个结构化的 API,用于对图中的单个节点和边执行直接的 CRUD 风格操作,而无需运行完整的事件入库管线。两者在构造时都接受 GraphDriver 和 EmbedderClient。
节点访问器
通过 graphiti.nodes 访问。
entity:EntityNodeNamespace(graphiti_core/namespaces/nodes.py:29-105)episode:EpisodeNodeNamespace(graphiti_core/namespaces/nodes.py:107-184)community:CommunityNodeNamespace(graphiti_core/namespaces/nodes.py:186-234)saga:SagaNodeNamespace(graphiti_core/namespaces/nodes.py:236-281)
边访问器
通过 graphiti.edges 访问。
entity:EntityEdgeNamespace(graphiti_core/namespaces/edges.py:34-111)episodic:EpisodicEdgeNamespace(graphiti_core/namespaces/edges.py:113-163)community:CommunityEdgeNamespace(graphiti_core/namespaces/edges.py:165-207)has_episode:HasEpisodeEdgeNamespace(graphiti_core/namespaces/edges.py:209-230)next_episode:NextEpisodeEdgeNamespace(graphiti_core/namespaces/edges.py:232-253)
来源:graphiti_core/graphiti.py:241-242,graphiti_core/namespaces/nodes.py:1-281,graphiti_core/namespaces/edges.py:1-253
---
token_tracker 属性
graphiti_core/graphiti.py:268-278
@property
def token_tracker(self):
return self.llm_client.token_tracker
返回大语言模型(LLM)客户端的 TokenUsageTracker,该追踪器会跟踪通过该实例进行的所有大语言模型(LLM)调用的累计 Token 使用量。追踪器上可用的方法包括 get_usage()、get_total_usage() 和 reset()。
来源:graphiti_core/graphiti.py:268-278
---
核心方法
build_indices_and_constraints
graphiti_core/graphiti.py:393-425
async def build_indices_and_constraints(delete_existing: bool = False)
委托给 self.driver.build_indices_and_constraints(delete_existing)。在首次使用前必须调用一次,以初始化数据库模式和索引。
---
close
graphiti_core/graphiti.py:311-341
async def close()
关闭底层驱动程序连接。当实例不再需要时应调用此方法。
---
retrieve_episodes
graphiti_core/graphiti.py:734-786
async def retrieve_episodes(
reference_time: datetime,
last_n: int = EPISODE_WINDOW_LEN,
group_ids: list[str] | None = None,
source: EpisodeType | None = None,
driver: GraphDriver | None = None,
saga: str | None = None,
) -> list[EpisodicNode]
检索在 reference_time 之前创建的最近 last_n 个 EpisodicNode 对象。使用 @handle_multiple_group_ids 装饰器。
来源:graphiti_core/graphiti.py:734-786
---
add_episode
graphiti_core/graphiti.py:788-1000
async def add_episode(
name, episode_body, source_description, reference_time,
source, group_id, uuid, update_communities,
entity_types, excluded_entity_types,
previous_episode_uuids, edge_types, edge_type_map,
custom_extraction_instructions, saga, saga_previous_episode_uuid
) -> AddEpisodeResults
主要的入库方法。对单个事件运行完整的提取-解析-持久化管线。
处理流程:
flowchart TD
A["Graphiti.add_episode"]
B["validate_entity_types [graphiti_core/utils/ontology_utils/entity_types_utils.py]"]
C["retrieve_episodes [graphiti_core/utils/maintenance/graph_data_operations.py]"]
D["extract_nodes [graphiti_core/utils/maintenance/node_operations.py]"]
E["resolve_extracted_nodes [graphiti_core/utils/maintenance/node_operations.py]"]
F["extract_edges [graphiti_core/utils/maintenance/edge_operations.py]"]
G["resolve_edge_pointers [graphiti_core/utils/bulk_utils.py]"]
H["resolve_extracted_edges [graphiti_core/utils/maintenance/edge_operations.py]"]
I["extract_attributes_from_nodes [graphiti_core/utils/maintenance/node_operations.py]"]
J["add_nodes_and_edges_bulk [graphiti_core/utils/bulk_utils.py]"]
K["update_community [graphiti_core/utils/maintenance/community_operations.py]"]
L["返回 AddEpisodeResults"]
A --> B --> C --> D --> E
E --> F --> G --> H
H --> I --> J --> K --> L
来源:graphiti_core/graphiti.py:788-1000,graphiti_core/utils/maintenance/edge_operations.py:88-175
---
add_episode_bulk
graphiti_core/graphiti.py:1002-1051
批量处理 RawEpisode 对象列表。这比在循环中调用 add_episode 效率高得多,因为它通过 extract_nodes_and_edges_bulk 执行批量去重和并行提取。
返回 AddBulkEpisodeResults。
来源:graphiti_core/graphiti.py:1002-1051,graphiti_core/utils/bulk_utils.py:128-149
---
build_communities
graphiti_core/graphiti.py:448-472
async def build_communities(group_ids: list[str] | None = None)
触发社区检测和摘要管线。它会移除现有社区,并使用标签传播和大语言模型(LLM)对实体集群进行摘要来重新创建社区。
来源:graphiti_core/graphiti.py:448-472,graphiti_core/utils/maintenance/community_operations.py:87
---
检索
graphiti_core/graphiti.py:657-712
search 方法委托给 graphiti_core/search/search.py 中的 search 函数,传递 GraphitiClients 容器和 SearchConfig。它返回一个 SearchResults 对象,其中包含匹配的 EntityEdge、EntityNode、EpisodicNode 和 CommunityNode 列表。
来源:graphiti_core/graphiti.py:657-712,graphiti_core/search/search.py:27-28
---
结果类型
AddEpisodeResults
graphiti_core/graphiti.py:114-121
由 add_episode 返回。包含处理过程中创建或更新的所有图对象。
| 字段 | 类型 | 描述 |
|---|---|---|
episode | EpisodicNode | 持久化的事件节点 |
episodic_edges | list[EpisodicEdge] | 将事件链接到实体节点的边 |
nodes | list[EntityNode] | 从事件中提取/解析的实体节点 |
edges | list[EntityEdge] | 提取/解析的实体边 |
communities | list[CommunityNode] | 更新的社区节点(如果请求) |
community_edges | list[CommunityEdge] | 更新的社区成员边 |
AddBulkEpisodeResults
graphiti_core/graphiti.py:123-130
与 AddEpisodeResults 具有相同的字段结构,但使用 episodes: list[EpisodicNode] 而不是单个 episode。
---
方法-代码映射图
此图将公开方法名称映射到它们调用的内部管线函数:
flowchart LR
subgraph "Graphiti 公开 API [graphiti_core/graphiti.py]"
AE["add_episode()"]
AEB["add_episode_bulk()"]
RE["retrieve_episodes()"]
BIC["build_indices_and_constraints()"]
BC["build_communities()"]
SR["search()"]
end
subgraph "节点操作 [graphiti_core/utils/maintenance/node_operations.py]"
EN["extract_nodes()"]
REN["resolve_extracted_nodes()"]
EAN["extract_attributes_from_nodes()"]
end
subgraph "边操作 [graphiti_core/utils/maintenance/edge_operations.py]"
EE["extract_edges()"]
REE["resolve_extracted_edges()"]
BEE["build_episodic_edges()"]
end
subgraph "批量工具 [graphiti_core/utils/bulk_utils.py]"
ENEB["extract_nodes_and_edges_bulk()"]
DNB["dedupe_nodes_bulk()"]
DEB["dedupe_edges_bulk()"]
ANEB["add_nodes_and_edges_bulk()"]
end
subgraph "搜索 [graphiti_core/search/search.py]"
SF["search()"]
end
subgraph "图维护 [graphiti_core/driver/operations/graph_ops.py]"
DRV["GraphMaintenanceOperations.build_indices_and_constraints()"]
end
AE --> EN
AE --> REN
AE --> EE
AE --> REE
AE --> EAN
AE --> ANEB
AE --> BEE
AEB --> ENEB
AEB --> DNB
AEB --> DEB
AEB --> ANEB
RE --> SF
SR --> SF
BIC --> DRV
来源:graphiti_core/graphiti.py:788-1051,graphiti_core/utils/bulk_utils.py:128-149,graphiti_core/utils/maintenance/edge_operations.py:88-175
---
并发与遥测
- 并发:
max_coroutines通过semaphore_gather辅助函数(graphiti_core/graphiti.py:148-151)设置并行异步操作的上限。如果未提供,则从SEMAPHORE_LIMIT环境变量读取该上限。 - 遥测:在初始化时,
Graphiti会调用_capture_initialization_telemetry()(graphiti_core/graphiti.py:247-266),该函数会发出一个包含提供者类型名称的匿名graphiti_initialized事件。可以通过GRAPHITI_TELEMETRY_ENABLED环境变量禁用此功能。
来源:graphiti_core/graphiti.py:247-266,graphiti_core/helpers.py:45