agentic_huge_data_base / wiki
页面 Dify · 4.2 文档索引管线·DeepWiki 中文全文译文

4.2 · 文档索引管线(Document Indexing Pipeline)

应用编排与外部知识接入 · 本章是 Dify DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Dify 章节4.2 状态全文译文 模块系统架构、工作流与编排、文档对象与元数据、智能体运行时
源码线索
  • api/core/indexing_runner.py
  • api/core/rag/extractor/extract_processor.py
  • api/core/rag/extractor/helpers.py
  • api/core/rag/extractor/text_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_doc_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_eml_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_epub_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_markdown_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_msg_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_ppt_extractor.py
模块标签
  • 系统架构
  • 工作流与编排
  • 文档对象与元数据
  • 智能体运行时
  • 测试、发布与运维

中文译文

文档索引管线(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/langgenius/dify/4.2-document-indexing-pipeline
翻译时间:2026-05-27T08:44:31.584Z
翻译模型:deepseek-chat
原文字符数:15990
项目:Dify (dify)

---

文档索引管线

相关源文件

以下文件被用作生成此 Wiki 页面的上下文:

  • api/core/indexing_runner.py
  • api/core/rag/extractor/extract_processor.py
  • api/core/rag/extractor/helpers.py
  • api/core/rag/extractor/text_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_doc_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_eml_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_epub_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_markdown_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_msg_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_ppt_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_pptx_extractor.py
  • api/core/rag/extractor/unstructured/unstructured_xml_extractor.py
  • api/core/rag/index_processor/index_processor_base.py
  • api/core/rag/index_processor/processor/paragraph_index_processor.py
  • api/core/rag/index_processor/processor/parent_child_index_processor.py
  • api/core/rag/index_processor/processor/qa_index_processor.py
  • api/core/tools/utils/web_reader_tool.py
  • api/events/event_handlers/clean_when_dataset_deleted.py
  • api/events/event_handlers/clean_when_document_deleted.py
  • api/services/summary_index_service.py
  • api/tasks/add_document_to_index_task.py
  • api/tasks/batch_clean_document_task.py
  • api/tasks/batch_create_segment_to_index_task.py
  • api/tasks/clean_dataset_task.py
  • api/tasks/clean_document_task.py
  • api/tasks/clean_notion_document_task.py
  • api/tasks/create_segment_to_index_task.py
  • api/tasks/delete_account_task.py
  • api/tasks/delete_segment_from_index_task.py
  • api/tasks/disable_segments_from_index_task.py
  • api/tasks/document_indexing_sync_task.py
  • api/tasks/document_indexing_task.py
  • api/tasks/document_indexing_update_task.py
  • api/tasks/duplicate_document_indexing_task.py
  • api/tasks/enable_segments_to_index_task.py
  • api/tasks/retry_document_indexing_task.py
  • api/tasks/sync_website_document_indexing_task.py
  • api/tests/test_containers_integration_tests/tasks/test_clean_notion_document_task.py
  • api/tests/unit_tests/core/rag/datasource/vdb/test_vector_factory.py
  • api/tests/unit_tests/core/rag/indexing/processor/test_paragraph_index_processor.py
  • api/tests/unit_tests/core/rag/indexing/processor/test_parent_child_index_processor.py
  • api/tests/unit_tests/core/rag/indexing/processor/test_qa_index_processor.py
  • api/tests/unit_tests/core/tools/utils/test_web_reader_tool.py
  • api/tests/unit_tests/services/document_service_validation.py
  • api/tests/unit_tests/services/test_dataset_service_lock_not_owned.py
  • api/tests/unit_tests/services/test_summary_index_service.py
  • api/tests/unit_tests/services/test_vector_service.py
  • api/tests/unit_tests/tasks/test_clean_dataset_task.py
  • api/tests/unit_tests/tasks/test_clean_document_task.py
  • api/tests/unit_tests/tasks/test_delete_account_task.py
  • api/tests/unit_tests/tasks/test_document_indexing_sync_task.py
  • api/tests/unit_tests/tasks/test_document_indexing_update_task.py

目的与范围

本文档描述文档索引管线,该管线负责将原始文档处理为可搜索的知识库片段。管线遵循提取-转换-加载(ETL)模式,将来自不同来源(上传文件、Notion 页面、网站)的文档转换为存储在向量数据库和关键词索引中的索引片段。

此过程的主要入口点是 IndexingRunner 类,它负责协调文档在各个状态之间的转换,直到文档变为可搜索状态。

来源api/core/indexing_runner.py:50-54

---

管线总览

文档索引管线由 IndexingRunner 类执行,包含三个顺序阶段:

阶段目的关键操作
提取从数据源解析原始内容文件解析、Notion API 调用、网页抓取
转换清洗、拆分和结构化内容文本清洗、分段、嵌入向量准备
加载将处理后的内容存储到索引中向量数据库写入、关键词索引创建、元数据存储

管线通过 Celery 任务以同步或异步方式启动,任务被分发到 dataset 队列。

来源api/core/indexing_runner.py:94-119api/tasks/document_indexing_task.py:22-22

---

核心架构组件

IndexingRunner 类层次结构

下图将高层 ETL 概念与 Dify 后端中使用的具体代码实体联系起来。

graph TB
    IndexingRunner["IndexingRunner<br/>(api/core/indexing_runner.py)"]
    IndexProcessorFactory["IndexProcessorFactory<br/>(api/core/rag/index_processor/index_processor_factory.py)"]

    subgraph "索引处理器"
        ParagraphProcessor["ParagraphIndexProcessor<br/>(IndexStructureType.PARAGRAPH_INDEX)"]
        ParentChildProcessor["ParentChildIndexProcessor<br/>(IndexStructureType.PARENT_CHILD_INDEX)"]
        QAProcessor["QAIndexProcessor<br/>(IndexStructureType.QA_INDEX)"]
    end

    subgraph "提取器"
        ExtractProcessor["ExtractProcessor.extract()<br/>(api/core/rag/extractor/extract_processor.py)"]
        PdfExtractor["PdfExtractor"]
        NotionExtractor["NotionExtractor"]
        FirecrawlWebExtractor["FirecrawlWebExtractor"]
    end

    subgraph "存储后端"
        VectorDB["向量<br/>(api/core/rag/datasource/vdb/vector_factory.py)"]
        KeywordDB["关键词<br/>(api/core/rag/datasource/keyword/keyword_factory.py)"]
        DocumentSegment["DocumentSegment 模型<br/>(api/models/dataset.py)"]
    end

    IndexingRunner -->|"init_index_processor()"| IndexProcessorFactory
    IndexProcessorFactory --> ParagraphProcessor
    IndexProcessorFactory --> ParentChildProcessor
    IndexProcessorFactory --> QAProcessor

    ParagraphProcessor -->|"extract()"| ExtractProcessor
    ExtractProcessor --> PdfExtractor
    ExtractProcessor --> NotionExtractor
    ExtractProcessor --> FirecrawlWebExtractor

    ParagraphProcessor -->|"load()"| VectorDB
    ParagraphProcessor -->|"load()"| KeywordDB
    ParagraphProcessor -->|"_load_segments()"| DocumentSegment

来源api/core/indexing_runner.py:93-119api/core/rag/extractor/extract_processor.py:93-130api/core/rag/index_processor/index_processor_factory.py:9-26

---

ETL 管线阶段

提取阶段

提取阶段从数据源获取原始内容,并将其转换为 core.rag.models.document.Document 对象列表。

graph LR
    subgraph "数据源"
        UploadFile["UploadFile 模型<br/>(DatasourceType.FILE)"]
        NotionImport["NotionInfo<br/>(DatasourceType.NOTION)"]
        WebsiteInfo["WebsiteInfo<br/>(DatasourceType.WEBSITE)"]
    end

    subgraph "处理"
        ExtractSetting["ExtractSetting 实体<br/>(api/core/rag/extractor/entity/extract_setting.py)"]
        ExtractProcessor["ExtractProcessor.extract()"]
    end

    subgraph "输出"
        TextDocs["list[Document]<br/>(page_content + metadata)"]
    end

    UploadFile --> ExtractSetting
    NotionImport --> ExtractSetting
    WebsiteInfo --> ExtractSetting

    ExtractSetting --> ExtractProcessor
    ExtractProcessor --> TextDocs

提取阶段实现

  • IndexingRunner._extract 方法通过调用索引处理器的 extract 方法来编排此阶段。
  • ExtractProcessor.extract 根据文件扩展名或数据源类型选择合适的提取器。
  • 文件提取:支持 PDF、Word、Excel、CSV、Markdown 和 HTML。如果 ETL_TYPE 设置为 Unstructured,则对 .doc.ppt.eml 等特定格式使用 Unstructured.io 提取器。
  • 网页提取:使用 FirecrawlWebExtractorJinaReaderWebExtractorWaterCrawlWebExtractor 来获取和解析网站内容。

来源api/core/indexing_runner.py:95-95api/core/rag/extractor/extract_processor.py:93-130api/core/rag/extractor/extract_processor.py:111-135

---

转换阶段

转换阶段对文本进行清洗和分段。处理逻辑取决于 doc_form(例如,段落、父子或问答)。

转换步骤

  1. 清洗CleanProcessor.clean 移除不需要的字符并应用预处理规则(例如,移除多余空格)。
  2. 拆分BaseIndexProcessor._get_splitter 根据规则创建一个 TextSplitter
  3. - FixedRecursiveCharacterTextSplitter 用于 customhierarchical 规则。 - EnhanceRecursiveCharacterTextSplitter 用于 automatic 规则。

  4. 结构化
  5. - ParagraphIndexProcessor:标准的扁平化分段。它还使用 _get_content_files 提取 Markdown 图片。 - ParentChildIndexProcessor:创建大的父级片段(例如,整个文档或段落)和较小的子级块(ChildChunk 模型),以提高检索精度。 - QAIndexProcessor:使用 LLMGenerator 从文本片段生成问答对。

来源api/core/rag/index_processor/index_processor_base.py:114-151api/core/rag/index_processor/processor/paragraph_index_processor.py:74-121api/core/rag/index_processor/processor/parent_child_index_processor.py:57-130api/core/rag/index_processor/processor/qa_index_processor.py:55-121

---

加载阶段

加载阶段将片段持久化到数据库,并将其索引以便检索。

graph TB
    Documents["list[Document]<br/>(转换后的块)"]

    subgraph "持久化"
        LoadSegments["IndexingRunner._load_segments()"]
        DocSegment["DocumentSegment 模型"]
        ChildChunk["ChildChunk 模型"]
    end

    subgraph "索引"
        LoadIndex["IndexingRunner._load()"]
        VectorIndex["向量<br/>(嵌入向量 + 向量数据库)"]
        KeywordIndex["关键词<br/>(关键词索引)"]
    end

    Documents --> LoadSegments
    LoadSegments --> DocSegment
    DocSegment --> ChildChunk

    Documents --> LoadIndex
    LoadIndex --> VectorIndex
    LoadIndex --> KeywordIndex

实现细节

  • _load_segments:将元数据和内容保存到 DocumentSegment 表。对于父子索引,还会保存到 ChildChunk 表。
  • _load:根据 indexing_technique 处理实际的索引操作。
  • - 高质量:使用 ModelInstance 生成嵌入向量,并通过 Vector.create() 将其存储到向量数据库中。 - 经济模式:使用 Keyword.add_texts() 构建关键词索引。

  • 多模态支持:如果数据集是多模态的,则调用 Vector.create_multimodal() 来索引图片附件。

来源api/core/indexing_runner.py:111-119api/core/rag/index_processor/processor/paragraph_index_processor.py:123-144api/core/rag/index_processor/processor/parent_child_index_processor.py:132-155

---

文档状态生命周期

文档会经历 models.enums.IndexingStatus 中定义的多个状态。

状态描述
waiting文档已排队等待处理。
parsing正在从源中提取原始内容。
splitting正在清洗和分段文本。
indexing正在生成嵌入向量并保存到索引。
completed管线成功完成。
error管线失败;错误详情存储在 document.error 中。

来源api/core/indexing_runner.py:58-67api/models/enums.py:43-43

---

异步任务处理

大规模索引操作由 Celery 任务管理,以确保系统响应性。

  • 索引add_document_to_index_task 负责将已处理的片段添加到向量索引中。
  • 同步document_indexing_sync_task 检查更新(例如,Notion 中的更新),并在内容发生变化时重新索引。
  • 清理clean_dataset_taskclean_document_task 在删除数据集或文档时,负责从数据库和向量存储中删除片段。
  • 批量操作batch_create_segment_to_index_task 允许从 CSV 模板批量创建片段。

来源api/tasks/add_document_to_index_task.py:23-23api/tasks/document_indexing_sync_task.py:22-22api/tasks/clean_dataset_task.py:33-33api/tasks/batch_create_segment_to_index_task.py:30-30

---

索引预估

在开始完整的索引运行之前,用户可以请求预估 Token 和片段数量。

  • IndexingRunner.indexing_estimate() 模拟提取和转换阶段。
  • 它会计算片段总数和嵌入向量所需的 Token 总数。
  • 返回一个包含片段数和 Token 数的 IndexingEstimate 实体。

来源api/core/indexing_runner.py:264-275api/core/entities/knowledge_entities.py:16-16