agentic_huge_data_base / wiki
页面 RAGFlow · 9.1 文档 Store 抽象·DeepWiki 中文全文译文

9.1 · 文档 Store 抽象(Document Store Abstraction)

复杂文档理解与引用检索 · 本章是 RAGFlow DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目RAGFlow 章节9.1 状态全文译文 模块检索、召回与索引、系统架构、文档对象与元数据、存储与持久化
源码线索
  • api/db/joint_services/memory_message_service.py
  • api/db/joint_services/user_account_service.py
  • api/db/services/doc_metadata_service.py
  • common/doc_store/es_conn_base.py
  • common/doc_store/infinity_conn_base.py
  • common/doc_store/ob_conn_base.py
  • common/doc_store/ob_conn_pool.py
  • common/settings.py
  • conf/infinity_mapping.json
  • conf/mapping.json
模块标签
  • 检索、召回与索引
  • 系统架构
  • 文档对象与元数据
  • 存储与持久化
  • 界面与交互

中文译文

文档 Store 抽象(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/infiniflow/ragflow/9.1-document-store-abstraction
翻译时间:2026-05-27T08:44:50.940Z
翻译模型:deepseek-chat
原文字符数:16126
项目:RAGFlow (ragflow)

---

文档存储抽象层

相关源文件

以下文件作为生成此 Wiki 页面的上下文:

  • api/db/joint_services/memory_message_service.py
  • api/db/joint_services/user_account_service.py
  • api/db/services/doc_metadata_service.py
  • common/doc_store/es_conn_base.py
  • common/doc_store/infinity_conn_base.py
  • common/doc_store/ob_conn_base.py
  • common/doc_store/ob_conn_pool.py
  • common/settings.py
  • conf/infinity_mapping.json
  • conf/mapping.json
  • memory/services/messages.py
  • memory/utils/aggregation_utils.py
  • memory/utils/es_conn.py
  • memory/utils/highlight_utils.py
  • memory/utils/infinity_conn.py
  • memory/utils/ob_conn.py
  • rag/utils/es_conn.py
  • rag/utils/infinity_conn.py
  • rag/utils/ob_conn.py
  • rag/utils/opensearch_conn.py
  • test/unit_test/memory/utils/test_ob_conn_aggregation.py
  • test/unit_test/memory/utils/test_ob_conn_highlight.py
  • test/unit_test/rag/utils/test_opensearch_doc_meta.py

目的与范围

本文档描述了 RAGFlow 的文档存储抽象层,该层提供了一个统一接口,用于与多种搜索引擎后端进行交互。该抽象层使 RAGFlow 能够支持 Elasticsearch、Infinity、OceanBase 和 OpenSearch 作为可插拔的存储引擎,而无需修改应用程序代码。该层负责处理片段存储、向量索引、全文搜索和混合检索操作。

该抽象层确保检索增强生成(RAG)管线与底层存储技术无关,无论底层是传统搜索引擎(Elasticsearch)、高性能 AI 原生数据库(Infinity),还是具备向量能力的分布式关系型数据库(OceanBase)。

---

架构总览

文档存储抽象层采用三层架构:抽象接口、后端特定基类,以及通过环境配置在运行时选择的具体实现。

图:文档存储抽象层

graph TB
    subgraph "应用层"
        ["KnowledgebaseService"]
        ["DocMetadataService"]
        ["检索"]
    end

    subgraph "抽象接口"
        ["DocStoreConnection<br/>common/doc_store/doc_store_base.py"]
        ["匹配表达式类型:<br/>MatchTextExpr<br/>MatchDenseExpr<br/>FusionExpr"]
    end

    subgraph "后端基类"
        ["ESConnectionBase<br/>common/doc_store/es_conn_base.py"]
        ["InfinityConnectionBase<br/>common/doc_store/infinity_conn_base.py"]
        ["OBConnectionBase<br/>common/doc_store/ob_conn_base.py"]
    end

    subgraph "具体实现"
        ["ESConnection<br/>rag/utils/es_conn.py"]
        ["InfinityConnection<br/>rag/utils/infinity_conn.py"]
        ["OBConnection<br/>rag/utils/ob_conn.py"]
        ["OSConnection<br/>rag/utils/opensearch_conn.py"]
    end

    subgraph "存储后端"
        ["Elasticsearch 8.x"]
        ["Infinity 0.7+"]
        ["OceanBase / OBVector"]
        ["OpenSearch 2.x"]
    end

    subgraph "配置"
        ["DOC_ENGINE<br/>环境变量"]
        ["common/settings.py<br/>docStoreConn"]
    end

    ["KnowledgebaseService"] --> ["DocStoreConnection<br/>common/doc_store/doc_store_base.py"]
    ["DocMetadataService"] --> ["DocStoreConnection<br/>common/doc_store/doc_store_base.py"]
    ["检索"] --> ["DocStoreConnection<br/>common/doc_store/doc_store_base.py"]

    ["DocStoreConnection<br/>common/doc_store/doc_store_base.py"] --> ["ESConnectionBase<br/>common/doc_store/es_conn_base.py"]
    ["DocStoreConnection<br/>common/doc_store/doc_store_base.py"] --> ["InfinityConnectionBase<br/>common/doc_store/infinity_conn_base.py"]
    ["DocStoreConnection<br/>common/doc_store/doc_store_base.py"] --> ["OBConnectionBase<br/>common/doc_store/ob_conn_base.py"]

    ["ESConnectionBase<br/>common/doc_store/es_conn_base.py"] --> ["ESConnection<br/>rag/utils/es_conn.py"]
    ["InfinityConnectionBase<br/>common/doc_store/infinity_conn_base.py"] --> ["InfinityConnection<br/>rag/utils/infinity_conn.py"]
    ["OBConnectionBase<br/>common/doc_store/ob_conn_base.py"] --> ["OBConnection<br/>rag/utils/ob_conn.py"]

    ["ESConnection<br/>rag/utils/es_conn.py"] --> ["Elasticsearch 8.x"]
    ["InfinityConnection<br/>rag/utils/infinity_conn.py"] --> ["Infinity 0.7+"]
    ["OBConnection<br/>rag/utils/ob_conn.py"] --> ["OceanBase / OBVector"]
    ["OSConnection<br/>rag/utils/opensearch_conn.py"] --> ["OpenSearch 2.x"]

    ["DOC_ENGINE<br/>环境变量"] --> ["common/settings.py<br/>docStoreConn"]
    ["common/settings.py<br/>docStoreConn"] --> ["ESConnection<br/>rag/utils/es_conn.py"]
    ["common/settings.py<br/>docStoreConn"] --> ["InfinityConnection<br/>rag/utils/infinity_conn.py"]
    ["common/settings.py<br/>docStoreConn"] --> ["OBConnection<br/>rag/utils/ob_conn.py"]
    ["common/settings.py<br/>docStoreConn"] --> ["OSConnection<br/>rag/utils/opensearch_conn.py"]

来源:common/doc_store/doc_store_base.py:34-34, common/settings.py:85-91, rag/utils/infinity_conn.py:29-30, rag/utils/es_conn.py:61-62, rag/utils/ob_conn.py:33-34, rag/utils/opensearch_conn.py:65-65

---

DocStoreConnection 接口

DocStoreConnection 抽象基类定义了所有后端实现必须满足的契约。它按功能类别组织,包括索引管理、增删改查(CRUD)和检索操作。

数据库与索引操作
方法参数用途
db_type()返回后端类型标识符("elasticsearch""infinity""oceanbase""opensearch"
health()返回集群健康状态,包括可用性和资源指标
create_idx()indexName, knowledgebaseId, vectorSize, parser_id创建具有指定向量维度模式的索引/表
delete_idx()indexName, knowledgebaseId删除索引/表
index_exist()indexName, knowledgebaseId检查索引/表是否存在
create_doc_meta_idx()index_name创建用于文档元数据的专用索引

来源:common/doc_store/doc_store_base.py:143-185, rag/utils/opensearch_conn.py:107-157

增删改查(CRUD)与搜索操作

search() 方法是检索的主要入口点,支持混合搜索、过滤和聚合。

@abstractmethod
def search(
    self,
    select_fields: list[str],
    highlight_fields: list[str],
    condition: dict,
    match_expressions: list[MatchExpr],
    order_by: OrderByExpr,
    offset: int,
    limit: int,
    index_names: str | list[str],
    knowledgebase_ids: list[str],
    agg_fields: list[str] | None = None,
    rank_feature: dict | None = None
):

关键参数:

  • condition:过滤条件的字典(例如 {"available_int": 1})。
  • match_expressions:要执行的 MatchTextExpr(BM25)、MatchDenseExpr(向量)或 FusionExpr 列表。
  • knowledgebase_ids:用于多租户分区或表选择。

来源:common/doc_store/doc_store_base.py:191-236, rag/utils/infinity_conn.py:94-107

---

后端实现

Infinity 实现

Infinity 对片段存储采用每个知识库一张表的策略。它执行大量的字段映射,以将 RAGFlow 的逻辑字段与物理数据库列对齐。

字段转换逻辑: Infinity 将 RAGFlow 的逻辑字段映射到 conf/infinity_mapping.json 中定义的特定列和全文索引。

RAGFlow 字段Infinity 列全文索引
docnm_kwddocnmft_docnm_rag_coarse
content_ltkscontentft_content_rag_coarse
important_kwdimportant_keywordsft_important_keywords_rag_coarse

来源:rag/utils/infinity_conn.py:61-88, conf/infinity_mapping.json:9-17

实现细节:

  • 表分区: 片段存储在名为 {index_name}_{kb_id} 的表中 rag/utils/infinity_conn.py:164-165
  • 关键字处理: Infinity 将 *_kwd 字段视为关键字列表,但 knowledge_graph_kwd 等特定排除项除外 rag/utils/infinity_conn.py:38-42
  • 元数据争用: 实现了 _retry_on_meta_contention,采用指数退避和抖动策略,以处理索引创建或删除期间 RocksDB 的"资源繁忙"错误(代码 9003)common/doc_store/infinity_conn_base.py:106-145
Elasticsearch 实现

Elasticsearch 对每个租户使用单个索引,使用 kb_id 作为过滤字段进行隔离。

关键特性:

  • 混合搜索: 结合 query_string 进行文本匹配,使用 knn 进行向量相似度搜索 rag/utils/es_conn.py:183-205
  • Search After: 实现了 _search_with_search_after,用于在 MAX_RESULT_WINDOW(10,000)之外进行高效深度分页 rag/utils/es_conn.py:76-140
  • PageRank 调整: 使用自定义脚本 _PAGERANK_FEA_ADJUST_SCRIPT,在片段反馈期间对 pagerank_fea 进行原子更新 rag/utils/es_conn.py:36-58

来源:rag/utils/es_conn.py:31-154

OceanBase 实现

OceanBase 实现使用 pyobvector 和 SQLAlchemy,在关系型后端之上提供向量搜索能力。

模式定义: OceanBase 实现在 rag/utils/ob_conn.py 中定义了严格的模式,包含片段元数据、内容和向量数据的列。

  • 向量存储: 通过 pyobvector 使用 ARRAY 类型 rag/utils/ob_conn.py:26-30
  • 全文列: content_with_weightcontent_ltkstitle_tks 用于 BM25 搜索 rag/utils/ob_conn.py:116-131
  • 列定义: 诸如 available_intknowledge_graph_kwdpagerank_fea 等特定列使用 SQLAlchemy 的 Column 对象显式定义 rag/utils/ob_conn.py:52-99

来源:rag/utils/ob_conn.py:52-99, rag/utils/ob_conn.py:116-131

OpenSearch 实现

OpenSearch 实现为 OpenSearch 2.x 后端提供了统一接口,镜像了 Elasticsearch 的抽象,但使用了 opensearchpy 客户端。

关键特性:

  • 元数据索引: 支持使用 conf/doc_meta_es_mapping.json 中的专用映射创建 create_doc_meta_idx rag/utils/opensearch_conn.py:129-156
  • 工具函数: 提供 refresh_idxcount_idx 用于索引管理和监控 rag/utils/opensearch_conn.py:158-191

来源:rag/utils/opensearch_conn.py:65-102, rag/utils/opensearch_conn.py:129-191

---

匹配表达式系统

该系统使用统一的表达式语言来描述跨不同后端的搜索意图。

图:自然语言到代码实体空间

graph LR
    subgraph "自然语言意图"
        ["'查找关于大语言模型(LLM)的片段'"]
        ["'查找与此向量相似的片段'"]
        ["'结合文本和向量搜索'"]
    end

    subgraph "代码实体空间(common/doc_store/doc_store_base.py)"
        ["MatchTextExpr<br/>fields=['content_ltks']<br/>matching_text='LLMs'"]
        ["MatchDenseExpr<br/>vector_column_name='q_768_vec'<br/>embedding_data=[...]"]
        ["FusionExpr<br/>method='weighted_sum'<br/>fusion_params={'weights': '0.5,0.5'}"]
    end

    ["'查找关于大语言模型(LLM)的片段'"] --> ["MatchTextExpr<br/>fields=['content_ltks']<br/>matching_text='LLMs'"]
    ["'查找与此向量相似的片段'"] --> ["MatchDenseExpr<br/>vector_column_name='q_768_vec'<br/>embedding_data=[...]"]
    ["'结合文本和向量搜索'"] --> ["FusionExpr<br/>method='weighted_sum'<br/>fusion_params={'weights': '0.5,0.5'}"]

来源:common/doc_store/doc_store_base.py:56-127

MatchTextExpr

用于关键字和语义文本匹配。它包含 matching_textfieldsextra_options 等参数,用于控制 minimum_should_match common/doc_store/doc_store_base.py:56-68, memory/utils/es_conn.py:170-172

MatchDenseExpr

用于向量相似度搜索。它指定目标向量列(例如 q_768_vec)和查询嵌入向量 common/doc_store/doc_store_base.py:70-86

FusionExpr

启用混合搜索。Infinity 和 Elasticsearch 等后端使用 FusionExpr 中的参数来确定如何合并来自多个搜索路径的结果 rag/utils/infinity_conn.py:128-138, rag/utils/es_conn.py:185-190

---

文档与记忆存储

RAGFlow 在文档存储中维护文档级元数据和对话记忆。

文档元数据

DocMetadataService 管理,文档元数据存储在名为 ragflow_doc_meta_{tenant_id} 的索引中 api/db/services/doc_metadata_service.py:40-50。该存储作为文档级属性的单一事实来源。

  • 搜索迭代: DocMetadataService._iter_search_results 处理不同后端的不同输出格式(DataFrame、ES 字典、OceanBase SearchResult)api/db/services/doc_metadata_service.py:103-158
记忆系统

对话记忆使用每个用户的索引存储在文档存储中。

  • 索引名称: memory_{uid}index_name(uid) 生成 memory/services/messages.py:24-24
  • 消息类型: 支持 RAW 对话存储和提取的语义记忆 api/db/joint_services/memory_message_service.py:64-90
  • MsgStoreConn: 文档存储抽象的专用实例(settings.msgStoreConn)用于记忆操作 common/settings.py:91-91
  • 字段映射: map_message_to_es_fields 将消息字典转换为存储模式,包括名为 q_{dim}_vec 的向量嵌入 memory/utils/es_conn.py:53-78

来源:memory/services/messages.py:24-24, api/db/joint_services/memory_message_service.py:64-90, common/settings.py:91-91, memory/utils/es_conn.py:53-78

---

引擎选择与初始化

存储引擎的选择在 common/settings.py 的系统初始化期间确定。

图:系统初始化流程

sequenceDiagram
    participant OS as 操作系统环境
    participant Settings as common/settings.py
    participant Factory as docStoreConn

    OS->>Settings: 设置 DOC_ENGINE(例如 'infinity')
    Settings->>Settings: init_settings()

    alt DOC_ENGINE == 'elasticsearch'
        Settings->>Factory: 实例化 ESConnection
    else DOC_ENGINE == 'infinity'
        Settings->>Factory: 实例化 InfinityConnection
    else DOC_ENGINE == 'oceanbase'
        Settings->>Factory: 实例化 OBConnection
    else DOC_ENGINE == 'opensearch'
        Settings->>Factory: 实例化 OSConnection
    end

    Settings->>Factory: health() 检查

来源:common/settings.py:85-91, common/settings.py:246-260

存储实现映射
引擎类名文件路径
ElasticsearchESConnectionrag/utils/es_conn.py
InfinityInfinityConnectionrag/utils/infinity_conn.py
OceanBaseOBConnectionrag/utils/ob_conn.py
OpenSearchOSConnectionrag/utils/opensearch_conn.py

来源:common/settings.py:29-32, common/settings.py:246-260, rag/utils/opensearch_conn.py:65-65