文档处理管线(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/infiniflow/ragflow/6-document-processing-pipeline
翻译时间:2026-05-27T08:44:43.302Z
翻译模型:deepseek-chat
原文字符数:20397
项目:RAGFlow (ragflow)
---
文档处理管线
相关源文件
以下文件被用作生成此 Wiki 页面的上下文:
api/db/__init__.pyapi/db/db_models.pyapi/db/services/dialog_service.pyapi/db/services/document_service.pyapi/db/services/file_service.pyapi/db/services/knowledgebase_service.pyapi/db/services/llm_service.pyapi/db/services/task_service.pyapi/db/services/user_service.pydeepdoc/parser/excel_parser.pyrag/app/book.pyrag/app/laws.pyrag/app/manual.pyrag/app/naive.pyrag/app/one.pyrag/app/paper.pyrag/app/presentation.pyrag/app/qa.pyrag/app/table.pyrag/nlp/__init__.pyrag/nlp/search.pyrag/raptor.pyrag/svr/task_executor.py
目的与范围
文档处理管线是 RAGFlow 的核心引擎,负责将上传的原始文档转换为语义丰富、经过片段切分、嵌入向量化并建立索引的单元,以供检索使用。其流程涵盖从文档上传和入库,到解析、片段切分、语义增强、嵌入向量化,最终在文档存储中进行索引的全过程。
本文档是对端到端管线结构和流程的高层概述,并链接到每个核心阶段的详细子页面:解析、片段切分、增强、视觉处理(OCR/版面分析)、高级功能(GraphRAG、RAPTOR)以及外部连接器集成。
有关详细的机制和实现,请参见以下链接的子页面:
---
管线架构总览
高层管线流程
文档处理管线以异步方式运行,使用任务队列和后台工作者。当通过 API 上传文档时,会创建一个处理任务并将其加入 Redis Streams 队列。后台任务执行器工作者从队列中获取任务,选择合适的解析器,对文档进行片段切分,通过大语言模型(LLM)应用语义增强,将片段转换为嵌入向量,并在文档存储中建立索引。
flowchart TB
subgraph "上传与任务创建"
Upload["文档上传<br/>POST /upload"]
TaskCreate["任务创建<br/>TaskService.get_task"]
RedisQueue["Redis Streams<br/>SVR 消费者组队列"]
Upload --> TaskCreate
TaskCreate --> RedisQueue
end
subgraph "任务执行器"
Collect["任务收集<br/>task_executor.collect()"]
BuildChunks["片段构建<br/>task_executor.build_chunks()"]
RedisQueue --> Collect
Collect --> BuildChunks
end
subgraph "解析器选择"
Factory["FACTORY 字典<br/>ParserType → 解析器模块"]
ParserExec["解析器执行<br/>chunk() 函数"]
BuildChunks --> Factory
Factory --> ParserExec
end
subgraph "增强"
Keywords["关键词提取<br/>keyword_extraction()"]
Questions["问题生成<br/>question_proposal()"]
Metadata["元数据提取<br/>gen_metadata()"]
Tags["内容标签<br/>content_tagging()"]
ParserExec --> Keywords
ParserExec --> Questions
ParserExec --> Metadata
ParserExec --> Tags
end
subgraph "嵌入向量与索引"
Embed["文本嵌入向量<br/>LLMBundle.encode()"]
Index["索引片段<br/>Dealer.search / DocStore"]
Keywords --> Embed
Questions --> Embed
Metadata --> Embed
Tags --> Embed
Embed --> Index
end
subgraph "存储"
DocStore[("文档存储<br/>Dealer 和 DocStoreConnection")]
MinIO[("MinIO 对象存储")]
MySQL[("元数据库<br/>MySQL/PostgreSQL")]
Index --> DocStore
ParserExec --> MinIO
Index --> MySQL
end
来源:rag/svr/task_executor.py:85-110, rag/svr/task_executor.py:177-240, api/db/services/task_service.py:75-142, api/db/services/document_service.py:44-72, rag/nlp/search.py:37-51
---
任务执行器与队列系统
任务收集与分发
处理任务由从 task_executor.py 实例化的后台工作者管理。这些工作者从 Redis Streams 队列中消费文档处理任务,该队列使用由 SVR_CONSUMER_GROUP_NAME 和 task_executor_* 消费者名称标识的消费者组。通过 asyncio 信号量控制并发度,限制最大并行任务数和片段构建数。
flowchart LR
subgraph "Redis Streams"
Queue["REDIS_CONN Streams<br/>get_unacked_iterator()"]
end
subgraph "消费者组"
CG["SVR_CONSUMER_GROUP_NAME<br/>任务消费者组"]
end
subgraph "任务执行器"
TE0["task_executor_0<br/>消费者实例"]
TE1["task_executor_1<br/>消费者实例"]
end
subgraph "并发控制"
TaskLimiter["task_limiter 信号量<br/>MAX_CONCURRENT_TASKS"]
ChunkLimiter["chunk_limiter 信号量<br/>MAX_CONCURRENT_CHUNK_BUILDERS"]
EmbedLimiter["embed_limiter 信号量<br/>MAX_CONCURRENT_CHUNK_BUILDERS"]
end
Queue --> CG
CG --> TE0
CG --> TE1
TE0 --> TaskLimiter
TE1 --> TaskLimiter
TaskLimiter --> ChunkLimiter
TaskLimiter --> EmbedLimiter
多个任务执行器并行运行,各自消费任务,通过信号量控制以避免过载。
任务类型与路由
管线支持多种任务类型,路由到不同的逻辑处理器,定义在 TASK_TYPE_TO_PIPELINE_TASK_TYPE 中:
| 任务类型 | 管线任务类型 | 处理逻辑 | 描述 |
|---|---|---|---|
dataflow | PARSE | 标准解析流程(run_dataflow()) | 通过解析器模块进行典型文档解析 |
raptor | RAPTOR | RAPTOR 层次聚类 | 用于检索的层次化片段聚类 |
graphrag | GRAPH_RAG | GraphRAG 知识图谱提取 | 实体-关系图谱提取 |
mindmap | MINDMAP | 思维导图生成(MindMapExtractor) | 提取思维导图结构 |
memory | MEMORY | 智能体记忆持久化(handle_save_to_memory_task) | 持久化智能体记忆管理 |
这种路由支持高级文档处理工作流的可扩展性。
来源:rag/svr/task_executor.py:104-110, rag/svr/task_executor.py:33-46, rag/raptor.py:1-50
---
解析器选择与工厂模式
解析器工厂映射
解析器模块为各种文档类型实现了片段切分策略。FACTORY 字典将解析器类型常量(ParserType)映射到对应的解析器模块,从而在运行时动态选择解析逻辑。
graph TB
subgraph "解析器工厂映射"
FACTORY["FACTORY 字典:<br/>ParserType => 模块"]
end
subgraph "解析器模块"
naive["rag.app.naive"]
paper["rag.app.paper"]
book["rag.app.book"]
presentation["rag.app.presentation"]
manual["rag.app.manual"]
laws["rag.app.laws"]
qa["rag.app.qa"]
table["rag.app.table"]
resume["rag.app.resume"]
picture["rag.app.picture"]
one["rag.app.one"]
audio["rag.app.audio"]
email["rag.app.email"]
tag["rag.app.tag"]
end
FACTORY --> naive
FACTORY --> paper
FACTORY --> book
FACTORY --> presentation
FACTORY --> manual
FACTORY --> laws
FACTORY --> qa
FACTORY --> table
FACTORY --> resume
FACTORY --> picture
FACTORY --> one
FACTORY --> audio
FACTORY --> email
FACTORY --> tag
这种工厂模式支持独立扩展或定制解析策略。
解析器选择逻辑
在任务执行时,系统通过工厂字典根据文档的 parser_id 选择解析器模块,然后调用解析器的 chunk() 函数。该函数通常处理文档二进制数据、指定的页码范围、语言以及各种解析器配置。
chunk 函数的参数包括:
filename:上传文件的名称binary:原始文件字节(可选)from_page、to_page:要解析的页码范围lang:文档语言callback:用于进度报告parser_config:解析器特定的配置(例如,版面、片段大小)
有关详细的解析器实现和支持的文件格式(如 PDF、DOCX、Excel、Markdown、EPUB、PPT),请参见文档解析策略。
来源:rag/svr/task_executor.py:85-168, rag/app/naive.py:86-112, rag/app/manual.py:137-175
---
片段切分过程
片段构建工作流
片段构建函数(build_chunks())负责编排以下操作:
- 从存储(MinIO)加载文件二进制数据
- 从
FACTORY中选择并调用片段切分器 - 处理图像并将提取的 ID 存储回 MinIO
- 可选地调用基于大语言模型(LLM)的内容增强(关键词、问答对)
- 最终准备用于嵌入向量化和索引的片段
该函数通过信号量应用并发控制,并使用超时守卫确保健壮性。
flowchart TB
Start["build_chunks()"]
SelectChunker["选择片段切分器<br/>FACTORY[parser_id]"]
ExecuteChunk["调用 parser.chunk()<br/>使用 chunk_limiter"]
ProcessImages["处理图像<br/>image2id(),存储到 MinIO"]
EnhanceContent["内容增强<br/>keyword_extraction()、question_proposal()、gen_metadata()"]
Start --> SelectChunker
SelectChunker --> ExecuteChunk
ExecuteChunk --> ProcessImages
ProcessImages --> EnhanceContent
EnhanceContent --> ReturnChunks["返回片段列表"]
有关不同片段切分器的策略变体和用例,请参见片段切分方法。
片段数据结构
解析器生成的每个片段都包含必要的元数据,以支持语义搜索:
| 字段 | 类型 | 描述 |
|---|---|---|
id | 字符串 | 唯一片段 ID(内容哈希 + doc_id) |
doc_id | 字符串 | 父文档 ID |
kb_id | 字符串 | 知识库(数据集)ID |
content_with_weight | 字符串 | 包含标题权重的文本内容 |
content_ltks | 列表 | 分词后的内容 |
img_id | 字符串 | 图像引用(MinIO 对象 ID) |
page_num_int | 整数 | 源文档中的页码 |
这些片段被传递给嵌入向量化处理,并与元数据和向量一起存储在文档存储中。
来源:rag/svr/task_executor.py:294-342, rag/nlp/search.py:149-153
---
内容增强与嵌入向量
基于大语言模型(LLM)的增强
如果解析器配置中启用了该功能,基于大语言模型(LLM)的增强会为片段生成元数据丰富的内容。这包括:
- 关键词提取 — 检测片段中的重要关键词或短语。
- 问题建议 — 生成片段内容可能回答的潜在问题。
- 元数据提取 — 使用 JSON 模式提示提取结构化数据字段。
- 目录生成 — 总结文档大纲条目以方便导航。
这些调用使用 rag.prompts.generator 下的提示生成工具。
有关增强和嵌入向量集成的详细信息,请参见内容增强与嵌入向量。
来源:rag/svr/task_executor.py:343-452, rag/prompts/generator.py:44-49
嵌入向量过程
增强后,片段通过调用大语言模型(LLM)抽象层(LLMBundle.encode)编码为向量嵌入向量。这支持多个提供商和使用跟踪。
嵌入向量以批处理方式进行,可能会在内容前添加文档或片段标题以保持语义连贯性。
生成的向量在文档存储中建立索引以供检索。
来源:rag/svr/task_executor.py:575-626, api/db/services/llm_service.py:95-118
---
数据流管线执行
自定义管线处理
RAGFlow 通过智能体/画布系统支持用户定义的复杂文档处理。run_dataflow() 函数执行这些管线,可以加载文档、解析、片段切分、增强和嵌入向量化,由自定义工作流编排。
flowchart TB
LoadDSL["加载管线 DSL<br/>UserCanvas 组件"]
ExecutePipeline["执行管线<br/>await pipeline.run()"]
ProcessOutput["处理输出<br/>片段/JSON/Markdown"]
HasVectors{"存在现有向量?"}
EmbedChunks["嵌入新片段<br/>LLMBundle.encode()"]
InsertChunks["插入到文档存储"]
LoadDSL --> ExecutePipeline
ExecutePipeline --> ProcessOutput
ProcessOutput --> HasVectors
HasVectors -->|否| EmbedChunks
HasVectors -->|是| InsertChunks
EmbedChunks --> InsertChunks
这种灵活的管线机制允许用复杂的处理流程集成或替换默认解析器。
来源:rag/svr/task_executor.py:629-768, api/db/db_models.py:25
---
视觉与高级功能
视觉处理
RAGFlow 集成了高级视觉分析功能,包括 OCR、版面识别、表格检测、乱码文本检测和图像自动旋转。这些功能允许精确解析扫描的 PDF 和富含图像的文档。
视觉任务集成在解析器实现中,例如 rag.app.naive 和 rag.app.manual。
有关完整详细信息,请参见视觉处理:OCR 与版面识别。
来源:rag/app/naive.py:35-40, rag/app/manual.py:33-67
GraphRAG 与 RAPTOR
高级功能包括:
- RAPTOR:一种层次聚类机制(
RAPTOR_TREE_BUILDER),将片段组织成树状结构,以优化检索和摘要。 - GraphRAG:通过实体和关系提取来提取知识图谱,用于复杂信息表示。
这些功能由任务执行器中的专门任务类型(raptor、graphrag)调用。
有关更多信息,请参见高级功能:GraphRAG 与 RAPTOR。
来源:rag/svr/task_executor.py:43-44, rag/raptor.py:1-75
---
数据源连接器
RAGFlow 支持超过 30 种外部数据源连接器(Confluence、Jira、Google Drive、GitHub、Slack、Notion 等)。这些连接器同步外部内容并将文档输入处理管线,从而实现统一的索引和检索。
连接器系统与数据集和文档导入管线紧密集成。
有关完整文档,请参见数据源连接器。
来源:api/db/services/knowledgebase_service.py:105-117
---
进度跟踪与错误处理
进度更新
处理进度更新通过 set_progress() 函数在所有管线阶段报告,并记录在任务数据库中,以实现前端实时 UI 反馈。
典型的进度范围映射到各阶段:
0.0 - 0.6:OCR 和版面识别0.6 - 0.7:片段切分0.7 - 0.9:语义增强(关键词、问题、元数据)0.9 - 1.0:嵌入向量化和索引-1:检测到错误或取消
错误处理与取消
管线包含多项健壮性功能:
- 任务取消检查(
has_canceled(task_id))允许提前中止耗时操作。 - 任务在被放弃前有重试限制(最多 3 次)。
- 文档和任务状态通过
TaskStatus和StatusEnum跟踪。
这些功能确保即使在失败或用户中止请求的情况下,也能保持稳定和响应式的处理。
来源:rag/svr/task_executor.py:143-175, api/db/services/task_service.py:129-142, api/db/services/document_service.py:32
---
附加资源
- 有关详细的解析器工厂和文件格式,请参见文档解析策略。
- 要了解各种片段切分样式(如 naive、QA、laws、paper 等),请参见片段切分方法。
- 有关基于大语言模型(LLM)的片段丰富,请参见内容增强与嵌入向量。
- 有关图像和版面解析,请参见视觉处理:OCR 与版面识别。
- 有关聚类和关系提取,请参见高级功能:GraphRAG 与 RAPTOR。
- 有关外部集成管线,请参见数据源连接器。
本父页面提供了一个概念性地图,将底层代码制品与文档处理中涉及的自然语言系统组件连接起来。
---
总结图:从自然语言组件到代码实体
graph TD
Upload["文档上传 API<br/>POST /upload"]
TaskCreate["TaskService.get_task()"]
RedisQueue["Redis Streams 队列<br/>REDIS_CONN / SVR_CONSUMER_GROUP_NAME"]
TaskExecutor["任务执行器<br/>rag.svr.task_executor"]
Factory["解析器工厂(字典)<br/>FACTORY ParserType -> 模块"]
Chunker["Parser.chunk() 方法<br/>例如 rag.app.naive.chunk()"]
Enhancement["大语言模型(LLM)增强<br/>keyword_extraction()、question_proposal()、gen_metadata()"]
Embedding["大语言模型(LLM)嵌入向量<br/>LLMBundle.encode()"]
Indexing["文档存储索引<br/>rag.nlp.Dealer.search()"]
Storage["MinIO 与 MySQL 存储"]
Upload --> TaskCreate --> RedisQueue --> TaskExecutor --> Factory --> Chunker
Chunker --> Enhancement --> Embedding --> Indexing --> Storage
Chunker --> Storage
---
总结图:任务执行器与解析器工厂映射
graph LR
subgraph 任务执行器
TE["rag.svr.task_executor 模块"]
TECollect["collect()"]
TEBuild["build_chunks()"]
end
subgraph 解析器工厂
FACTORY["FACTORY = {ParserType : ParserModule}"]
naive["rag.app.naive"]
paper["rag.app.paper"]
manual["rag.app.manual"]
table["rag.app.table"]
presentation["rag.app.presentation"]
laws["rag.app.laws"]
qa["rag.app.qa"]
book["rag.app.book"]
resume["rag.app.resume"]
end
TE --> TECollect --> TEBuild --> FACTORY
FACTORY --> naive
FACTORY --> paper
FACTORY --> manual
FACTORY --> table
FACTORY --> presentation
FACTORY --> laws
FACTORY --> qa
FACTORY --> book
FACTORY --> resume
---
参考
rag/svr/task_executor.py:1-200— 任务执行器、解析器选择、片段构建逻辑。rag/app/naive.py:1-112— 朴素文本解析和片段切分实现。rag/app/manual.py:1-75— 手册解析实现。rag/nlp/search.py:30-170— 文档存储和嵌入向量搜索抽象。api/db/services/task_service.py:50-150— 任务数据库管理、重试、进度。api/db/services/document_service.py:40-150— 文档元数据和文件关联。rag/raptor.py:1-75— RAPTOR 层次聚类。api/db/services/knowledgebase_service.py:100-120— 数据集和源连接器概述。api/db/services/llm_service.py:80-140— 大语言模型(LLM)抽象和嵌入向量接口。
---
本文档结束了对文档处理管线的概述。请深入子页面了解每个核心阶段的详细设计和实现。