文档索引管线(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/langgenius/dify/4.2-document-indexing-pipeline
翻译时间:2026-05-27T08:44:31.584Z
翻译模型:deepseek-chat
原文字符数:15990
项目:Dify (dify)
---
文档索引管线
相关源文件
以下文件被用作生成此 Wiki 页面的上下文:
api/core/indexing_runner.pyapi/core/rag/extractor/extract_processor.pyapi/core/rag/extractor/helpers.pyapi/core/rag/extractor/text_extractor.pyapi/core/rag/extractor/unstructured/unstructured_doc_extractor.pyapi/core/rag/extractor/unstructured/unstructured_eml_extractor.pyapi/core/rag/extractor/unstructured/unstructured_epub_extractor.pyapi/core/rag/extractor/unstructured/unstructured_markdown_extractor.pyapi/core/rag/extractor/unstructured/unstructured_msg_extractor.pyapi/core/rag/extractor/unstructured/unstructured_ppt_extractor.pyapi/core/rag/extractor/unstructured/unstructured_pptx_extractor.pyapi/core/rag/extractor/unstructured/unstructured_xml_extractor.pyapi/core/rag/index_processor/index_processor_base.pyapi/core/rag/index_processor/processor/paragraph_index_processor.pyapi/core/rag/index_processor/processor/parent_child_index_processor.pyapi/core/rag/index_processor/processor/qa_index_processor.pyapi/core/tools/utils/web_reader_tool.pyapi/events/event_handlers/clean_when_dataset_deleted.pyapi/events/event_handlers/clean_when_document_deleted.pyapi/services/summary_index_service.pyapi/tasks/add_document_to_index_task.pyapi/tasks/batch_clean_document_task.pyapi/tasks/batch_create_segment_to_index_task.pyapi/tasks/clean_dataset_task.pyapi/tasks/clean_document_task.pyapi/tasks/clean_notion_document_task.pyapi/tasks/create_segment_to_index_task.pyapi/tasks/delete_account_task.pyapi/tasks/delete_segment_from_index_task.pyapi/tasks/disable_segments_from_index_task.pyapi/tasks/document_indexing_sync_task.pyapi/tasks/document_indexing_task.pyapi/tasks/document_indexing_update_task.pyapi/tasks/duplicate_document_indexing_task.pyapi/tasks/enable_segments_to_index_task.pyapi/tasks/retry_document_indexing_task.pyapi/tasks/sync_website_document_indexing_task.pyapi/tests/test_containers_integration_tests/tasks/test_clean_notion_document_task.pyapi/tests/unit_tests/core/rag/datasource/vdb/test_vector_factory.pyapi/tests/unit_tests/core/rag/indexing/processor/test_paragraph_index_processor.pyapi/tests/unit_tests/core/rag/indexing/processor/test_parent_child_index_processor.pyapi/tests/unit_tests/core/rag/indexing/processor/test_qa_index_processor.pyapi/tests/unit_tests/core/tools/utils/test_web_reader_tool.pyapi/tests/unit_tests/services/document_service_validation.pyapi/tests/unit_tests/services/test_dataset_service_lock_not_owned.pyapi/tests/unit_tests/services/test_summary_index_service.pyapi/tests/unit_tests/services/test_vector_service.pyapi/tests/unit_tests/tasks/test_clean_dataset_task.pyapi/tests/unit_tests/tasks/test_clean_document_task.pyapi/tests/unit_tests/tasks/test_delete_account_task.pyapi/tests/unit_tests/tasks/test_document_indexing_sync_task.pyapi/tests/unit_tests/tasks/test_document_indexing_update_task.py
目的与范围
本文档描述文档索引管线,该管线负责将原始文档处理为可搜索的知识库片段。管线遵循提取-转换-加载(ETL)模式,将来自不同来源(上传文件、Notion 页面、网站)的文档转换为存储在向量数据库和关键词索引中的索引片段。
此过程的主要入口点是 IndexingRunner 类,它负责协调文档在各个状态之间的转换,直到文档变为可搜索状态。
来源:api/core/indexing_runner.py:50-54
---
管线总览
文档索引管线由 IndexingRunner 类执行,包含三个顺序阶段:
| 阶段 | 目的 | 关键操作 |
|---|---|---|
| 提取 | 从数据源解析原始内容 | 文件解析、Notion API 调用、网页抓取 |
| 转换 | 清洗、拆分和结构化内容 | 文本清洗、分段、嵌入向量准备 |
| 加载 | 将处理后的内容存储到索引中 | 向量数据库写入、关键词索引创建、元数据存储 |
管线通过 Celery 任务以同步或异步方式启动,任务被分发到 dataset 队列。
来源:api/core/indexing_runner.py:94-119,api/tasks/document_indexing_task.py:22-22
---
核心架构组件
IndexingRunner 类层次结构
下图将高层 ETL 概念与 Dify 后端中使用的具体代码实体联系起来。
graph TB
IndexingRunner["IndexingRunner<br/>(api/core/indexing_runner.py)"]
IndexProcessorFactory["IndexProcessorFactory<br/>(api/core/rag/index_processor/index_processor_factory.py)"]
subgraph "索引处理器"
ParagraphProcessor["ParagraphIndexProcessor<br/>(IndexStructureType.PARAGRAPH_INDEX)"]
ParentChildProcessor["ParentChildIndexProcessor<br/>(IndexStructureType.PARENT_CHILD_INDEX)"]
QAProcessor["QAIndexProcessor<br/>(IndexStructureType.QA_INDEX)"]
end
subgraph "提取器"
ExtractProcessor["ExtractProcessor.extract()<br/>(api/core/rag/extractor/extract_processor.py)"]
PdfExtractor["PdfExtractor"]
NotionExtractor["NotionExtractor"]
FirecrawlWebExtractor["FirecrawlWebExtractor"]
end
subgraph "存储后端"
VectorDB["向量<br/>(api/core/rag/datasource/vdb/vector_factory.py)"]
KeywordDB["关键词<br/>(api/core/rag/datasource/keyword/keyword_factory.py)"]
DocumentSegment["DocumentSegment 模型<br/>(api/models/dataset.py)"]
end
IndexingRunner -->|"init_index_processor()"| IndexProcessorFactory
IndexProcessorFactory --> ParagraphProcessor
IndexProcessorFactory --> ParentChildProcessor
IndexProcessorFactory --> QAProcessor
ParagraphProcessor -->|"extract()"| ExtractProcessor
ExtractProcessor --> PdfExtractor
ExtractProcessor --> NotionExtractor
ExtractProcessor --> FirecrawlWebExtractor
ParagraphProcessor -->|"load()"| VectorDB
ParagraphProcessor -->|"load()"| KeywordDB
ParagraphProcessor -->|"_load_segments()"| DocumentSegment
来源:api/core/indexing_runner.py:93-119,api/core/rag/extractor/extract_processor.py:93-130,api/core/rag/index_processor/index_processor_factory.py:9-26
---
ETL 管线阶段
提取阶段
提取阶段从数据源获取原始内容,并将其转换为 core.rag.models.document.Document 对象列表。
graph LR
subgraph "数据源"
UploadFile["UploadFile 模型<br/>(DatasourceType.FILE)"]
NotionImport["NotionInfo<br/>(DatasourceType.NOTION)"]
WebsiteInfo["WebsiteInfo<br/>(DatasourceType.WEBSITE)"]
end
subgraph "处理"
ExtractSetting["ExtractSetting 实体<br/>(api/core/rag/extractor/entity/extract_setting.py)"]
ExtractProcessor["ExtractProcessor.extract()"]
end
subgraph "输出"
TextDocs["list[Document]<br/>(page_content + metadata)"]
end
UploadFile --> ExtractSetting
NotionImport --> ExtractSetting
WebsiteInfo --> ExtractSetting
ExtractSetting --> ExtractProcessor
ExtractProcessor --> TextDocs
提取阶段实现:
IndexingRunner._extract方法通过调用索引处理器的extract方法来编排此阶段。ExtractProcessor.extract根据文件扩展名或数据源类型选择合适的提取器。- 文件提取:支持 PDF、Word、Excel、CSV、Markdown 和 HTML。如果
ETL_TYPE设置为Unstructured,则对.doc、.ppt和.eml等特定格式使用 Unstructured.io 提取器。 - 网页提取:使用
FirecrawlWebExtractor、JinaReaderWebExtractor或WaterCrawlWebExtractor来获取和解析网站内容。
来源:api/core/indexing_runner.py:95-95,api/core/rag/extractor/extract_processor.py:93-130,api/core/rag/extractor/extract_processor.py:111-135
---
转换阶段
转换阶段对文本进行清洗和分段。处理逻辑取决于 doc_form(例如,段落、父子或问答)。
转换步骤:
- 清洗:
CleanProcessor.clean移除不需要的字符并应用预处理规则(例如,移除多余空格)。 - 拆分:
BaseIndexProcessor._get_splitter根据规则创建一个TextSplitter。 - 结构化:
- FixedRecursiveCharacterTextSplitter 用于 custom 或 hierarchical 规则。 - EnhanceRecursiveCharacterTextSplitter 用于 automatic 规则。
- ParagraphIndexProcessor:标准的扁平化分段。它还使用 _get_content_files 提取 Markdown 图片。 - ParentChildIndexProcessor:创建大的父级片段(例如,整个文档或段落)和较小的子级块(ChildChunk 模型),以提高检索精度。 - QAIndexProcessor:使用 LLMGenerator 从文本片段生成问答对。
来源:api/core/rag/index_processor/index_processor_base.py:114-151,api/core/rag/index_processor/processor/paragraph_index_processor.py:74-121,api/core/rag/index_processor/processor/parent_child_index_processor.py:57-130,api/core/rag/index_processor/processor/qa_index_processor.py:55-121
---
加载阶段
加载阶段将片段持久化到数据库,并将其索引以便检索。
graph TB
Documents["list[Document]<br/>(转换后的块)"]
subgraph "持久化"
LoadSegments["IndexingRunner._load_segments()"]
DocSegment["DocumentSegment 模型"]
ChildChunk["ChildChunk 模型"]
end
subgraph "索引"
LoadIndex["IndexingRunner._load()"]
VectorIndex["向量<br/>(嵌入向量 + 向量数据库)"]
KeywordIndex["关键词<br/>(关键词索引)"]
end
Documents --> LoadSegments
LoadSegments --> DocSegment
DocSegment --> ChildChunk
Documents --> LoadIndex
LoadIndex --> VectorIndex
LoadIndex --> KeywordIndex
实现细节:
_load_segments:将元数据和内容保存到DocumentSegment表。对于父子索引,还会保存到ChildChunk表。_load:根据indexing_technique处理实际的索引操作。- 多模态支持:如果数据集是多模态的,则调用
Vector.create_multimodal()来索引图片附件。
- 高质量:使用 ModelInstance 生成嵌入向量,并通过 Vector.create() 将其存储到向量数据库中。 - 经济模式:使用 Keyword.add_texts() 构建关键词索引。
来源:api/core/indexing_runner.py:111-119,api/core/rag/index_processor/processor/paragraph_index_processor.py:123-144,api/core/rag/index_processor/processor/parent_child_index_processor.py:132-155
---
文档状态生命周期
文档会经历 models.enums.IndexingStatus 中定义的多个状态。
| 状态 | 描述 |
|---|---|
waiting | 文档已排队等待处理。 |
parsing | 正在从源中提取原始内容。 |
splitting | 正在清洗和分段文本。 |
indexing | 正在生成嵌入向量并保存到索引。 |
completed | 管线成功完成。 |
error | 管线失败;错误详情存储在 document.error 中。 |
来源:api/core/indexing_runner.py:58-67,api/models/enums.py:43-43
---
异步任务处理
大规模索引操作由 Celery 任务管理,以确保系统响应性。
- 索引:
add_document_to_index_task负责将已处理的片段添加到向量索引中。 - 同步:
document_indexing_sync_task检查更新(例如,Notion 中的更新),并在内容发生变化时重新索引。 - 清理:
clean_dataset_task和clean_document_task在删除数据集或文档时,负责从数据库和向量存储中删除片段。 - 批量操作:
batch_create_segment_to_index_task允许从 CSV 模板批量创建片段。
来源:api/tasks/add_document_to_index_task.py:23-23,api/tasks/document_indexing_sync_task.py:22-22,api/tasks/clean_dataset_task.py:33-33,api/tasks/batch_create_segment_to_index_task.py:30-30
---
索引预估
在开始完整的索引运行之前,用户可以请求预估 Token 和片段数量。
IndexingRunner.indexing_estimate()模拟提取和转换阶段。- 它会计算片段总数和嵌入向量所需的 Token 总数。
- 返回一个包含片段数和 Token 数的
IndexingEstimate实体。
来源:api/core/indexing_runner.py:264-275,api/core/entities/knowledge_entities.py:16-16