agentic_huge_data_base / wiki
页面 Cognee · 3.1 数据入库 (cognee.add)·DeepWiki 中文全文译文

3.1 · 数据入库 (cognee.add)(Data Ingestion (cognee.add))

记忆管道与知识图谱构建 · 本章是 Cognee DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Cognee 章节3.1 状态全文译文 模块界面与交互、文档对象与元数据、存储与持久化、入库与解析
源码线索
  • cognee/api/v1/add/add.py
  • cognee/api/v1/cognify/cognify.py
  • cognee/eval_framework/modal_eval_dashboard.py
  • cognee/infrastructure/databases/vector/embeddings/EmbeddingEngine.py
  • cognee/infrastructure/files/storage/LocalFileStorage.py
  • cognee/infrastructure/files/storage/S3FileStorage.py
  • cognee/infrastructure/files/storage/s3_config.py
  • cognee/infrastructure/files/utils/extract_text_from_file.py
  • cognee/infrastructure/files/utils/get_file_metadata.py
  • cognee/infrastructure/files/utils/guess_file_type.py
模块标签
  • 界面与交互
  • 文档对象与元数据
  • 存储与持久化
  • 入库与解析
  • 图谱与关系

中文译文

数据入库 (cognee.add)(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/topoteretes/cognee/3.1-data-ingestion-cognee-add
翻译时间:2026-05-27T08:45:30.705Z
翻译模型:deepseek-chat
原文字符数:18554
项目:Cognee (cognee)

---

数据入库(cognee.add)

相关源文件

以下文件被用作生成此 Wiki 页面的上下文:

  • cognee/api/v1/add/add.py
  • cognee/api/v1/cognify/cognify.py
  • cognee/eval_framework/modal_eval_dashboard.py
  • cognee/infrastructure/databases/vector/embeddings/EmbeddingEngine.py
  • cognee/infrastructure/files/storage/LocalFileStorage.py
  • cognee/infrastructure/files/storage/S3FileStorage.py
  • cognee/infrastructure/files/storage/s3_config.py
  • cognee/infrastructure/files/utils/extract_text_from_file.py
  • cognee/infrastructure/files/utils/get_file_metadata.py
  • cognee/infrastructure/files/utils/guess_file_type.py
  • cognee/infrastructure/files/utils/is_text_content.py
  • cognee/infrastructure/files/utils/open_data_file.py
  • cognee/infrastructure/loaders/LoaderEngine.py
  • cognee/infrastructure/loaders/LoaderInterface.py
  • cognee/infrastructure/loaders/__init__.py
  • cognee/infrastructure/loaders/core/__init__.py
  • cognee/infrastructure/loaders/create_loader_engine.py
  • cognee/infrastructure/loaders/external/__init__.py
  • cognee/infrastructure/loaders/get_loader_engine.py
  • cognee/infrastructure/loaders/supported_loaders.py
  • cognee/modules/chunking/CsvChunker.py
  • cognee/modules/data/models/Data.py
  • cognee/modules/data/processing/document_types/CsvDocument.py
  • cognee/modules/data/processing/document_types/DltRowDocument.py
  • cognee/modules/data/processing/document_types/__init__.py
  • cognee/modules/graph/methods/delete_data_related_edges.py
  • cognee/modules/graph/methods/delete_data_related_nodes.py
  • cognee/modules/graph/methods/get_dataset_related_edges.py
  • cognee/modules/graph/methods/get_dataset_related_nodes.py
  • cognee/modules/ingestion/classify.py
  • cognee/modules/ingestion/data_types/BinaryData.py
  • cognee/modules/ingestion/data_types/IngestionData.py
  • cognee/modules/ingestion/data_types/S3BinaryData.py
  • cognee/modules/ingestion/data_types/TextData.py
  • cognee/modules/ingestion/data_types/__init__.py
  • cognee/modules/pipelines/operations/pipeline.py
  • cognee/modules/pipelines/operations/run_tasks.py
  • cognee/modules/pipelines/operations/run_tasks_data_item.py
  • cognee/tasks/ingestion/config.py
  • cognee/tasks/ingestion/data_item_to_text_file.py
  • cognee/tasks/ingestion/dlt_utils.py
  • cognee/tasks/ingestion/exceptions/__init__.py
  • cognee/tasks/ingestion/exceptions/exceptions.py
  • cognee/tasks/ingestion/extract_dlt_fk_edges.py
  • cognee/tasks/ingestion/get_dlt_destination.py
  • cognee/tasks/ingestion/ingest_data.py
  • cognee/tasks/ingestion/ingest_dlt_source.py
  • cognee/tasks/ingestion/resolve_data_directories.py
  • cognee/tasks/ingestion/resolve_dlt_sources.py
  • cognee/tasks/ingestion/save_data_item_to_storage.py
  • cognee/tests/test_advanced_pdf_loader.py
  • cognee/tests/test_s3_file_storage.py
  • examples/demos/dlt_ingestion_example.py
  • examples/demos/test_data/employees_ontology.owl
  • poetry.lock
  • pyproject.toml
  • uv.lock

目的与范围

本文档描述了 Cognee 中的数据入库子系统,主要通过 cognee.add() 函数实现。数据入库是 Cognee 工作流的第一个阶段,负责接收来自各种来源的原始数据,并为其后续的知识图谱处理做好准备。

范围:本页面涵盖入库机制、支持的数据格式、数据集管理、增量加载和批处理。关于将入库数据加工为知识图谱的信息,请参见知识图谱生成(cognee.cognify)(3.3)

概述

cognee.add() 函数(位于 cognee/api/v1/add/add.py:92-236)是数据入库的入口点。它将多种输入格式(包括本地文件、URL、二进制流和 DLT 源)归一化为一致的存储表示形式。它在关系数据库中创建 Data 记录,并将处理后的文本文件存储起来,供下游的 cognify() 处理使用。

关键操作

  1. 数据解析:通过 resolve_data_directoriessave_data_item_to_storage 解析文件路径、目录结构,并验证可访问性。
  2. 内容提取:使用 LoaderEngine 从各种文件格式中提取文本内容。
  3. 数据集存储:使用 Data 模型将处理后的内容和元数据存储在关系数据库中。
  4. 元数据追踪:记录文件哈希值、时间戳和用户权限,用于增量加载和变更检测。

图表:add() 执行流程及代码实体引用

graph TB
    Input["输入:data, dataset_name, user"]

    add_func["add()<br/>cognee/api/v1/add/add.py:92"]

    transform_loaders["转换 preferred_loaders<br/>List[Union[str, dict]] → dict[str, dict]<br/>add.py:179-186"]

    create_tasks["tasks = [<br/>  Task(resolve_data_directories),<br/>  Task(ingest_data)<br/>]<br/>add.py:188-198"]

    setup_call["await setup()<br/>cognee/modules/engine/operations/setup.py<br/>初始化数据库引擎"]

    resolve_auth["resolve_authorized_user_dataset()<br/>返回:(User, Dataset)<br/>resolve_authorized_user_dataset.py"]

    resolve_dlt["await resolve_dlt_sources()<br/>DLT 资源展开<br/>add.py:208-213"]

    reset_status["reset_dataset_pipeline_run_status()<br/>清除 add_pipeline、cognify_pipeline<br/>add.py:215-217"]

    run_pipeline["run_pipeline()<br/>cognee/modules/pipelines/operations/pipeline.py:33"]

    run_tasks["run_tasks()<br/>cognee/modules/pipelines/operations/run_tasks.py:55"]

    batch_loop["批处理循环<br/>data_per_batch = 20<br/>run_tasks.py:91-118"]

    run_tasks_data_item["run_tasks_data_item()<br/>run_tasks_data_item.py"]

    save_storage["save_data_item_to_storage()<br/>返回:original_file_path<br/>save_data_item_to_storage.py:27"]

    data_to_text["data_item_to_text_file()<br/>返回:(text_path, LoaderEngine)<br/>data_item_to_text_file.py:36"]

    classify_identify["ingestion.classify()<br/>ingestion.identify()<br/>返回:data_id UUID<br/>ingest_data.py:113-117"]

    sql_ops["SQLAlchemy 会话操作<br/>Data INSERT/UPDATE<br/>dataset.data.extend()<br/>ingest_data.py:177-193"]

    return_info["返回 PipelineRunCompleted<br/>add.py:233"]

    Input --> add_func
    add_func --> transform_loaders
    transform_loaders --> create_tasks
    create_tasks --> setup_call
    setup_call --> resolve_auth
    resolve_auth --> resolve_dlt
    resolve_dlt --> reset_status
    reset_status --> run_pipeline
    run_pipeline --> run_tasks
    run_tasks --> batch_loop
    batch_loop --> run_tasks_data_item
    run_tasks_data_item --> save_storage
    save_storage --> data_to_text
    data_to_text --> classify_identify
    classify_identify --> sql_ops
    sql_ops --> return_info

来源:cognee/api/v1/add/add.py:92-236, cognee/modules/pipelines/operations/pipeline.py:33-108, cognee/modules/pipelines/operations/run_tasks.py:55-184, cognee/tasks/ingestion/ingest_data.py:26-235, cognee/tasks/ingestion/save_data_item_to_storage.py:27-98

函数签名与参数

add() 函数定义于 cognee/api/v1/add/add.py:92-114

async def add(
    data: Union[
        BinaryIO,
        list[BinaryIO],
        str,
        list[str],
        DataItem,
        list[DataItem],
        Any,  # DltResource、SourceFactory 或其他 dlt 类型
    ],
    dataset_name: str = "main_dataset",
    user: User = None,
    node_set: Optional[List[str]] = None,
    vector_db_config: dict = None,
    graph_db_config: dict = None,
    dataset_id: Optional[UUID] = None,
    preferred_loaders: Optional[List[Union[str, dict[str, dict[str, Any]]]]] = None,
    incremental_loading: bool = True,
    data_per_batch: Optional[int] = 20,
    importance_weight: Optional[float] = 0.5,
    run_in_background: bool = False,
    **kwargs,
):
参数类型默认值描述
dataUnion[...]必填输入数据:文本字符串、文件路径、URL、BinaryIO 流、DataItem 包装器或 DLT 源。
dataset_namestr"main_dataset"用于组织数据的目标数据集名称。
userUserNone用于认证和数据访问的用户上下文。
node_setOptional[List[str]]None存储在 Data.node_set 中的访问控制标签。
dataset_idOptional[UUID]None用于替代 dataset_name 的特定数据集 UUID。
preferred_loadersOptional[List]None首选加载器名称或配置的列表。
incremental_loadingboolTrue如果为 True,则跳过已基于 pipeline_status 完成处理的数据项。
data_per_batchint20并行任务执行的批次大小。

来源:cognee/api/v1/add/add.py:92-171

数据类型解析与归一化

输入归一化决策树

save_data_item_to_storage() 函数(位于 cognee/tasks/ingestion/save_data_item_to_storage.py:27-98)实现了一个归一化决策树,用于处理各种输入格式:

graph TB
    InputData["data_item 参数<br/>(Union[BinaryIO, str, Any])"]

    CheckLlamaIndex{"'llama_index' in<br/>str(type(data_item))?<br/>第 28 行"}

    CheckDocling{"'docling' in<br/>str(type(data_item))?<br/>第 34 行"}

    CheckFileAttr{"hasattr(data_item, 'file')?<br/>FastAPI UploadFile<br/>第 41 行"}

    CheckString{"isinstance(data_item, str)?<br/>第 44 行"}

    GetFromLlama["get_data_from_llama_index(data_item)<br/>第 32 行"]

    ExportText["data_item.export_to_text()<br/>第 38 行"]

    SaveUploadFile["save_data_to_file(data_item.file)<br/>第 42 行"]

    ParseURLFunc["parsed_url = urlparse(data_item)<br/>第 45 行"]

    CheckS3Scheme{"parsed_url.scheme == 's3'?<br/>第 59 行"}

    CheckHTTPScheme{"parsed_url.scheme in ['http', 'https']?<br/>第 61 行"}

    CheckFileScheme{"parsed_url.scheme == 'file'?<br/>第 65 行"}

    CheckAbsolutePath{"data_item.startswith('/') or<br/>Windows 绝对路径?<br/>第 72-74 行"}

    ReturnS3AsIs["return data_item<br/>第 60 行"]

    FetchPageContent["fetch_page_content(data_item)<br/>第 62 行"]

    CheckLocalFileAllowed{"ACCEPT_LOCAL_FILE_PATH<br/>设置?<br/>第 66、76、83 行"}

    NormalizeToFileURL["Path(normalized_path).as_uri()<br/>第 79 行"]

    SaveAsText["save_data_to_file(data_item)<br/>文本内容<br/>第 90 行"]

    InputData --> CheckLlamaIndex
    CheckLlamaIndex -->|是| GetFromLlama
    CheckLlamaIndex -->|否| CheckDocling
    CheckDocling -->|是| ExportText
    CheckDocling -->|否| CheckFileAttr
    CheckFileAttr -->|是| SaveUploadFile
    CheckFileAttr -->|否| CheckString
    CheckString -->|是| ParseURLFunc
    ParseURLFunc --> CheckS3Scheme
    CheckS3Scheme -->|是| ReturnS3AsIs
    CheckHTTPScheme -->|是| FetchPageContent
    CheckFileScheme -->|是| CheckLocalFileAllowed
    CheckAbsolutePath -->|是| CheckLocalFileAllowed
    CheckLocalFileAllowed -->|真| NormalizeToFileURL
    CheckLocalFileAllowed -->|假| RaiseError["抛出 IngestionError"]
    CheckString -->|不是 URL/路径| SaveAsText

来源:cognee/tasks/ingestion/save_data_item_to_storage.py:27-98

支持的数据类型

文件格式与加载器

Cognee 使用 LoaderEngine 来管理文件加载器。加载器根据优先级和文件类型(MIME/扩展名)进行选择。

类别支持的格式示例加载器
文本.txt.md.csvtext_loadercsv_loader
文档.pdf.docx.pptxpypdf_loaderdocling_loader
多媒体.png.jpg.mp3.wavimage_loaderaudio_loader
代码.py.js.tstext_loader

data_item_to_text_file 函数(位于 cognee/tasks/ingestion/data_item_to_text_file.py:36-79)负责协调将归一化路径转换为 Cognee 存储中的文本文件,通过注册的加载器调用 loader.load_file() 实现。

来源:cognee/tasks/ingestion/data_item_to_text_file.py:36-79, cognee/api/v1/add/add.py:136-142

DLT(数据加载工具)集成

Cognee 通过 resolve_dlt_sources(位于 cognee/tasks/ingestion/resolve_dlt_sources.py)支持从 DLT 源进行入库。

  • 解析:DLT 源和资源在处理前会被展开为单个数据项 cognee/api/v1/add/add.py:208-213
  • 标识:对于 DLT 数据项,优先使用源提供的稳定 data_id,而非基于内容哈希的 ID cognee/tasks/ingestion/ingest_data.py:125-128

来源:cognee/api/v1/add/add.py:208-213, cognee/tasks/ingestion/ingest_data.py:125-128

入库管线架构

入库过程由 run_tasks(位于 cognee/modules/pipelines/operations/run_tasks.py:55-184)进行编排。

批处理与并发

Cognee 使用基于信号量的并发模型来处理大量数据项。

  • 批处理:数据按照 data_per_batch(默认 20)定义的批次大小进行处理。
  • 并发asyncio.Semaphore(data_per_batch) 限制并发执行 run_tasks_data_item 的数量,以防止资源耗尽 cognee/modules/pipelines/operations/run_tasks.py:97-118
  • 分布式模式:如果启用了 COGNEE_DISTRIBUTED,则通过 @override_run_tasks 装饰器将任务交给 run_tasks_distributed 处理 cognee/modules/pipelines/operations/run_tasks.py:55
增量加载

增量加载通过检查存储在 Data 模型中的 pipeline_status 进行管理 cognee/modules/data/models/Data.py:34

  1. 标识ingestion.identify(classified_data, user) 基于内容哈希和所有者 ID 生成 data_id cognee/tasks/ingestion/ingest_data.py:117
  2. 状态检查ingest_data 检查具有该 ID 的 Data 记录是否已存在于关系数据库中 cognee/tasks/ingestion/ingest_data.py:133-136
  3. 变更检测:如果记录存在,Cognee 会比较当前的 content_hash 与存储的哈希值 cognee/tasks/ingestion/ingest_data.py:150-151
  4. 更新:如果内容未更改,则更新元数据,但避免冗余处理 cognee/tasks/ingestion/ingest_data.py:153-165

来源:cognee/modules/pipelines/operations/run_tasks.py:97-118, cognee/tasks/ingestion/ingest_data.py:113-165, cognee/modules/data/models/Data.py:34

数据集管理

数据集是 Cognee 中的主要组织单元。

  • 解析resolve_authorized_user_dataset 确保用户对目标数据集具有"写入"权限 cognee/api/v1/add/add.py:199-206
  • 关系模式Data 模型 cognee/modules/data/models/Data.py:12-48 追踪文件位置(raw_data_locationoriginal_data_location)、MIME 类型、哈希值和 Token 数量。
  • 关联Data 通过 DatasetData 关联表与 Dataset 关联,支持多对多关系 cognee/modules/data/models/Data.py:42-48

图表:Data 与 Dataset 的关系关联

classDiagram
    class Dataset {
        +UUID id
        +String name
        +UUID owner_id
    }
    class Data {
        +UUID id
        +String name
        +String content_hash
        +String raw_data_location
        +JSON pipeline_status
        +Float importance_weight
    }
    class DatasetData {
        +UUID dataset_id
        +UUID data_id
    }
    Dataset "1" *-- "many" DatasetData
    Data "1" *-- "many" DatasetData

来源:cognee/modules/data/models/Data.py:12-48, cognee/tasks/ingestion/ingest_data.py:60-80