agentic_huge_data_base / wiki
页面 RAGFlow · 6 文档处理管线·DeepWiki 中文全文译文

6 · 文档处理管线(Document Processing Pipeline)

复杂文档理解与引用检索 · 本章是 RAGFlow DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目RAGFlow 章节6 状态全文译文 模块入库与解析、系统架构、工作流与编排、界面与交互
源码线索
  • api/db/__init__.py
  • api/db/db_models.py
  • api/db/services/dialog_service.py
  • api/db/services/document_service.py
  • api/db/services/file_service.py
  • api/db/services/knowledgebase_service.py
  • api/db/services/llm_service.py
  • api/db/services/task_service.py
  • api/db/services/user_service.py
  • deepdoc/parser/excel_parser.py
模块标签
  • 入库与解析
  • 系统架构
  • 工作流与编排
  • 界面与交互
  • 模型调用与提供方适配

中文译文

文档处理管线(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/infiniflow/ragflow/6-document-processing-pipeline
翻译时间:2026-05-27T08:44:43.302Z
翻译模型:deepseek-chat
原文字符数:20397
项目:RAGFlow (ragflow)

---

文档处理管线

相关源文件

以下文件被用作生成此 Wiki 页面的上下文:

  • api/db/__init__.py
  • api/db/db_models.py
  • api/db/services/dialog_service.py
  • api/db/services/document_service.py
  • api/db/services/file_service.py
  • api/db/services/knowledgebase_service.py
  • api/db/services/llm_service.py
  • api/db/services/task_service.py
  • api/db/services/user_service.py
  • deepdoc/parser/excel_parser.py
  • rag/app/book.py
  • rag/app/laws.py
  • rag/app/manual.py
  • rag/app/naive.py
  • rag/app/one.py
  • rag/app/paper.py
  • rag/app/presentation.py
  • rag/app/qa.py
  • rag/app/table.py
  • rag/nlp/__init__.py
  • rag/nlp/search.py
  • rag/raptor.py
  • rag/svr/task_executor.py

目的与范围

文档处理管线是 RAGFlow 的核心引擎,负责将上传的原始文档转换为语义丰富、经过片段切分、嵌入向量化并建立索引的单元,以供检索使用。其流程涵盖从文档上传和入库,到解析、片段切分、语义增强、嵌入向量化,最终在文档存储中进行索引的全过程。

本文档是对端到端管线结构和流程的高层概述,并链接到每个核心阶段的详细子页面:解析、片段切分、增强、视觉处理(OCR/版面分析)、高级功能(GraphRAG、RAPTOR)以及外部连接器集成。

有关详细的机制和实现,请参见以下链接的子页面:

---

管线架构总览

高层管线流程

文档处理管线以异步方式运行,使用任务队列和后台工作者。当通过 API 上传文档时,会创建一个处理任务并将其加入 Redis Streams 队列。后台任务执行器工作者从队列中获取任务,选择合适的解析器,对文档进行片段切分,通过大语言模型(LLM)应用语义增强,将片段转换为嵌入向量,并在文档存储中建立索引。

flowchart TB
    subgraph "上传与任务创建"
        Upload["文档上传<br/>POST /upload"]
        TaskCreate["任务创建<br/>TaskService.get_task"]
        RedisQueue["Redis Streams<br/>SVR 消费者组队列"]

        Upload --> TaskCreate
        TaskCreate --> RedisQueue
    end

    subgraph "任务执行器"
        Collect["任务收集<br/>task_executor.collect()"]
        BuildChunks["片段构建<br/>task_executor.build_chunks()"]

        RedisQueue --> Collect
        Collect --> BuildChunks
    end

    subgraph "解析器选择"
        Factory["FACTORY 字典<br/>ParserType → 解析器模块"]
        ParserExec["解析器执行<br/>chunk() 函数"]

        BuildChunks --> Factory
        Factory --> ParserExec
    end

    subgraph "增强"
        Keywords["关键词提取<br/>keyword_extraction()"]
        Questions["问题生成<br/>question_proposal()"]
        Metadata["元数据提取<br/>gen_metadata()"]
        Tags["内容标签<br/>content_tagging()"]

        ParserExec --> Keywords
        ParserExec --> Questions
        ParserExec --> Metadata
        ParserExec --> Tags
    end

    subgraph "嵌入向量与索引"
        Embed["文本嵌入向量<br/>LLMBundle.encode()"]
        Index["索引片段<br/>Dealer.search / DocStore"]

        Keywords --> Embed
        Questions --> Embed
        Metadata --> Embed
        Tags --> Embed
        Embed --> Index
    end

    subgraph "存储"
        DocStore[("文档存储<br/>Dealer 和 DocStoreConnection")]
        MinIO[("MinIO 对象存储")]
        MySQL[("元数据库<br/>MySQL/PostgreSQL")]

        Index --> DocStore
        ParserExec --> MinIO
        Index --> MySQL
    end

来源rag/svr/task_executor.py:85-110, rag/svr/task_executor.py:177-240, api/db/services/task_service.py:75-142, api/db/services/document_service.py:44-72, rag/nlp/search.py:37-51

---

任务执行器与队列系统

任务收集与分发

处理任务由从 task_executor.py 实例化的后台工作者管理。这些工作者从 Redis Streams 队列中消费文档处理任务,该队列使用由 SVR_CONSUMER_GROUP_NAMEtask_executor_* 消费者名称标识的消费者组。通过 asyncio 信号量控制并发度,限制最大并行任务数和片段构建数。

flowchart LR
    subgraph "Redis Streams"
        Queue["REDIS_CONN Streams<br/>get_unacked_iterator()"]
    end

    subgraph "消费者组"
        CG["SVR_CONSUMER_GROUP_NAME<br/>任务消费者组"]
    end

    subgraph "任务执行器"
        TE0["task_executor_0<br/>消费者实例"]
        TE1["task_executor_1<br/>消费者实例"]
    end

    subgraph "并发控制"
        TaskLimiter["task_limiter 信号量<br/>MAX_CONCURRENT_TASKS"]
        ChunkLimiter["chunk_limiter 信号量<br/>MAX_CONCURRENT_CHUNK_BUILDERS"]
        EmbedLimiter["embed_limiter 信号量<br/>MAX_CONCURRENT_CHUNK_BUILDERS"]
    end

    Queue --> CG
    CG --> TE0
    CG --> TE1

    TE0 --> TaskLimiter
    TE1 --> TaskLimiter

    TaskLimiter --> ChunkLimiter
    TaskLimiter --> EmbedLimiter

多个任务执行器并行运行,各自消费任务,通过信号量控制以避免过载。

任务类型与路由

管线支持多种任务类型,路由到不同的逻辑处理器,定义在 TASK_TYPE_TO_PIPELINE_TASK_TYPE 中:

任务类型管线任务类型处理逻辑描述
dataflowPARSE标准解析流程(run_dataflow()通过解析器模块进行典型文档解析
raptorRAPTORRAPTOR 层次聚类用于检索的层次化片段聚类
graphragGRAPH_RAGGraphRAG 知识图谱提取实体-关系图谱提取
mindmapMINDMAP思维导图生成(MindMapExtractor)提取思维导图结构
memoryMEMORY智能体记忆持久化(handle_save_to_memory_task持久化智能体记忆管理

这种路由支持高级文档处理工作流的可扩展性。

来源rag/svr/task_executor.py:104-110, rag/svr/task_executor.py:33-46, rag/raptor.py:1-50

---

解析器选择与工厂模式

解析器工厂映射

解析器模块为各种文档类型实现了片段切分策略。FACTORY 字典将解析器类型常量(ParserType)映射到对应的解析器模块,从而在运行时动态选择解析逻辑。

graph TB
    subgraph "解析器工厂映射"
        FACTORY["FACTORY 字典:<br/>ParserType => 模块"]
    end

    subgraph "解析器模块"
        naive["rag.app.naive"]
        paper["rag.app.paper"]
        book["rag.app.book"]
        presentation["rag.app.presentation"]
        manual["rag.app.manual"]
        laws["rag.app.laws"]
        qa["rag.app.qa"]
        table["rag.app.table"]
        resume["rag.app.resume"]
        picture["rag.app.picture"]
        one["rag.app.one"]
        audio["rag.app.audio"]
        email["rag.app.email"]
        tag["rag.app.tag"]
    end

    FACTORY --> naive
    FACTORY --> paper
    FACTORY --> book
    FACTORY --> presentation
    FACTORY --> manual
    FACTORY --> laws
    FACTORY --> qa
    FACTORY --> table
    FACTORY --> resume
    FACTORY --> picture
    FACTORY --> one
    FACTORY --> audio
    FACTORY --> email
    FACTORY --> tag

这种工厂模式支持独立扩展或定制解析策略。

解析器选择逻辑

在任务执行时,系统通过工厂字典根据文档的 parser_id 选择解析器模块,然后调用解析器的 chunk() 函数。该函数通常处理文档二进制数据、指定的页码范围、语言以及各种解析器配置。

chunk 函数的参数包括:

  • filename:上传文件的名称
  • binary:原始文件字节(可选)
  • from_pageto_page:要解析的页码范围
  • lang:文档语言
  • callback:用于进度报告
  • parser_config:解析器特定的配置(例如,版面、片段大小)

有关详细的解析器实现和支持的文件格式(如 PDF、DOCX、Excel、Markdown、EPUB、PPT),请参见文档解析策略

来源rag/svr/task_executor.py:85-168, rag/app/naive.py:86-112, rag/app/manual.py:137-175

---

片段切分过程

片段构建工作流

片段构建函数(build_chunks())负责编排以下操作:

  • 从存储(MinIO)加载文件二进制数据
  • FACTORY 中选择并调用片段切分器
  • 处理图像并将提取的 ID 存储回 MinIO
  • 可选地调用基于大语言模型(LLM)的内容增强(关键词、问答对)
  • 最终准备用于嵌入向量化和索引的片段

该函数通过信号量应用并发控制,并使用超时守卫确保健壮性。

flowchart TB
    Start["build_chunks()"]
    SelectChunker["选择片段切分器<br/>FACTORY[parser_id]"]
    ExecuteChunk["调用 parser.chunk()<br/>使用 chunk_limiter"]
    ProcessImages["处理图像<br/>image2id(),存储到 MinIO"]
    EnhanceContent["内容增强<br/>keyword_extraction()、question_proposal()、gen_metadata()"]

    Start --> SelectChunker
    SelectChunker --> ExecuteChunk
    ExecuteChunk --> ProcessImages
    ProcessImages --> EnhanceContent

    EnhanceContent --> ReturnChunks["返回片段列表"]

有关不同片段切分器的策略变体和用例,请参见片段切分方法

片段数据结构

解析器生成的每个片段都包含必要的元数据,以支持语义搜索:

字段类型描述
id字符串唯一片段 ID(内容哈希 + doc_id)
doc_id字符串父文档 ID
kb_id字符串知识库(数据集)ID
content_with_weight字符串包含标题权重的文本内容
content_ltks列表分词后的内容
img_id字符串图像引用(MinIO 对象 ID)
page_num_int整数源文档中的页码

这些片段被传递给嵌入向量化处理,并与元数据和向量一起存储在文档存储中。

来源rag/svr/task_executor.py:294-342, rag/nlp/search.py:149-153

---

内容增强与嵌入向量

基于大语言模型(LLM)的增强

如果解析器配置中启用了该功能,基于大语言模型(LLM)的增强会为片段生成元数据丰富的内容。这包括:

  • 关键词提取 — 检测片段中的重要关键词或短语。
  • 问题建议 — 生成片段内容可能回答的潜在问题。
  • 元数据提取 — 使用 JSON 模式提示提取结构化数据字段。
  • 目录生成 — 总结文档大纲条目以方便导航。

这些调用使用 rag.prompts.generator 下的提示生成工具。

有关增强和嵌入向量集成的详细信息,请参见内容增强与嵌入向量

来源rag/svr/task_executor.py:343-452, rag/prompts/generator.py:44-49

嵌入向量过程

增强后,片段通过调用大语言模型(LLM)抽象层(LLMBundle.encode)编码为向量嵌入向量。这支持多个提供商和使用跟踪。

嵌入向量以批处理方式进行,可能会在内容前添加文档或片段标题以保持语义连贯性。

生成的向量在文档存储中建立索引以供检索。

来源rag/svr/task_executor.py:575-626, api/db/services/llm_service.py:95-118

---

数据流管线执行

自定义管线处理

RAGFlow 通过智能体/画布系统支持用户定义的复杂文档处理。run_dataflow() 函数执行这些管线,可以加载文档、解析、片段切分、增强和嵌入向量化,由自定义工作流编排。

flowchart TB
    LoadDSL["加载管线 DSL<br/>UserCanvas 组件"]
    ExecutePipeline["执行管线<br/>await pipeline.run()"]
    ProcessOutput["处理输出<br/>片段/JSON/Markdown"]

    HasVectors{"存在现有向量?"}
    EmbedChunks["嵌入新片段<br/>LLMBundle.encode()"]
    InsertChunks["插入到文档存储"]

    LoadDSL --> ExecutePipeline
    ExecutePipeline --> ProcessOutput
    ProcessOutput --> HasVectors

    HasVectors -->|否| EmbedChunks
    HasVectors -->|是| InsertChunks
    EmbedChunks --> InsertChunks

这种灵活的管线机制允许用复杂的处理流程集成或替换默认解析器。

来源rag/svr/task_executor.py:629-768, api/db/db_models.py:25

---

视觉与高级功能

视觉处理

RAGFlow 集成了高级视觉分析功能,包括 OCR、版面识别、表格检测、乱码文本检测和图像自动旋转。这些功能允许精确解析扫描的 PDF 和富含图像的文档。

视觉任务集成在解析器实现中,例如 rag.app.naiverag.app.manual

有关完整详细信息,请参见视觉处理:OCR 与版面识别

来源rag/app/naive.py:35-40, rag/app/manual.py:33-67

GraphRAG 与 RAPTOR

高级功能包括:

  • RAPTOR:一种层次聚类机制(RAPTOR_TREE_BUILDER),将片段组织成树状结构,以优化检索和摘要。
  • GraphRAG:通过实体和关系提取来提取知识图谱,用于复杂信息表示。

这些功能由任务执行器中的专门任务类型(raptorgraphrag)调用。

有关更多信息,请参见高级功能:GraphRAG 与 RAPTOR

来源rag/svr/task_executor.py:43-44, rag/raptor.py:1-75

---

数据源连接器

RAGFlow 支持超过 30 种外部数据源连接器(Confluence、Jira、Google Drive、GitHub、Slack、Notion 等)。这些连接器同步外部内容并将文档输入处理管线,从而实现统一的索引和检索。

连接器系统与数据集和文档导入管线紧密集成。

有关完整文档,请参见数据源连接器

来源api/db/services/knowledgebase_service.py:105-117

---

进度跟踪与错误处理

进度更新

处理进度更新通过 set_progress() 函数在所有管线阶段报告,并记录在任务数据库中,以实现前端实时 UI 反馈。

典型的进度范围映射到各阶段:

  • 0.0 - 0.6:OCR 和版面识别
  • 0.6 - 0.7:片段切分
  • 0.7 - 0.9:语义增强(关键词、问题、元数据)
  • 0.9 - 1.0:嵌入向量化和索引
  • -1:检测到错误或取消
错误处理与取消

管线包含多项健壮性功能:

  • 任务取消检查(has_canceled(task_id))允许提前中止耗时操作。
  • 任务在被放弃前有重试限制(最多 3 次)。
  • 文档和任务状态通过 TaskStatusStatusEnum 跟踪。

这些功能确保即使在失败或用户中止请求的情况下,也能保持稳定和响应式的处理。

来源rag/svr/task_executor.py:143-175, api/db/services/task_service.py:129-142, api/db/services/document_service.py:32

---

附加资源

本父页面提供了一个概念性地图,将底层代码制品与文档处理中涉及的自然语言系统组件连接起来。

---

总结图:从自然语言组件到代码实体

graph TD
    Upload["文档上传 API<br/>POST /upload"]
    TaskCreate["TaskService.get_task()"]
    RedisQueue["Redis Streams 队列<br/>REDIS_CONN / SVR_CONSUMER_GROUP_NAME"]

    TaskExecutor["任务执行器<br/>rag.svr.task_executor"]
    Factory["解析器工厂(字典)<br/>FACTORY ParserType -> 模块"]
    Chunker["Parser.chunk() 方法<br/>例如 rag.app.naive.chunk()"]

    Enhancement["大语言模型(LLM)增强<br/>keyword_extraction()、question_proposal()、gen_metadata()"]
    Embedding["大语言模型(LLM)嵌入向量<br/>LLMBundle.encode()"]
    Indexing["文档存储索引<br/>rag.nlp.Dealer.search()"]
    Storage["MinIO 与 MySQL 存储"]

    Upload --> TaskCreate --> RedisQueue --> TaskExecutor --> Factory --> Chunker
    Chunker --> Enhancement --> Embedding --> Indexing --> Storage
    Chunker --> Storage

---

总结图:任务执行器与解析器工厂映射

graph LR
    subgraph 任务执行器
        TE["rag.svr.task_executor 模块"]
        TECollect["collect()"]
        TEBuild["build_chunks()"]
    end

    subgraph 解析器工厂
        FACTORY["FACTORY = {ParserType : ParserModule}"]
        naive["rag.app.naive"]
        paper["rag.app.paper"]
        manual["rag.app.manual"]
        table["rag.app.table"]
        presentation["rag.app.presentation"]
        laws["rag.app.laws"]
        qa["rag.app.qa"]
        book["rag.app.book"]
        resume["rag.app.resume"]
    end

    TE --> TECollect --> TEBuild --> FACTORY
    FACTORY --> naive
    FACTORY --> paper
    FACTORY --> manual
    FACTORY --> table
    FACTORY --> presentation
    FACTORY --> laws
    FACTORY --> qa
    FACTORY --> book
    FACTORY --> resume

---

参考

  • rag/svr/task_executor.py:1-200 — 任务执行器、解析器选择、片段构建逻辑。
  • rag/app/naive.py:1-112 — 朴素文本解析和片段切分实现。
  • rag/app/manual.py:1-75 — 手册解析实现。
  • rag/nlp/search.py:30-170 — 文档存储和嵌入向量搜索抽象。
  • api/db/services/task_service.py:50-150 — 任务数据库管理、重试、进度。
  • api/db/services/document_service.py:40-150 — 文档元数据和文件关联。
  • rag/raptor.py:1-75 — RAPTOR 层次聚类。
  • api/db/services/knowledgebase_service.py:100-120 — 数据集和源连接器概述。
  • api/db/services/llm_service.py:80-140 — 大语言模型(LLM)抽象和嵌入向量接口。

---

本文档结束了对文档处理管线的概述。请深入子页面了解每个核心阶段的详细设计和实现。