人工输入节点与暂停恢复机制(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/langgenius/dify/5.6-human-input-node-and-pause-resume-mechanism
翻译时间:2026-05-27T08:44:30.514Z
翻译模型:deepseek-chat
原文字符数:14689
项目:Dify (dify)
---
人工输入节点与暂停-恢复机制
相关源文件
以下文件被用作生成此 Wiki 页面的上下文:
api/controllers/console/app/agent.pyapi/controllers/console/app/ops_trace.pyapi/controllers/console/app/workflow_trigger.pyapi/controllers/console/datasets/rag_pipeline/datasource_content_preview.pyapi/core/app/apps/advanced_chat/app_generator.pyapi/core/app/apps/advanced_chat/app_runner.pyapi/core/app/apps/advanced_chat/generate_task_pipeline.pyapi/core/app/apps/agent_chat/app_generator.pyapi/core/app/apps/chat/app_generator.pyapi/core/app/apps/completion/app_generator.pyapi/core/app/apps/message_based_app_generator.pyapi/core/app/apps/pipeline/pipeline_runner.pyapi/core/app/apps/workflow/app_generator.pyapi/core/app/apps/workflow/app_runner.pyapi/core/app/apps/workflow/generate_task_pipeline.pyapi/core/app/apps/workflow_app_runner.pyapi/core/app/entities/app_invoke_entities.pyapi/core/app/entities/queue_entities.pyapi/core/app/entities/task_entities.pyapi/core/app/layers/pause_state_persist_layer.pyapi/core/app/layers/trigger_post_layer.pyapi/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.pyapi/core/app/task_pipeline/message_cycle_manager.pyapi/models/types.pyapi/tests/test_containers_integration_tests/controllers/console/app/test_app_apis.pyapi/tests/test_containers_integration_tests/controllers/console/app/test_chat_conversation_status_count_api.pyapi/tests/test_containers_integration_tests/controllers/console/helpers.pyapi/tests/test_containers_integration_tests/core/app/layers/test_pause_state_persist_layer.pyapi/tests/test_containers_integration_tests/models/test_types_enum_text.pyapi/tests/test_containers_integration_tests/services/test_dataset_service_update_dataset.pyapi/tests/test_containers_integration_tests/test_workflow_pause_integration.pyapi/tests/unit_tests/controllers/console/app/test_workflow_app_log_api.pyapi/tests/unit_tests/controllers/console/app/test_workflow_trigger_api.pyapi/tests/unit_tests/core/app/apps/chat/test_base_app_runner_multimodal.pyapi/tests/unit_tests/core/app/apps/test_advanced_chat_app_generator.pyapi/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.pyapi/tests/unit_tests/core/app/task_pipeline/test_easy_ui_based_generate_task_pipeline.pyapi/tests/unit_tests/core/app/task_pipeline/test_message_cycle_manager_optimization.pyapi/tests/unit_tests/services/test_workflow_service.pyweb/app/components/workflow/constants.tsweb/app/components/workflow/hooks/use-checklist.tsweb/app/components/workflow/hooks/use-nodes-available-var-list.tsweb/app/components/workflow/nodes/human-input/__tests__/human-input.spec.tsxweb/app/components/workflow/nodes/human-input/__tests__/panel.spec.tsxweb/app/components/workflow/nodes/human-input/components/__tests__/user-action.spec.tsxweb/app/components/workflow/nodes/human-input/components/delivery-method/email-configure-modal.tsxweb/app/components/workflow/nodes/human-input/components/delivery-method/method-item.tsxweb/app/components/workflow/nodes/human-input/components/user-action.tsxweb/app/components/workflow/nodes/human-input/panel.tsxweb/i18n/en-US/workflow.json
目的与范围
本文档描述了 Dify 工作流中的人工输入节点实现,以及底层用于暂停工作流执行并等待外部用户输入的暂停-恢复机制。人工输入节点允许工作流通过动态生成的表单从用户处收集结构化数据,并支持多种投递渠道和可靠的超时处理。
该机制依赖于 GraphEngine 在运行时产生 GraphRunPausedEvent 时暂停执行的能力,并在满足所需外部条件(表单提交)后恢复执行。此状态通过聊天应用和工作流应用中的专用任务管线进行管理。
---
架构总览
人工输入系统由多个组件协同工作,以实现工作流的暂停和恢复。
系统组件与代码实体
| 自然语言概念 | 代码实体空间 | 文件路径 |
|---|---|---|
| 节点类 | HumanInputNode | api/graphon/nodes/builtin_node_types.py:82-82 |
| 暂停原因 | HumanInputRequired | api/graphon/entities/pause_reason.py:78-78 |
| 工作流状态 | GraphRuntimeState | api/graphon/runtime.py:83-83 |
| 持久化层 | PauseStatePersistenceLayer | api/core/app/layers/pause_state_persist_layer.py:42-42 |
| 任务管线 | WorkflowAppGenerateTaskPipeline | api/core/app/apps/workflow/generate_task_pipeline.py:75-75 |
| 响应类型 | WorkflowPauseStreamResponse | api/core/app/entities/task_entities.py:252-252 |
自然语言到代码实体空间的桥接
graph TB
subgraph "自然语言空间"
UserAction["用户提交表单"]
Timeout["表单超时"]
end
subgraph "代码实体空间(系统命名)"
HumanInputNode["HumanInputNode (BuiltinNodeTypes)"]
PauseReason["HumanInputRequired (PauseReason)"]
PersistenceLayer["PauseStatePersistenceLayer (Class)"]
WorkflowRun["Workflow (Model)"]
RedisChannel["RedisChannel (CommandChannel)"]
end
UserAction -->|"触发"| RedisChannel
Timeout -->|"触发"| RedisChannel
HumanInputNode -->|"产生"| PauseReason
PauseReason -->|"通过持久化"| PersistenceLayer
PersistenceLayer -->|"更新状态"| WorkflowRun
来源:
api/core/app/layers/pause_state_persist_layer.py:42-42api/graphon/entities/pause_reason.py:78-78api/graphon/graph_engine/command_channels.py:41-41api/models/workflow.py:70-70api/graphon/nodes/builtin_node_types.py:82-82
---
HumanInputNode 实现
HumanInputNode 是一种专用节点类型,它与 GraphEngine 交互以请求暂停当前执行线程。
节点数据结构
节点配置包含表单定义(FormInputConfig)和输入字段。当执行到达该节点时,它会准备表单数据并产生一个暂停事件。
classDiagram
class HumanInputNode {
+_run() Generator
}
class HumanInputRequired {
+str node_id
+str node_type
+dict payload
}
class GraphRunPausedEvent {
+str node_id
+HumanInputRequired pause_reason
}
HumanInputNode ..> GraphRunPausedEvent : 产生
GraphRunPausedEvent --> HumanInputRequired
执行与暂停流程
- 暂停信号:节点产生一个
GraphRunPausedEventapi/graphon/graph_events.py:61-61,其中包含HumanInputRequired暂停原因api/graphon/entities/pause_reason.py:78-78。 - 引擎挂起:
GraphEngine捕获此事件,任务管线(例如WorkflowAppGenerateTaskPipeline)捕获QueueWorkflowPausedEventapi/core/app/apps/workflow/generate_task_pipeline.py:38-38。 - 状态持久化:
PauseStatePersistenceLayerapi/core/app/layers/pause_state_persist_layer.py:42-42负责保存当前状态。 - 工作流状态:
WorkflowExecutionStatus更新为PAUSEDapi/graphon/enums.py:79-79。
来源:
api/graphon/entities/pause_reason.py:78-78api/graphon/graph_events.py:61-61api/core/app/layers/pause_state_persist_layer.py:42-42api/graphon/runtime.py:83-83api/core/app/apps/workflow/generate_task_pipeline.py:153-171
---
暂停与恢复机制
暂停-恢复机制允许 GraphEngine 被序列化并在之后重建。该机制由 WorkflowAppRunner api/core/app/apps/workflow/app_runner.py:24-24 和 AdvancedChatAppRunner api/core/app/apps/advanced_chat/app_runner.py:54-54 使用。
工作流恢复流程
恢复操作通过使用 graph_runtime_state api/core/app/apps/advanced_chat/app_runner.py:91-91 重新调用应用运行器来触发。
sequenceDiagram
participant Runner as "AdvancedChatAppRunner"
participant Engine as "GraphEngine"
participant Channel as "RedisChannel"
Runner->>Runner: 检查 _resume_graph_runtime_state
Runner->>Engine: 使用 resume_state 初始化 Graph
Runner->>Channel: 监听命令
Channel-->>Engine: 注入 HumanInputFormFilledEvent
Engine->>Engine: 从暂停节点继续执行
命令通道
Dify 使用 RedisChannel api/graphon/graph_engine/command_channels.py:41-41 与运行中或已暂停的工作流进行通信。当人工输入表单被填写时,会通过此通道发送命令以通知引擎继续执行。此操作由 WorkflowBasedAppRunner api/core/app/apps/workflow_app_runner.py:91-91 处理。
来源:
api/core/app/apps/advanced_chat/app_runner.py:122-133api/graphon/graph_engine/command_channels.py:41-41api/core/app/apps/workflow_app_runner.py:100-103
---
状态持久化层
PauseStatePersistenceLayer api/core/app/layers/pause_state_persist_layer.py:42-42 负责将工作流的"快照"保存到数据库中。
持久化细节
- 配置:使用
PauseStateLayerConfig定义存储行为api/core/app/layers/pause_state_persist_layer.py:42-42。 - 状态快照:
GraphRuntimeStateapi/graphon/runtime.py:83-83被序列化。 - 变量池:所有当前变量都保存在
VariablePoolapi/graphon/runtime.py:83-83中。 - 任务管线:
AdvancedChatAppGenerateTaskPipelineapi/core/app/apps/advanced_chat/generate_task_pipeline.py:141-141和WorkflowAppGenerateTaskPipelineapi/core/app/apps/workflow/generate_task_pipeline.py:75-75都通过GraphRuntimeStateSupport混入类支持GraphRuntimeState。
来源:
api/core/app/layers/pause_state_persist_layer.py:42-42api/graphon/runtime.py:83-83api/core/app/apps/advanced_chat/generate_task_pipeline.py:141-141api/core/app/apps/workflow/generate_task_pipeline.py:11-11
---
表单投递与通知
当工作流因等待人工输入而暂停时,Dify 可以触发通知,以确保用户知晓需要其输入。
邮件投递
Dify 包含一个专门用于人工输入邮件投递的任务:dispatch_human_input_email_task api/core/app/apps/workflow_app_runner.py:86-86。该任务在 WorkflowBasedAppRunner 中被导入和使用。
表单过期
系统通过事件处理表单超时:
QueueHumanInputFormTimeoutEvent:在定义的超时时间到期时触发api/core/app/apps/workflow/generate_task_pipeline.py:21-21。NodeRunHumanInputFormTimeoutEvent:引擎级别的事件,指示特定节点已超时api/core/app/apps/workflow_app_runner.py:68-68。QueueHumanInputFormFilledEvent:在用户成功提交表单时触发api/core/app/apps/workflow/generate_task_pipeline.py:20-20。
来源:
api/core/app/apps/workflow/generate_task_pipeline.py:20-21api/core/app/apps/workflow_app_runner.py:68-68api/core/app/apps/workflow_app_runner.py:86-86api/core/app/entities/task_entities.py:91-91