数据存储架构(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/infiniflow/ragflow/3.2-data-storage-architecture
翻译时间:2026-05-27T08:44:26.786Z
翻译模型:deepseek-chat
原文字符数:11995
项目:RAGFlow (ragflow)
---
数据存储架构
相关源文件
本维基页面基于以下源文件生成:
api/constants.pyapi/db/joint_services/memory_message_service.pyapi/db/joint_services/user_account_service.pyapi/db/services/doc_metadata_service.pyapi/utils/configs.pycommon/data_source/jira/connector.pycommon/doc_store/es_conn_base.pycommon/doc_store/infinity_conn_base.pycommon/doc_store/ob_conn_base.pycommon/doc_store/ob_conn_pool.pycommon/http_client.pycommon/settings.pyconf/infinity_mapping.jsonconf/mapping.jsonmemory/services/messages.pymemory/utils/aggregation_utils.pymemory/utils/es_conn.pymemory/utils/highlight_utils.pymemory/utils/infinity_conn.pymemory/utils/ob_conn.pyrag/svr/cache_file_svr.pyrag/utils/azure_sas_conn.pyrag/utils/azure_spn_conn.pyrag/utils/es_conn.pyrag/utils/infinity_conn.pyrag/utils/minio_conn.pyrag/utils/ob_conn.pyrag/utils/opendal_conn.pyrag/utils/opensearch_conn.pyrag/utils/oss_conn.pyrag/utils/s3_conn.pysdk/python/test/conftest.pysdk/python/test/test_frontend_api/common.pysdk/python/test/test_frontend_api/test_chunk.pysdk/python/test/test_frontend_api/test_dataset.pytest/unit_test/memory/utils/test_ob_conn_aggregation.pytest/unit_test/memory/utils/test_ob_conn_highlight.pytest/unit_test/rag/utils/test_opensearch_doc_meta.py
目的与范围
本文档描述了 RAGFlow 的多层存储架构,该架构采用多语言持久化策略来处理检索增强生成(RAG)系统的多样化数据需求。文档说明了文档存储(Elasticsearch、Infinity、OceanBase 或 OpenSearch)、关系型数据库(MySQL 或 PostgreSQL)、对象存储(MinIO、S3 或 OSS)以及协调层(Redis/Valkey)各自的作用,并详细介绍了 DocStoreConnection 和 StorageFactory 类提供的抽象层,同时追踪了从自然语言操作到具体代码实体的数据流。
存储层总览
RAGFlow 采用分层持久化策略,每种存储类型都根据其特定优势进行选择。这些系统通过 common/settings.py common/settings.py:81-134 中的统一服务层进行抽象,使得系统可以在不影响应用逻辑的前提下切换底层引擎(例如,将 Elasticsearch 替换为 Infinity)。
标题:存储系统层次结构与代码集成
graph TB
subgraph "应用层(Python/Go)"
TaskExecutor["任务执行器<br/>rag/svr/task_executor.py"]
DocService["DocumentService<br/>api/db/services/document_service.py"]
DocMetaSvc["DocMetadataService<br/>api/db/services/doc_metadata_service.py"]
end
subgraph "存储抽象层(settings.py)"
DocStoreConn["DocStoreConnection<br/>settings.docStoreConn"]
STORAGE_IMPL["STORAGE_IMPL<br/>settings.STORAGE_IMPL"]
DBModels["Peewee ORM<br/>api/db/db_models.py"]
RedisConn["REDIS_CONN<br/>rag/utils/redis_conn.py"]
end
subgraph "物理存储层"
direction TB
subgraph "文档存储(DOC_ENGINE)"
ES["Elasticsearch<br/>rag/utils/es_conn.py<br/>ESConnection"]
Infinity["Infinity<br/>rag/utils/infinity_conn.py<br/>InfinityConnection"]
OB["OceanBase<br/>rag/utils/ob_conn.py<br/>OBConnection"]
OS["OpenSearch<br/>rag/utils/opensearch_conn.py<br/>OSConnection"]
end
RDBMS["MySQL/PostgreSQL<br/>(元数据与认证)"]
subgraph "对象存储"
MinIO["MinIO/S3<br/>rag/utils/minio_conn.py<br/>RAGFlowMinio"]
OSS["阿里云 OSS<br/>rag/utils/oss_conn.py<br/>RAGFlowOSS"]
S3["AWS S3<br/>rag/utils/s3_conn.py<br/>RAGFlowS3"]
OpenDAL["OpenDAL<br/>rag/utils/opendal_conn.py<br/>OpenDALStorage"]
end
Redis["Valkey/Redis<br/>(任务队列与缓存)"]
end
TaskExecutor --> DocStoreConn
TaskExecutor --> STORAGE_IMPL
DocService --> DBModels
DocService --> DocStoreConn
DocMetaSvc --> DocStoreConn
DocStoreConn --> ES
DocStoreConn --> Infinity
DocStoreConn --> OB
DocStoreConn --> OS
STORAGE_IMPL --> MinIO
STORAGE_IMPL --> OSS
STORAGE_IMPL --> S3
STORAGE_IMPL --> OpenDAL
RedisConn --> Redis
来源: common/settings.py:81-130、common/settings.py:181-196、rag/utils/redis_conn.py:1-20、api/db/services/doc_metadata_service.py:160-205
文档存储层
文档存储是 RAGFlow 检索能力的核心,负责管理向量嵌入、全文索引以及文档级元数据。
可插拔的文档引擎
通过 DOC_ENGINE 环境变量 common/settings.py:85-88 选择引擎。所有引擎都实现了 DocStoreConnection 接口 common/doc_store/doc_store_base.py:34-34。
| 引擎 | 实现类 | 关键特性 |
|---|---|---|
| Infinity | InfinityConnection | 针对 RAG 进行了优化,原生支持多向量和专用分析器(rag-coarse、rag-fine)。采用每个知识库独立建表的隔离策略 rag/utils/infinity_conn.py:30-88、rag/utils/infinity_conn.py:164-170。 |
| Elasticsearch | ESConnection | 使用 elasticsearch-dsl 实现复杂的混合查询。通过 search_after 实现深度分页 rag/utils/es_conn.py:62-141。 |
| OceanBase | OBConnection | 提供兼容 SQL 的向量搜索。利用 pyobvector 和 SQLAlchemy 进行模式管理 rag/utils/ob_conn.py:34-99。 |
| OpenSearch | OSConnection | 基于 Elasticsearch 实现的分支,支持类似的混合搜索和按租户隔离的元数据索引 rag/utils/opensearch_conn.py:65-157。 |
来源: common/settings.py:85-88、rag/utils/infinity_conn.py:30-42、rag/utils/es_conn.py:62-141、rag/utils/ob_conn.py:34-99、rag/utils/opensearch_conn.py:129-157
文档元数据管理
DocMetadataService 负责管理文档级元数据,这些元数据存储在文档存储中的专用索引中(例如 ragflow_doc_meta_{tenant_id})api/db/services/doc_metadata_service.py:36-50。该服务充当之前存储在关系型表中的元数据的权威来源 api/db/services/doc_metadata_service.py:16-21。
来源: api/db/services/doc_metadata_service.py:16-51、api/db/services/doc_metadata_service.py:160-205
对象存储层
对象存储处理数据的"原始"部分:上传的文件、解析过程中提取的图片以及临时制品。
存储工厂与连接器
StorageFactory common/settings.py:181-196 根据 STORAGE_IMPL 配置 common/settings.py:133-134 实例化相应的连接器。
- MinIO/S3:
RAGFlowMinio和RAGFlowS3处理兼容 S3 的存储。MinIO 支持"单桶"模式,其中use_default_bucket和use_prefix_path装饰器在单个物理桶内按租户/知识库组织文件rag/utils/minio_conn.py:42-90。 - OpenDAL:
OpenDALStorage使用 OpenDAL 库提供统一接口。它甚至可以通过将 BLOB 保存到opendal_storage表中,将 MySQL 用作存储后端rag/utils/opendal_conn.py:10-40、rag/utils/opendal_conn.py:66-75。 - Azure/GCS/OSS: 针对 Azure Blob 存储(通过 SPN 或 SAS)、Google Cloud Storage 和阿里云 OSS 存在专门的连接器
common/settings.py:182-190。
来源: common/settings.py:181-196、rag/utils/minio_conn.py:42-90、rag/utils/s3_conn.py:28-41、rag/utils/opendal_conn.py:23-59、rag/utils/azure_spn_conn.py:34-57
关系型与协调层
关系型数据库(MySQL/PostgreSQL)
存储结构化元数据,包括用户账户、租户配置、知识库设置以及文档解析状态。RAGFlow 使用 Peewee ORM 与由 DATABASE_TYPE common/settings.py:73-74、common/settings.py:198-200 定义的数据库进行交互。
Redis/Valkey
充当系统的"神经系统":
- 任务分发: 使用 Redis Streams 管理异步解析任务。队列名称根据优先级派生(例如
ragflow_svr_queue和ragflow_svr_queue_1)common/settings.py:136-142。 - 安全与缓存: 存储系统
SECRET_KEYcommon/settings.py:175-179,并通过REDIS_CONNrag/utils/redis_conn.py:1-20提供通用缓存功能。
来源: common/settings.py:73-74、common/settings.py:136-142、common/settings.py:175-179、rag/utils/redis_conn.py:1-20
数据流图
自然语言到代码实体的映射
标题:检索操作到代码和存储的映射
graph LR
subgraph "自然语言空间"
UserQuery["'查找第三季度收入'"]
DocUpload["'上传 annual_report.pdf'"]
end
subgraph "代码实体空间"
SearchDealer["search.py<br/>rag.nlp.search"]
DocMetaSvc["DocMetadataService.search_metadata()<br/>api/db/services/doc_metadata_service.py"]
StorageImpl["OpenDALStorage/RAGFlowMinio<br/>rag/utils/*.py"]
end
subgraph "存储空间"
DocStore["Infinity/ES/OB/OS<br/>(片段与向量)"]
RDBMS["MySQL/PostgreSQL<br/>(知识库配置)"]
ObjectStore["MinIO/S3/OSS<br/>(原始二进制数据)"]
end
UserQuery --> SearchDealer
SearchDealer --> DocStore
DocUpload --> DocMetaSvc
DocMetaSvc --> DocStore
DocUpload --> StorageImpl
StorageImpl --> ObjectStore
来源: common/settings.py:42-42、api/db/services/doc_metadata_service.py:160-205、rag/utils/minio_conn.py:146-157、rag/utils/ob_conn.py:43-95
存储连接与健康检查
标题:系统健康监控流程
sequenceDiagram
participant App as system_app.py
participant Settings as common/settings.py
participant DocStore as Infinity/ES/OB/OS 连接
participant Storage as MinIO/S3/OpenDAL
App->>Settings: 获取 docStoreConn
App->>DocStore: health()
Note over DocStore: 检查集群/节点状态
DocStore-->>App: 状态(正常/错误)
App->>Settings: 获取 STORAGE_IMPL
App->>Storage: health()
Note over Storage: 执行虚拟写入/读取
Storage-->>App: 状态(成功/失败)
来源: rag/utils/minio_conn.py:120-142、rag/utils/opendal_conn.py:77-79、rag/utils/s3_conn.py:115-125、rag/utils/azure_spn_conn.py:65-70