agentic_huge_data_base / wiki
页面 Dify · 5.6 人工输入节点与暂停恢复机制·DeepWiki 中文全文译文

5.6 · 人工输入节点与暂停恢复机制(Human Input Node and Pause-Resume Mechanism)

应用编排与外部知识接入 · 本章是 Dify DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Dify 章节5.6 状态全文译文 模块工作流与编排、系统架构、界面与交互、评测、反馈与人工复核
源码线索
  • api/controllers/console/app/agent.py
  • api/controllers/console/app/ops_trace.py
  • api/controllers/console/app/workflow_trigger.py
  • api/controllers/console/datasets/rag_pipeline/datasource_content_preview.py
  • api/core/app/apps/advanced_chat/app_generator.py
  • api/core/app/apps/advanced_chat/app_runner.py
  • api/core/app/apps/advanced_chat/generate_task_pipeline.py
  • api/core/app/apps/agent_chat/app_generator.py
  • api/core/app/apps/chat/app_generator.py
  • api/core/app/apps/completion/app_generator.py
模块标签
  • 工作流与编排
  • 系统架构
  • 界面与交互
  • 评测、反馈与人工复核
  • 图谱与关系

中文译文

人工输入节点与暂停恢复机制(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/langgenius/dify/5.6-human-input-node-and-pause-resume-mechanism
翻译时间:2026-05-27T08:44:30.514Z
翻译模型:deepseek-chat
原文字符数:14689
项目:Dify (dify)

---

人工输入节点与暂停-恢复机制

相关源文件

以下文件被用作生成此 Wiki 页面的上下文:

  • api/controllers/console/app/agent.py
  • api/controllers/console/app/ops_trace.py
  • api/controllers/console/app/workflow_trigger.py
  • api/controllers/console/datasets/rag_pipeline/datasource_content_preview.py
  • api/core/app/apps/advanced_chat/app_generator.py
  • api/core/app/apps/advanced_chat/app_runner.py
  • api/core/app/apps/advanced_chat/generate_task_pipeline.py
  • api/core/app/apps/agent_chat/app_generator.py
  • api/core/app/apps/chat/app_generator.py
  • api/core/app/apps/completion/app_generator.py
  • api/core/app/apps/message_based_app_generator.py
  • api/core/app/apps/pipeline/pipeline_runner.py
  • api/core/app/apps/workflow/app_generator.py
  • api/core/app/apps/workflow/app_runner.py
  • api/core/app/apps/workflow/generate_task_pipeline.py
  • api/core/app/apps/workflow_app_runner.py
  • api/core/app/entities/app_invoke_entities.py
  • api/core/app/entities/queue_entities.py
  • api/core/app/entities/task_entities.py
  • api/core/app/layers/pause_state_persist_layer.py
  • api/core/app/layers/trigger_post_layer.py
  • api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py
  • api/core/app/task_pipeline/message_cycle_manager.py
  • api/models/types.py
  • api/tests/test_containers_integration_tests/controllers/console/app/test_app_apis.py
  • api/tests/test_containers_integration_tests/controllers/console/app/test_chat_conversation_status_count_api.py
  • api/tests/test_containers_integration_tests/controllers/console/helpers.py
  • api/tests/test_containers_integration_tests/core/app/layers/test_pause_state_persist_layer.py
  • api/tests/test_containers_integration_tests/models/test_types_enum_text.py
  • api/tests/test_containers_integration_tests/services/test_dataset_service_update_dataset.py
  • api/tests/test_containers_integration_tests/test_workflow_pause_integration.py
  • api/tests/unit_tests/controllers/console/app/test_workflow_app_log_api.py
  • api/tests/unit_tests/controllers/console/app/test_workflow_trigger_api.py
  • api/tests/unit_tests/core/app/apps/chat/test_base_app_runner_multimodal.py
  • api/tests/unit_tests/core/app/apps/test_advanced_chat_app_generator.py
  • api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py
  • api/tests/unit_tests/core/app/task_pipeline/test_easy_ui_based_generate_task_pipeline.py
  • api/tests/unit_tests/core/app/task_pipeline/test_message_cycle_manager_optimization.py
  • api/tests/unit_tests/services/test_workflow_service.py
  • web/app/components/workflow/constants.ts
  • web/app/components/workflow/hooks/use-checklist.ts
  • web/app/components/workflow/hooks/use-nodes-available-var-list.ts
  • web/app/components/workflow/nodes/human-input/__tests__/human-input.spec.tsx
  • web/app/components/workflow/nodes/human-input/__tests__/panel.spec.tsx
  • web/app/components/workflow/nodes/human-input/components/__tests__/user-action.spec.tsx
  • web/app/components/workflow/nodes/human-input/components/delivery-method/email-configure-modal.tsx
  • web/app/components/workflow/nodes/human-input/components/delivery-method/method-item.tsx
  • web/app/components/workflow/nodes/human-input/components/user-action.tsx
  • web/app/components/workflow/nodes/human-input/panel.tsx
  • web/i18n/en-US/workflow.json

目的与范围

本文档描述了 Dify 工作流中的人工输入节点实现,以及底层用于暂停工作流执行并等待外部用户输入的暂停-恢复机制。人工输入节点允许工作流通过动态生成的表单从用户处收集结构化数据,并支持多种投递渠道和可靠的超时处理。

该机制依赖于 GraphEngine 在运行时产生 GraphRunPausedEvent 时暂停执行的能力,并在满足所需外部条件(表单提交)后恢复执行。此状态通过聊天应用和工作流应用中的专用任务管线进行管理。

---

架构总览

人工输入系统由多个组件协同工作,以实现工作流的暂停和恢复。

系统组件与代码实体
自然语言概念代码实体空间文件路径
节点类HumanInputNodeapi/graphon/nodes/builtin_node_types.py:82-82
暂停原因HumanInputRequiredapi/graphon/entities/pause_reason.py:78-78
工作流状态GraphRuntimeStateapi/graphon/runtime.py:83-83
持久化层PauseStatePersistenceLayerapi/core/app/layers/pause_state_persist_layer.py:42-42
任务管线WorkflowAppGenerateTaskPipelineapi/core/app/apps/workflow/generate_task_pipeline.py:75-75
响应类型WorkflowPauseStreamResponseapi/core/app/entities/task_entities.py:252-252
自然语言到代码实体空间的桥接
graph TB
    subgraph "自然语言空间"
        UserAction["用户提交表单"]
        Timeout["表单超时"]
    end

    subgraph "代码实体空间(系统命名)"
        HumanInputNode["HumanInputNode (BuiltinNodeTypes)"]
        PauseReason["HumanInputRequired (PauseReason)"]
        PersistenceLayer["PauseStatePersistenceLayer (Class)"]
        WorkflowRun["Workflow (Model)"]
        RedisChannel["RedisChannel (CommandChannel)"]
    end

    UserAction -->|"触发"| RedisChannel
    Timeout -->|"触发"| RedisChannel
    HumanInputNode -->|"产生"| PauseReason
    PauseReason -->|"通过持久化"| PersistenceLayer
    PersistenceLayer -->|"更新状态"| WorkflowRun

来源:

  • api/core/app/layers/pause_state_persist_layer.py:42-42
  • api/graphon/entities/pause_reason.py:78-78
  • api/graphon/graph_engine/command_channels.py:41-41
  • api/models/workflow.py:70-70
  • api/graphon/nodes/builtin_node_types.py:82-82

---

HumanInputNode 实现

HumanInputNode 是一种专用节点类型,它与 GraphEngine 交互以请求暂停当前执行线程。

节点数据结构

节点配置包含表单定义(FormInputConfig)和输入字段。当执行到达该节点时,它会准备表单数据并产生一个暂停事件。

classDiagram
    class HumanInputNode {
        +_run() Generator
    }
    class HumanInputRequired {
        +str node_id
        +str node_type
        +dict payload
    }
    class GraphRunPausedEvent {
        +str node_id
        +HumanInputRequired pause_reason
    }
    HumanInputNode ..> GraphRunPausedEvent : 产生
    GraphRunPausedEvent --> HumanInputRequired
执行与暂停流程
  1. 暂停信号:节点产生一个 GraphRunPausedEvent api/graphon/graph_events.py:61-61,其中包含 HumanInputRequired 暂停原因 api/graphon/entities/pause_reason.py:78-78
  2. 引擎挂起GraphEngine 捕获此事件,任务管线(例如 WorkflowAppGenerateTaskPipeline)捕获 QueueWorkflowPausedEvent api/core/app/apps/workflow/generate_task_pipeline.py:38-38
  3. 状态持久化PauseStatePersistenceLayer api/core/app/layers/pause_state_persist_layer.py:42-42 负责保存当前状态。
  4. 工作流状态WorkflowExecutionStatus 更新为 PAUSED api/graphon/enums.py:79-79

来源:

  • api/graphon/entities/pause_reason.py:78-78
  • api/graphon/graph_events.py:61-61
  • api/core/app/layers/pause_state_persist_layer.py:42-42
  • api/graphon/runtime.py:83-83
  • api/core/app/apps/workflow/generate_task_pipeline.py:153-171

---

暂停与恢复机制

暂停-恢复机制允许 GraphEngine 被序列化并在之后重建。该机制由 WorkflowAppRunner api/core/app/apps/workflow/app_runner.py:24-24AdvancedChatAppRunner api/core/app/apps/advanced_chat/app_runner.py:54-54 使用。

工作流恢复流程

恢复操作通过使用 graph_runtime_state api/core/app/apps/advanced_chat/app_runner.py:91-91 重新调用应用运行器来触发。

sequenceDiagram
    participant Runner as "AdvancedChatAppRunner"
    participant Engine as "GraphEngine"
    participant Channel as "RedisChannel"

    Runner->>Runner: 检查 _resume_graph_runtime_state
    Runner->>Engine: 使用 resume_state 初始化 Graph
    Runner->>Channel: 监听命令
    Channel-->>Engine: 注入 HumanInputFormFilledEvent
    Engine->>Engine: 从暂停节点继续执行
命令通道

Dify 使用 RedisChannel api/graphon/graph_engine/command_channels.py:41-41 与运行中或已暂停的工作流进行通信。当人工输入表单被填写时,会通过此通道发送命令以通知引擎继续执行。此操作由 WorkflowBasedAppRunner api/core/app/apps/workflow_app_runner.py:91-91 处理。

来源:

  • api/core/app/apps/advanced_chat/app_runner.py:122-133
  • api/graphon/graph_engine/command_channels.py:41-41
  • api/core/app/apps/workflow_app_runner.py:100-103

---

状态持久化层

PauseStatePersistenceLayer api/core/app/layers/pause_state_persist_layer.py:42-42 负责将工作流的"快照"保存到数据库中。

持久化细节
  • 配置:使用 PauseStateLayerConfig 定义存储行为 api/core/app/layers/pause_state_persist_layer.py:42-42
  • 状态快照GraphRuntimeState api/graphon/runtime.py:83-83 被序列化。
  • 变量池:所有当前变量都保存在 VariablePool api/graphon/runtime.py:83-83 中。
  • 任务管线AdvancedChatAppGenerateTaskPipeline api/core/app/apps/advanced_chat/generate_task_pipeline.py:141-141WorkflowAppGenerateTaskPipeline api/core/app/apps/workflow/generate_task_pipeline.py:75-75 都通过 GraphRuntimeStateSupport 混入类支持 GraphRuntimeState

来源:

  • api/core/app/layers/pause_state_persist_layer.py:42-42
  • api/graphon/runtime.py:83-83
  • api/core/app/apps/advanced_chat/generate_task_pipeline.py:141-141
  • api/core/app/apps/workflow/generate_task_pipeline.py:11-11

---

表单投递与通知

当工作流因等待人工输入而暂停时,Dify 可以触发通知,以确保用户知晓需要其输入。

邮件投递

Dify 包含一个专门用于人工输入邮件投递的任务:dispatch_human_input_email_task api/core/app/apps/workflow_app_runner.py:86-86。该任务在 WorkflowBasedAppRunner 中被导入和使用。

表单过期

系统通过事件处理表单超时:

  • QueueHumanInputFormTimeoutEvent:在定义的超时时间到期时触发 api/core/app/apps/workflow/generate_task_pipeline.py:21-21
  • NodeRunHumanInputFormTimeoutEvent:引擎级别的事件,指示特定节点已超时 api/core/app/apps/workflow_app_runner.py:68-68
  • QueueHumanInputFormFilledEvent:在用户成功提交表单时触发 api/core/app/apps/workflow/generate_task_pipeline.py:20-20

来源:

  • api/core/app/apps/workflow/generate_task_pipeline.py:20-21
  • api/core/app/apps/workflow_app_runner.py:68-68
  • api/core/app/apps/workflow_app_runner.py:86-86
  • api/core/app/entities/task_entities.py:91-91