agentic_huge_data_base / wiki
页面 RAGFlow · 3.2 数据存储架构·DeepWiki 中文全文译文

3.2 · 数据存储架构(Data Storage Architecture)

复杂文档理解与引用检索 · 本章是 RAGFlow DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目RAGFlow 章节3.2 状态全文译文 模块存储与持久化、文档对象与元数据、界面与交互、系统架构
源码线索
  • api/constants.py
  • api/db/joint_services/memory_message_service.py
  • api/db/joint_services/user_account_service.py
  • api/db/services/doc_metadata_service.py
  • api/utils/configs.py
  • common/data_source/jira/connector.py
  • common/doc_store/es_conn_base.py
  • common/doc_store/infinity_conn_base.py
  • common/doc_store/ob_conn_base.py
  • common/doc_store/ob_conn_pool.py
模块标签
  • 存储与持久化
  • 文档对象与元数据
  • 界面与交互
  • 系统架构
  • 检索、召回与索引

中文译文

数据存储架构(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/infiniflow/ragflow/3.2-data-storage-architecture
翻译时间:2026-05-27T08:44:26.786Z
翻译模型:deepseek-chat
原文字符数:11995
项目:RAGFlow (ragflow)

---

数据存储架构

相关源文件

本维基页面基于以下源文件生成:

  • api/constants.py
  • api/db/joint_services/memory_message_service.py
  • api/db/joint_services/user_account_service.py
  • api/db/services/doc_metadata_service.py
  • api/utils/configs.py
  • common/data_source/jira/connector.py
  • common/doc_store/es_conn_base.py
  • common/doc_store/infinity_conn_base.py
  • common/doc_store/ob_conn_base.py
  • common/doc_store/ob_conn_pool.py
  • common/http_client.py
  • common/settings.py
  • conf/infinity_mapping.json
  • conf/mapping.json
  • memory/services/messages.py
  • memory/utils/aggregation_utils.py
  • memory/utils/es_conn.py
  • memory/utils/highlight_utils.py
  • memory/utils/infinity_conn.py
  • memory/utils/ob_conn.py
  • rag/svr/cache_file_svr.py
  • rag/utils/azure_sas_conn.py
  • rag/utils/azure_spn_conn.py
  • rag/utils/es_conn.py
  • rag/utils/infinity_conn.py
  • rag/utils/minio_conn.py
  • rag/utils/ob_conn.py
  • rag/utils/opendal_conn.py
  • rag/utils/opensearch_conn.py
  • rag/utils/oss_conn.py
  • rag/utils/s3_conn.py
  • sdk/python/test/conftest.py
  • sdk/python/test/test_frontend_api/common.py
  • sdk/python/test/test_frontend_api/test_chunk.py
  • sdk/python/test/test_frontend_api/test_dataset.py
  • test/unit_test/memory/utils/test_ob_conn_aggregation.py
  • test/unit_test/memory/utils/test_ob_conn_highlight.py
  • test/unit_test/rag/utils/test_opensearch_doc_meta.py

目的与范围

本文档描述了 RAGFlow 的多层存储架构,该架构采用多语言持久化策略来处理检索增强生成(RAG)系统的多样化数据需求。文档说明了文档存储(Elasticsearch、Infinity、OceanBase 或 OpenSearch)、关系型数据库(MySQL 或 PostgreSQL)、对象存储(MinIO、S3 或 OSS)以及协调层(Redis/Valkey)各自的作用,并详细介绍了 DocStoreConnectionStorageFactory 类提供的抽象层,同时追踪了从自然语言操作到具体代码实体的数据流。

存储层总览

RAGFlow 采用分层持久化策略,每种存储类型都根据其特定优势进行选择。这些系统通过 common/settings.py common/settings.py:81-134 中的统一服务层进行抽象,使得系统可以在不影响应用逻辑的前提下切换底层引擎(例如,将 Elasticsearch 替换为 Infinity)。

标题:存储系统层次结构与代码集成

graph TB
    subgraph "应用层(Python/Go)"
        TaskExecutor["任务执行器<br/>rag/svr/task_executor.py"]
        DocService["DocumentService<br/>api/db/services/document_service.py"]
        DocMetaSvc["DocMetadataService<br/>api/db/services/doc_metadata_service.py"]
    end

    subgraph "存储抽象层(settings.py)"
        DocStoreConn["DocStoreConnection<br/>settings.docStoreConn"]
        STORAGE_IMPL["STORAGE_IMPL<br/>settings.STORAGE_IMPL"]
        DBModels["Peewee ORM<br/>api/db/db_models.py"]
        RedisConn["REDIS_CONN<br/>rag/utils/redis_conn.py"]
    end

    subgraph "物理存储层"
        direction TB
        subgraph "文档存储(DOC_ENGINE)"
            ES["Elasticsearch<br/>rag/utils/es_conn.py<br/>ESConnection"]
            Infinity["Infinity<br/>rag/utils/infinity_conn.py<br/>InfinityConnection"]
            OB["OceanBase<br/>rag/utils/ob_conn.py<br/>OBConnection"]
            OS["OpenSearch<br/>rag/utils/opensearch_conn.py<br/>OSConnection"]
        end

        RDBMS["MySQL/PostgreSQL<br/>(元数据与认证)"]

        subgraph "对象存储"
            MinIO["MinIO/S3<br/>rag/utils/minio_conn.py<br/>RAGFlowMinio"]
            OSS["阿里云 OSS<br/>rag/utils/oss_conn.py<br/>RAGFlowOSS"]
            S3["AWS S3<br/>rag/utils/s3_conn.py<br/>RAGFlowS3"]
            OpenDAL["OpenDAL<br/>rag/utils/opendal_conn.py<br/>OpenDALStorage"]
        end

        Redis["Valkey/Redis<br/>(任务队列与缓存)"]
    end

    TaskExecutor --> DocStoreConn
    TaskExecutor --> STORAGE_IMPL

    DocService --> DBModels
    DocService --> DocStoreConn
    DocMetaSvc --> DocStoreConn

    DocStoreConn --> ES
    DocStoreConn --> Infinity
    DocStoreConn --> OB
    DocStoreConn --> OS

    STORAGE_IMPL --> MinIO
    STORAGE_IMPL --> OSS
    STORAGE_IMPL --> S3
    STORAGE_IMPL --> OpenDAL

    RedisConn --> Redis

来源: common/settings.py:81-130common/settings.py:181-196rag/utils/redis_conn.py:1-20api/db/services/doc_metadata_service.py:160-205

文档存储层

文档存储是 RAGFlow 检索能力的核心,负责管理向量嵌入、全文索引以及文档级元数据。

可插拔的文档引擎

通过 DOC_ENGINE 环境变量 common/settings.py:85-88 选择引擎。所有引擎都实现了 DocStoreConnection 接口 common/doc_store/doc_store_base.py:34-34

引擎实现类关键特性
InfinityInfinityConnection针对 RAG 进行了优化,原生支持多向量和专用分析器(rag-coarserag-fine)。采用每个知识库独立建表的隔离策略 rag/utils/infinity_conn.py:30-88rag/utils/infinity_conn.py:164-170
ElasticsearchESConnection使用 elasticsearch-dsl 实现复杂的混合查询。通过 search_after 实现深度分页 rag/utils/es_conn.py:62-141
OceanBaseOBConnection提供兼容 SQL 的向量搜索。利用 pyobvector 和 SQLAlchemy 进行模式管理 rag/utils/ob_conn.py:34-99
OpenSearchOSConnection基于 Elasticsearch 实现的分支,支持类似的混合搜索和按租户隔离的元数据索引 rag/utils/opensearch_conn.py:65-157

来源: common/settings.py:85-88rag/utils/infinity_conn.py:30-42rag/utils/es_conn.py:62-141rag/utils/ob_conn.py:34-99rag/utils/opensearch_conn.py:129-157

文档元数据管理

DocMetadataService 负责管理文档级元数据,这些元数据存储在文档存储中的专用索引中(例如 ragflow_doc_meta_{tenant_id}api/db/services/doc_metadata_service.py:36-50。该服务充当之前存储在关系型表中的元数据的权威来源 api/db/services/doc_metadata_service.py:16-21

来源: api/db/services/doc_metadata_service.py:16-51api/db/services/doc_metadata_service.py:160-205

对象存储层

对象存储处理数据的"原始"部分:上传的文件、解析过程中提取的图片以及临时制品。

存储工厂与连接器

StorageFactory common/settings.py:181-196 根据 STORAGE_IMPL 配置 common/settings.py:133-134 实例化相应的连接器。

  • MinIO/S3: RAGFlowMinioRAGFlowS3 处理兼容 S3 的存储。MinIO 支持"单桶"模式,其中 use_default_bucketuse_prefix_path 装饰器在单个物理桶内按租户/知识库组织文件 rag/utils/minio_conn.py:42-90
  • OpenDAL: OpenDALStorage 使用 OpenDAL 库提供统一接口。它甚至可以通过将 BLOB 保存到 opendal_storage 表中,将 MySQL 用作存储后端 rag/utils/opendal_conn.py:10-40rag/utils/opendal_conn.py:66-75
  • Azure/GCS/OSS: 针对 Azure Blob 存储(通过 SPN 或 SAS)、Google Cloud Storage 和阿里云 OSS 存在专门的连接器 common/settings.py:182-190

来源: common/settings.py:181-196rag/utils/minio_conn.py:42-90rag/utils/s3_conn.py:28-41rag/utils/opendal_conn.py:23-59rag/utils/azure_spn_conn.py:34-57

关系型与协调层

关系型数据库(MySQL/PostgreSQL)

存储结构化元数据,包括用户账户、租户配置、知识库设置以及文档解析状态。RAGFlow 使用 Peewee ORM 与由 DATABASE_TYPE common/settings.py:73-74common/settings.py:198-200 定义的数据库进行交互。

Redis/Valkey

充当系统的"神经系统":

  • 任务分发: 使用 Redis Streams 管理异步解析任务。队列名称根据优先级派生(例如 ragflow_svr_queueragflow_svr_queue_1common/settings.py:136-142
  • 安全与缓存: 存储系统 SECRET_KEY common/settings.py:175-179,并通过 REDIS_CONN rag/utils/redis_conn.py:1-20 提供通用缓存功能。

来源: common/settings.py:73-74common/settings.py:136-142common/settings.py:175-179rag/utils/redis_conn.py:1-20

数据流图

自然语言到代码实体的映射

标题:检索操作到代码和存储的映射

graph LR
    subgraph "自然语言空间"
        UserQuery["'查找第三季度收入'"]
        DocUpload["'上传 annual_report.pdf'"]
    end

    subgraph "代码实体空间"
        SearchDealer["search.py<br/>rag.nlp.search"]
        DocMetaSvc["DocMetadataService.search_metadata()<br/>api/db/services/doc_metadata_service.py"]
        StorageImpl["OpenDALStorage/RAGFlowMinio<br/>rag/utils/*.py"]
    end

    subgraph "存储空间"
        DocStore["Infinity/ES/OB/OS<br/>(片段与向量)"]
        RDBMS["MySQL/PostgreSQL<br/>(知识库配置)"]
        ObjectStore["MinIO/S3/OSS<br/>(原始二进制数据)"]
    end

    UserQuery --> SearchDealer
    SearchDealer --> DocStore

    DocUpload --> DocMetaSvc
    DocMetaSvc --> DocStore
    DocUpload --> StorageImpl
    StorageImpl --> ObjectStore

来源: common/settings.py:42-42api/db/services/doc_metadata_service.py:160-205rag/utils/minio_conn.py:146-157rag/utils/ob_conn.py:43-95

存储连接与健康检查

标题:系统健康监控流程

sequenceDiagram
    participant App as system_app.py
    participant Settings as common/settings.py
    participant DocStore as Infinity/ES/OB/OS 连接
    participant Storage as MinIO/S3/OpenDAL

    App->>Settings: 获取 docStoreConn
    App->>DocStore: health()
    Note over DocStore: 检查集群/节点状态
    DocStore-->>App: 状态(正常/错误)

    App->>Settings: 获取 STORAGE_IMPL
    App->>Storage: health()
    Note over Storage: 执行虚拟写入/读取
    Storage-->>App: 状态(成功/失败)

来源: rag/utils/minio_conn.py:120-142rag/utils/opendal_conn.py:77-79rag/utils/s3_conn.py:115-125rag/utils/azure_spn_conn.py:65-70