agentic_huge_data_base / wiki
页面 Dify · 3.4 后台任务处理与 Celery·DeepWiki 中文全文译文

3.4 · 后台任务处理与 Celery(Background Task Processing with Celery)

应用编排与外部知识接入 · 本章是 Dify DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Dify 章节3.4 状态全文译文 模块工作流与编排、存储与持久化、系统架构、测试、发布与运维
源码线索
  • api/commands/plugin.py
  • api/commands/storage.py
  • api/commands/system.py
  • api/commands/vector.py
  • api/configs/middleware/cache/redis_config.py
  • api/core/helper/encrypter.py
  • api/events/document_index_event.py
  • api/events/event_handlers/create_document_index.py
  • api/events/event_handlers/create_installed_app_when_app_created.py
  • api/events/event_handlers/create_site_record_when_app_created.py
模块标签
  • 工作流与编排
  • 存储与持久化
  • 系统架构
  • 测试、发布与运维
  • 检索、召回与索引

中文译文

后台任务处理与 Celery(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/langgenius/dify/3.4-background-task-processing-with-celery
翻译时间:2026-05-27T08:44:30.399Z
翻译模型:deepseek-chat
原文字符数:14563
项目:Dify (dify)

---

使用 Celery 进行后台任务处理

相关源文件

以下文件被用作生成此维基页面的上下文:

  • api/commands/plugin.py
  • api/commands/storage.py
  • api/commands/system.py
  • api/commands/vector.py
  • api/configs/middleware/cache/redis_config.py
  • api/core/helper/encrypter.py
  • api/events/document_index_event.py
  • api/events/event_handlers/create_document_index.py
  • api/events/event_handlers/create_installed_app_when_app_created.py
  • api/events/event_handlers/create_site_record_when_app_created.py
  • api/events/event_handlers/update_app_dataset_join_when_app_model_config_updated.py
  • api/events/event_handlers/update_app_dataset_join_when_app_published_workflow_updated.py
  • api/extensions/ext_celery.py
  • api/extensions/ext_redis.py
  • api/fields/workflow_fields.py
  • api/migrations/versions/6e957a32015b_add_embedding_cache_created_at_index.py
  • api/models/enums.py
  • api/pyrefly-local-excludes.txt
  • api/schedule/clean_embedding_cache_task.py
  • api/schedule/clean_messages.py
  • api/schedule/clean_unused_datasets_task.py
  • api/schedule/clean_workflow_runs_task.py
  • api/schedule/queue_monitor_task.py
  • api/schedule/workflow_schedule_task.py
  • api/tasks/add_document_to_index_task.py
  • api/tasks/batch_clean_document_task.py
  • api/tasks/batch_create_segment_to_index_task.py
  • api/tasks/clean_notion_document_task.py
  • api/tasks/create_segment_to_index_task.py
  • api/tasks/delete_account_task.py
  • api/tasks/delete_segment_from_index_task.py
  • api/tasks/disable_segments_from_index_task.py
  • api/tasks/document_indexing_sync_task.py
  • api/tasks/document_indexing_task.py
  • api/tasks/document_indexing_update_task.py
  • api/tasks/duplicate_document_indexing_task.py
  • api/tasks/enable_segments_to_index_task.py
  • api/tasks/retry_document_indexing_task.py
  • api/tasks/sync_website_document_indexing_task.py
  • api/tests/test_containers_integration_tests/tasks/test_clean_notion_document_task.py
  • api/tests/test_containers_integration_tests/trigger/__init__.py
  • api/tests/test_containers_integration_tests/trigger/conftest.py
  • api/tests/test_containers_integration_tests/trigger/test_trigger_e2e.py
  • api/tests/unit_tests/commands/test_reset_encrypt_key_pair.py
  • api/tests/unit_tests/core/app/apps/pipeline/test_pipeline_generator.py
  • api/tests/unit_tests/core/app/apps/pipeline/test_pipeline_runner.py
  • api/tests/unit_tests/core/rag/datasource/vdb/test_vector_factory.py
  • api/tests/unit_tests/extensions/test_celery_ssl.py
  • api/tests/unit_tests/extensions/test_redis.py
  • api/tests/unit_tests/models/test_enums_creator_user_role.py
  • api/tests/unit_tests/tasks/test_clean_document_task.py
  • api/tests/unit_tests/tasks/test_delete_account_task.py
  • api/tests/unit_tests/tasks/test_document_indexing_sync_task.py
  • api/tests/unit_tests/tasks/test_document_indexing_update_task.py

目的与范围

本文档描述了 Dify 中的后台任务处理基础设施,该系统使用 Celery 进行异步任务执行。系统在 API 服务的请求-响应周期之外处理长时间运行的操作,例如数据集索引、工作流执行、数据清理和应用删除。

有关工作流执行引擎的信息,请参见 5.1。有关文档索引管线的详细信息,请参见 4.2

---

架构总览

服务拓扑

Dify 的后台处理基础设施由三个主要组件组成,它们共享同一个容器镜像(langgenius/dify-api),但以不同的模式运行。系统使用 Redis 作为中央消息代理来管理跨工作节点的任务分发。

graph TB
    subgraph "共享镜像:langgenius/dify-api"
        API["API 服务<br/>MODE=api<br/>Flask/Gunicorn"]
        Worker["工作节点服务<br/>MODE=worker<br/>Celery Worker"]
        Beat["定时调度服务<br/>MODE=beat<br/>Celery Beat Scheduler"]
    end

    subgraph "消息代理"
        Redis["Redis<br/>Celery Broker"]
    end

    subgraph "任务队列"
        DatasetQueue["'dataset' 队列"]
        WorkflowQueue["'workflow' 队列"]
        AppDeletionQueue["'app_deletion' 队列"]
        MailQueue["'mail' 队列"]
    end

    subgraph "数据存储"
        PostgreSQL[("PostgreSQL<br/>session_factory")]
        VectorDB[("向量数据库<br/>VectorService")]
        Storage["对象存储<br/>(storage)"]
    end

    API -->|"分发任务"| Redis
    Beat -->|"调度周期性任务"| Redis

    Redis --> DatasetQueue
    Redis --> WorkflowQueue
    Redis --> AppDeletionQueue
    Redis --> MailQueue

    Worker -->|"消费任务"| Redis
    Worker -->|"读写"| PostgreSQL
    Worker -->|"索引片段"| VectorDB
    Worker -->|"清理文件"| Storage

来源: api/extensions/ext_celery.py:105-110api/tasks/batch_create_segment_to_index_task.py:177-180api/tasks/document_indexing_task.py:33

基于模式的服务区分

同一个 Docker 镜像支持多种运行模式,通过 MODE 环境变量控制。这使得 Dify 可以独立扩展 API 处理和后台处理能力。

模式服务用途进程
apiAPI 服务器处理 HTTP 请求Gunicorn + Flask
workerCelery 工作节点执行后台任务Celery worker 进程
beatCelery Beat调度周期性任务Celery beat 调度器

---

队列拓扑

队列结构

Dify 中的 Celery 工作节点从多个专用队列消费任务。这种分离方式允许对不同任务类型进行优先级处理和资源分配。

队列任务类型关键操作
dataset文档处理文件提取、片段切分、嵌入向量生成、向量索引
workflow / generation工作流执行节点处理、大语言模型(LLM)调用、工具调用
app_deletion资源清理删除应用、对话、消息以及关联的存储文件
mail邮件发送密码重置、邀请、通知

来源: api/tasks/document_indexing_task.py:33api/tasks/batch_create_segment_to_index_task.py:29api/tasks/document_indexing_sync_task.py:21-22

---

工作节点配置

工作节点进程管理

Celery 应用在 api/extensions/ext_celery.py 中初始化。它使用自定义的 FlaskTask 类来确保每个后台任务都在 Flask 应用上下文中运行,从而提供对数据库会话和配置的访问。

# 用于 Flask 上下文的自定义 Task 类
class FlaskTask(Task):
    def __call__(self, *args: object, **kwargs: object) -> object:
        from core.logging.context import init_request_context

        with app.app_context():
            # 初始化此任务的日志上下文
            init_request_context()
            return self.run(*args, **kwargs)

来源: api/extensions/ext_celery.py:94-101

---

消息代理和后端配置

Redis 集成

Dify 使用 Redis 作为主要消息代理和结果后端。系统支持高级 Redis 配置,包括 SSL/TLS 和 Sentinel。

SSL/TLS 支持

如果启用了 BROKER_USE_SSL,系统会将证书要求字符串(例如 CERT_REQUIRED)映射到 Python 的 ssl 常量,并将其应用于消息代理和 Redis 结果后端。

来源: api/extensions/ext_celery.py:33-61api/extensions/ext_celery.py:129-136

Sentinel 支持

CELERY_USE_SENTINEL 为 true 时,系统会使用主节点名称和 Sentinel 凭证配置 broker_transport_options

来源: api/extensions/ext_celery.py:64-82

---

使用 Beat 进行任务调度

Celery Beat 管理周期性的维护任务。调度表在初始化期间根据 dify_config 中的环境功能开关动态定义。

周期性任务调度表
任务名称执行频率用途
clean_embedding_cache_taskCELERY_BEAT_SCHEDULER_TIME从缓存中移除旧的嵌入向量结果
clean_unused_datasets_taskCELERY_BEAT_SCHEDULER_TIME根据计划清理标记为删除的数据集
clean_messagesCELERY_BEAT_SCHEDULER_TIME清理旧消息
create_tidb_serverless_task每小时TiDB Serverless 实例管理
update_tidb_serverless_status_task每 10 分钟TiDB 状态监控

来源: api/extensions/ext_celery.py:155-185api/schedule/clean_unused_datasets_task.py:26-42

---

核心后台任务模式

1. 文档索引管线

索引管线是一个多阶段过程,由 IndexingRunner 管理。诸如 document_indexing_taskdocument_indexing_sync_task 之类的任务处理文档准备的异步生命周期。

graph TD
    Trigger["API 调用 / 同步触发器"] --> Queue["'dataset' 队列"]
    Queue --> Task["document_indexing_task<br/>(api/tasks/document_indexing_task.py)"]
    Task --> Status["更新状态为 IndexingStatus.PARSING"]
    Status --> Runner["IndexingRunner.run()<br/>(core/indexing_runner.py)"]
    Runner --> SummaryCheck{"启用摘要?"}
    SummaryCheck -->|"是"| SummaryTask["generate_summary_index_task<br/>(api/tasks/generate_summary_index_task.py)"]

来源: api/tasks/document_indexing_task.py:33-112api/tasks/document_indexing_sync_task.py:134-147api/models/enums.py:127-138

2. 片段批量处理

对于大批量导入,Dify 使用 batch_create_segment_to_index_task 来处理 CSV/Excel 文件处理、通过 ModelManager 生成嵌入向量以及通过 VectorService 进行向量存储。

来源: api/tasks/batch_create_segment_to_index_task.py:30-180

3. 数据集维护与清理

clean_unused_datasets_task 执行复杂的清理工作,根据数据集的 CloudPlan(例如 SANDBOX 与 PRO)和活动级别进行过滤。它使用 IndexProcessorFactory 从向量索引中移除数据。

sequenceDiagram
    participant Beat as "Celery Beat"
    participant Task as "clean_unused_datasets_task"
    participant DB as "PostgreSQL (db.session)"
    participant VDB as "向量数据库 (IndexProcessor)"

    Beat->>Task: "触发周期性清理"
    Task->>DB: "查询未使用的数据集 (Dataset/Document)"
    Task->>VDB: "IndexProcessorFactory.init_index_processor().clean()"
    Task->>DB: "更新 Document.enabled = False"

来源: api/schedule/clean_unused_datasets_task.py:80-148api/extensions/ext_celery.py:164-167

---

实现细节

数据库会话管理

任务使用 session_factory.create_session() 来确保长时间运行的后台进程中具有清晰的事务边界。这对于防止工作节点池中的连接泄漏至关重要。

来源: api/tasks/batch_create_segment_to_index_task.py:59api/tasks/document_indexing_task.py:59api/tasks/document_indexing_sync_task.py:34

多租户与隔离

TenantIsolatedTaskQueue 用于诸如 duplicate_document_indexing_task 之类的任务,以确保一个租户的重型索引操作不会阻塞其他租户。它根据 dify_config.TENANT_ISOLATED_TASK_CONCURRENCY 中定义的租户特定并发限制来拉取任务。

来源: api/tasks/duplicate_document_indexing_task.py:55-76api/tasks/document_indexing_task.py:15

Redis 键前缀

为了支持共享 Redis 实例,如果配置了 REDIS_KEY_PREFIX,Dify 会将全局键前缀应用于所有 Celery 传输制品。

来源: api/extensions/ext_celery.py:85-90api/configs/middleware/cache/redis_config.py:35-38