agentic_huge_data_base / wiki
页面 Dify · 5 工作流引擎与节点执行·DeepWiki 中文全文译文

5 · 工作流引擎与节点执行(Workflow Engine and Node Execution)

应用编排与外部知识接入 · 本章是 Dify DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Dify 章节5 状态全文译文 模块系统架构、测试、发布与运维、接口与服务契约、配置治理
源码线索
  • api/.importlinter
  • api/core/app/file_access/__init__.py
  • api/core/app/file_access/controller.py
  • api/core/app/file_access/scope.py
  • api/core/workflow/node_factory.py
  • api/core/workflow/node_runtime.py
  • api/core/workflow/workflow_entry.py
  • api/services/plugin/plugin_migration.py
  • api/services/plugin/plugin_service.py
  • api/services/rag_pipeline/rag_pipeline.py
模块标签
  • 系统架构
  • 测试、发布与运维
  • 接口与服务契约
  • 配置治理
  • 图谱与关系

中文译文

工作流引擎与节点执行(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/langgenius/dify/5-workflow-engine-and-node-execution
翻译时间:2026-05-27T08:44:33.818Z
翻译模型:deepseek-chat
原文字符数:17058
项目:Dify (dify)

---

工作流引擎与节点执行

相关源文件

以下文件为本 Wiki 页面的上下文来源:

  • api/.importlinter
  • api/core/app/file_access/__init__.py
  • api/core/app/file_access/controller.py
  • api/core/app/file_access/scope.py
  • api/core/workflow/node_factory.py
  • api/core/workflow/node_runtime.py
  • api/core/workflow/workflow_entry.py
  • api/services/plugin/plugin_migration.py
  • api/services/plugin/plugin_service.py
  • api/services/rag_pipeline/rag_pipeline.py
  • api/services/rag_pipeline/rag_pipeline_transform_service.py
  • api/services/workflow_draft_variable_service.py
  • api/services/workflow_service.py
  • api/tests/integration_tests/workflow/nodes/test_code.py
  • api/tests/integration_tests/workflow/nodes/test_http.py
  • api/tests/integration_tests/workflow/nodes/test_llm.py
  • api/tests/integration_tests/workflow/nodes/test_parameter_extractor.py
  • api/tests/integration_tests/workflow/nodes/test_template_transform.py
  • api/tests/integration_tests/workflow/nodes/test_tool.py
  • api/tests/unit_tests/controllers/console/app/workflow_draft_variables_test.py
  • api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py
  • api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node.py
  • api/tests/unit_tests/core/rag/datasource/test_retrieval_attachment_access.py
  • api/tests/unit_tests/core/workflow/graph_engine/test_mock_factory.py
  • api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py
  • api/tests/unit_tests/core/workflow/graph_engine/test_tool_in_chatflow.py
  • api/tests/unit_tests/core/workflow/nodes/answer/test_answer.py
  • api/tests/unit_tests/core/workflow/nodes/code/__init__.py
  • api/tests/unit_tests/core/workflow/nodes/code/code_node_spec.py
  • api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py
  • api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.py
  • api/tests/unit_tests/core/workflow/nodes/template_transform/__init__.py
  • api/tests/unit_tests/core/workflow/nodes/template_transform/template_transform_node_spec.py
  • api/tests/unit_tests/core/workflow/nodes/test_if_else.py
  • api/tests/unit_tests/core/workflow/nodes/test_list_operator.py
  • api/tests/unit_tests/core/workflow/nodes/tool/test_tool_node.py
  • api/tests/unit_tests/core/workflow/nodes/webhook/test_webhook_file_conversion.py
  • api/tests/unit_tests/core/workflow/nodes/webhook/test_webhook_node.py
  • api/tests/unit_tests/core/workflow/test_node_factory.py
  • api/tests/unit_tests/core/workflow/test_node_runtime.py
  • api/tests/unit_tests/core/workflow/test_workflow_entry.py
  • api/tests/unit_tests/services/plugin/__init__.py
  • api/tests/unit_tests/services/plugin/conftest.py
  • api/tests/unit_tests/services/plugin/test_dependencies_analysis.py
  • api/tests/unit_tests/services/plugin/test_endpoint_service.py
  • api/tests/unit_tests/services/plugin/test_plugin_migration.py
  • api/tests/unit_tests/services/plugin/test_plugin_service.py
  • api/tests/unit_tests/services/rag_pipeline/test_rag_pipeline.py
  • api/tests/unit_tests/services/rag_pipeline/test_rag_pipeline_transform_service.py
  • api/tests/unit_tests/services/workflow/test_draft_var_loader_simple.py
  • api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py

目的与范围

本文档描述了 Dify 中的工作流执行引擎,该引擎通过基于节点的执行模型来编排基于图的应用程序逻辑。引擎处理定义为有向无环图(DAG)的工作流,其中节点代表操作(大语言模型调用、HTTP 请求、代码执行等),边定义了数据流。

本页面涵盖以下内容:

  • 工作流定义与生命周期管理
  • 节点类型及其执行语义
  • 变量管理与节点间的数据传递
  • 执行追踪与状态持久化
  • 节点工厂模式与依赖注入

有关底层执行逻辑的详细信息,请参见工作流定义与执行模型

---

架构总览

工作流执行架构
graph TB
    subgraph "执行入口"
        WorkflowEntry["WorkflowEntry<br/>(core/workflow/workflow_entry.py)"]
        WorkflowAppRunner["WorkflowAppRunner<br/>(core/app/apps/workflow/app_runner.py)"]
        RagPipelineService["RagPipelineService<br/>(services/rag_pipeline/rag_pipeline.py)"]
    end

    subgraph "工作流模型"
        Workflow["Workflow<br/>(models/workflow.py)"]
        WorkflowRun["WorkflowRun<br/>(models/workflow.py)"]
        NodeExecution["WorkflowNodeExecutionModel<br/>(models/workflow.py)"]
    end

    subgraph "图引擎 (graphon)"
        Graph["Graph<br/>(graphon/graph.py)"]
        GraphEngine["GraphEngine<br/>(graphon/graph_engine)"]
        GraphRuntimeState["GraphRuntimeState<br/>(graphon/runtime.py)"]
        VariablePool["VariablePool<br/>(graphon/runtime.py)"]
    end

    subgraph "节点系统"
        NodeFactory["DifyNodeFactory<br/>(core/workflow/node_factory.py)"]
        LLMNode["LLMNode<br/>(graphon/nodes/llm)"]
        HttpRequestNode["HttpRequestNode<br/>(graphon/nodes/http_request)"]
        CodeNode["CodeNode<br/>(graphon/nodes/code)"]
        HumanInputNode["HumanInputNode<br/>(graphon/nodes/human_input)"]
    end

    WorkflowAppRunner --> WorkflowEntry
    RagPipelineService --> WorkflowEntry
    WorkflowEntry --> Workflow
    WorkflowEntry --> NodeFactory

    NodeFactory --> Graph
    NodeFactory --> GraphRuntimeState

    Graph --> GraphEngine
    GraphEngine --> LLMNode
    GraphEngine --> HttpRequestNode
    GraphEngine --> CodeNode
    GraphEngine --> HumanInputNode

    GraphEngine --> NodeExecution
    GraphRuntimeState --> VariablePool

来源: api/core/workflow/workflow_entry.py:139-180, api/services/workflow_service.py:101-131, api/services/rag_pipeline/rag_pipeline.py:44-55, api/core/workflow/node_factory.py:15-18

工作流引擎采用分层架构:

  1. 入口层 - WorkflowEntry 初始化 GraphEngine 并管理用于子图的 _WorkflowChildEngineBuilder api/core/workflow/workflow_entry.py:48-82
  2. 模型层 - Workflow 存储图定义,WorkflowRun 追踪执行过程,WorkflowNodeExecutionModel 记录节点级别的结果 api/models/workflow.py:76-85, api/services/workflow_service.py:114-131
  3. 图引擎 - 编排逻辑位于 graphon 包中,通过 GraphEngineGraphRuntimeState 管理节点调度与状态 api/core/workflow/workflow_entry.py:33-41
  4. 节点层 - 具体的节点实现执行业务逻辑,通过 DifyNodeFactory 实例化,该工厂使用 resolve_workflow_node_class 解析类 api/core/workflow/node_factory.py:123-134

---

工作流定义与生命周期

工作流模型结构

Workflow 模型存储完整的图定义。关键字段包括版本控制系统,其中 VERSION_DRAFT 用于编辑器环境 api/services/workflow_service.py:134-161

图配置结构

图定义包含节点和边的数组。DifyNodeFactory 使用 GraphInitParams 在执行期间保存此配置 api/core/workflow/node_factory.py:72-93。节点通过 get_node_type_classes_mapping 映射到类,该函数会执行所有节点模块的副作用导入 api/core/workflow/node_factory.py:105-120

工作流生命周期状态

工作流存在草稿版本或已发布版本。WorkflowService 管理生命周期,包括检查工作流是否存在以及获取相应版本 api/services/workflow_service.py:133-172

---

节点类型与执行

节点工厂模式

DifyNodeFactory 负责节点实例化,注入执行所需的 GraphInitParamsGraphRuntimeState。它还提供 DifyPreparedLLMDifyToolNodeRuntime 以满足节点特定需求 api/core/workflow/node_factory.py:24-32

graph TB
    subgraph "DifyNodeFactory 依赖注入"
        DNF["DifyNodeFactory<br/>(core/workflow/node_factory.py)"]
        GIP["GraphInitParams<br/>(graphon/entities.py)"]
        GRS["GraphRuntimeState<br/>(graphon/runtime.py)"]

        DNF -->|注入| GIP
        DNF -->|注入| GRS

        DNF -->|创建| LLM["LLMNode<br/>(DifyPreparedLLM)"]
        DNF -->|创建| Tool["ToolNode<br/>(DifyToolNodeRuntime)"]
        DNF -->|创建| HTTP["HttpRequestNode<br/>(graphon_ssrf_proxy)"]
    end

来源: api/core/workflow/node_factory.py:24-54, api/core/workflow/workflow_entry.py:89-92

核心节点类型
节点类型用途关键依赖
大语言模型LLMNode模型推理ModelInstance, LLMFileSaver api/tests/integration_tests/workflow/nodes/test_llm.py:78-89
HTTP 请求HttpRequestNode外部 API 调用ssrf_proxy, HttpRequestNodeConfig api/tests/integration_tests/workflow/nodes/test_http.py:77-87
代码CodeNodePython/JS 执行CodeExecutor, CodeNodeLimits api/core/workflow/node_factory.py:14-17
知识库KnowledgeRetrievalNode检索增强生成(RAG)查询RagPipelineService api/services/rag_pipeline/rag_pipeline.py:96-104
人工输入HumanInputNode用户交互DifyHumanInputNodeRuntime api/core/workflow/node_factory.py:26-34

有关节点实例化的详细信息,请参见节点工厂与依赖注入

节点执行流程

节点在 GraphEngine 内执行。像 RAG 这样的专用管线使用 RagPipelineTransformService 将标准数据集转换为基于工作流的执行图 api/services/rag_pipeline/rag_pipeline_transform_service.py:31-81

  • 大语言模型节点实现: 使用 DifyPromptMessageSerializer 为不同的模型模式格式化消息 api/tests/integration_tests/workflow/nodes/test_llm.py:72-75。请参见大语言模型节点实现
  • HTTP 和代码节点: HttpRequestNode 使用 Executor 处理复杂的请求体数据,包括嵌套的对象变量 api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_executor.py:59-77。请参见HTTP 请求与代码执行节点
  • 知识库检索: KnowledgeRetrievalNode(通常是 rag-pipeline 工作流类型的一部分)管理检索设置和索引技术 api/services/rag_pipeline/rag_pipeline_transform_service.py:70-77。请参见知识库检索节点
  • 人工输入: 通过 DifyHumanInputNodeRuntime 处理,支持交付渠道配置 api/services/workflow_service.py:21-25。请参见人工输入节点与暂停-恢复机制

---

变量管理与数据流

VariablePool 架构

VariablePool 是执行数据的中央仓库。在工作流入口过程中,它会使用系统变量和用户输入进行初始化 api/services/rag_pipeline/rag_pipeline.py:90-93

变量解析与加载

VariableLoader 的实现(例如 DraftVarLoader)负责从数据库检索变量值。它支持多线程加载,用于存储在外部存储中的大型变量 api/services/workflow_draft_variable_service.py:80-163

graph LR
    subgraph "变量管理"
        VP["VariablePool<br/>(graphon/runtime.py)"]
        VL["VariableLoader<br/>(graphon/variable_loader.py)"]
        DVL["DraftVarLoader<br/>(services/workflow_draft_variable_service.py)"]
        Storage["外部存储<br/>(extensions/ext_storage.py)"]

        VL <|-- DVL
        DVL -->|查询| DB["PostgreSQL/MySQL"]
        DVL -->|获取大型变量| Storage
        DVL -->|填充| VP
        VP -->|被解析| Node["节点执行"]
    end

来源: api/services/workflow_draft_variable_service.py:110-163, api/core/workflow/variable_pool_initializer.py:25-38

---

执行追踪与状态

WorkflowRun 和 NodeExecution
  • WorkflowRun: 追踪工作流实例的整体执行过程 api/services/rag_pipeline/rag_pipeline.py:104
  • WorkflowNodeExecutionModel: 记录节点的具体执行情况,包括其状态、输入和输出 api/services/workflow_service.py:114-131

WorkflowService 提供 get_node_last_run 方法来检索历史执行数据,这对于工作流编辑器中的"从节点运行"功能至关重要 api/services/workflow_service.py:114-131

---

测试与模拟系统

Dify 结合使用集成测试和模拟工厂来验证工作流逻辑。MockNodeFactory(在子页面中引用)允许对复杂图进行表驱动测试,而无需实际的大语言模型或 HTTP 调用。

有关详细信息,请参见工作流测试与模拟系统

---

子系统总结

来源: api/core/workflow/workflow_entry.py:139-180, api/services/workflow_service.py:101-131, api/core/workflow/node_factory.py:105-134