WebSocket 事件处理(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/open-webui/open-webui/4.5-websocket-event-handling
翻译时间:2026-06-09T16:08:11.844Z
翻译模型:deepseek-chat
原文字符数:10685
项目:Open WebUI (open-webui)
---
WebSocket 事件处理
相关源文件
以下文件为本 wiki 页面的生成上下文:
backend/open_webui/socket/main.pybackend/open_webui/socket/utils.pybackend/open_webui/tasks.pybackend/open_webui/test/util/test_redis.pybackend/open_webui/utils/rate_limit.pybackend/open_webui/utils/redis.pysrc/lib/apis/index.tssrc/lib/components/chat/Chat.sveltesrc/lib/components/chat/MessageInput.sveltesrc/lib/components/chat/Messages.sveltesrc/lib/components/chat/Messages/Message.sveltesrc/lib/components/chat/Messages/MultiResponseMessages.sveltesrc/lib/components/chat/Messages/ResponseMessage.sveltesrc/lib/components/chat/Messages/UserMessage.sveltesrc/lib/stores/index.tssrc/lib/utils/index.tssrc/routes/+layout.svelte
目的与范围
本文档介绍 Open WebUI 中的实时通信系统,该系统使用 Socket.IO 为聊天响应、状态变更和协作编辑提供实时更新。WebSocket 事件系统支持 AI 响应流式传输、实时通知以及跨浏览器标签页和服务器实例的多用户文档协作。
关于更广泛的实时通信架构和 Redis 同步,请参见 2.5 实时通信架构。关于协作文档编辑的详细信息,请参见 9.3 协作编辑。
---
架构概览
WebSocket 事件系统由三个主要层次组成:
- Socket.IO 服务器 - 后端事件路由器,可选配 Redis pub/sub 以支持分布式部署
backend/open_webui/socket/main.py:65-96。 - 事件处理器 - 前端消费者,处理传入事件并更新 UI
src/lib/components/chat/Chat.svelte:179-247。 - 分布式状态 - 基于 Redis 的存储,用于会话管理和使用量跟踪
backend/open_webui/socket/main.py:116-142。
WebSocket 组件交互
graph TB
subgraph "前端组件"
ChatSvelte["Chat.svelte [src/lib/components/chat/Chat.svelte]"]
MessageInput["MessageInput.svelte [src/lib/components/chat/MessageInput.svelte]"]
ResponseMsg["ResponseMessage.svelte [src/lib/components/chat/Messages/ResponseMessage.svelte]"]
end
subgraph "Socket.IO 层 [backend/open_webui/socket/main.py]"
SocketServer["sio = socketio.AsyncServer"]
EventsHandler["@sio.on('events')"]
UserJoin["@sio.on('user-join')"]
Usage["@sio.on('usage')"]
end
subgraph "Redis 状态(可选)"
SessionPool["SESSION_POOL [RedisDict]"]
UsagePool["USAGE_POOL [RedisDict]"]
YdocMgr["YDOC_MANAGER [YdocManager]"]
RedisPubSub["mgr = socketio.AsyncRedisManager"]
end
subgraph "后端服务"
ChatAPI["聊天 API"]
TaskMgr["tasks.py [backend/open_webui/tasks.py]"]
end
ChatSvelte -->|"$socket?.on('events')"| SocketServer
SocketServer --> EventsHandler
SocketServer --> UserJoin
SocketServer --> Usage
EventsHandler -->|存储会话| SessionPool
Usage -->|跟踪使用量| UsagePool
SocketServer <-->|分布式事件| RedisPubSub
ChatAPI -->|"sio.emit('events')"| SocketServer
TaskMgr -->|"sio.emit('events')"| SocketServer
来源: backend/open_webui/socket/main.py:63-170,src/lib/components/chat/Chat.svelte:30-31,src/routes/+layout.svelte:115-207。
---
事件消息结构
所有事件都遵循标准化的负载结构,通过 events 通道发送。前端 chatEventHandler 根据 chat_id 和 message_id 进行路由。
| 字段 | 类型 | 描述 |
|---|---|---|
chat_id | string | 用于路由的聊天标识符。 |
message_id | string | 目标消息 ID。 |
data.type | string | 事件类型标识符(例如 chat:message:delta)。 |
data.data | object | 类型特定的负载。 |
来源: src/lib/components/chat/Chat.svelte:179-247,backend/open_webui/socket/main.py:650-674。
---
事件类型
聊天流式事件
| 事件类型 | 用途 | 数据字段 |
|---|---|---|
status | 在生成过程中显示状态更新 | done,action,description,urls,query |
chat:completion | 表示生成完成 | done,content,title |
chat:message:delta | 流式传输消息内容块 | content |
chat:message | 替换整个消息内容 | content |
chat:message:error | 报告生成错误 | error 对象 |
来源: src/lib/components/chat/Messages/ResponseMessage.svelte:67-116,src/lib/components/chat/Chat.svelte:179-247。
引用和代码执行事件
引用和代码执行结果作为 history.messages 结构中的特殊消息更新进行处理。
graph LR
Source["'source' 或 'code_execution' 事件"]
TypeCheck{"data.type === 'code_execution'?"}
CodeArray["message.code_executions[]"]
SourceArray["message.sources[]"]
Source --> TypeCheck
TypeCheck -->|是| CodeArray
TypeCheck -->|否| SourceArray
来源: src/lib/components/chat/Messages/ResponseMessage.svelte:89-101,src/lib/components/chat/Chat.svelte:166-169。
---
前端事件处理
聊天组件处理器
主要的事件消费者是 Chat.svelte 中定义的监听器。它响应式地管理本地 history 存储。
sequenceDiagram
participant Backend as 后端 [socket/main.py]
participant Socket as $socket 存储
participant Handler as chatEventHandler [Chat.svelte]
participant History as history.messages
Backend->>Socket: emit('events', payload)
Socket->>Handler: 触发监听器
Handler->>Handler: 验证 chat_id
alt 事件类型: chat:message:delta
Handler->>History: 将内容追加到 message_id
else 事件类型: status
Handler->>History: 更新 statusHistory 数组
else 事件类型: chat:completion
Handler->>Handler: chatCompletionEventHandler()
end
关键实现细节:
- 响应性: 对
history.messages对象的更新会触发Messages.svelte和ResponseMessage.svelte的重新渲染src/lib/components/chat/Messages/ResponseMessage.svelte:124-136。 - 节流: 在
Messages.svelte中,handleHistoryChange函数使用requestAnimationFrame在活跃流式传输期间将消息列表重建节流为每动画帧一次,以保持性能src/lib/components/chat/Messages.svelte:107-130。 - 结构更新: 当
history.currentId发生变化时(例如导航到新聊天),列表会立即重建,不进行节流src/lib/components/chat/Messages.svelte:116-120。
来源: src/lib/components/chat/Chat.svelte:179-247,src/lib/components/chat/Messages.svelte:107-132,src/lib/components/chat/Messages.svelte:85-103。
---
后端事件发送
Socket.IO 服务器配置
服务器同时支持 WebSocket 和轮询传输。在分布式环境中,它使用 AsyncRedisManager 跨多个节点广播事件 backend/open_webui/socket/main.py:65-72。
# backend/open_webui/socket/main.py:65-84
if WEBSOCKET_MANAGER == 'redis':
mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL, ...)
sio = socketio.AsyncServer(
client_manager=mgr,
async_mode='asgi',
# ...
)
来源: backend/open_webui/socket/main.py:65-84,backend/open_webui/utils/redis.py:179-191。
分布式任务管理
系统协调跨多个后端实例的任务取消。periodic_session_pool_cleanup 和 periodic_usage_pool_cleanup 函数管理活跃会话和任务的生命周期 backend/open_webui/socket/main.py:173-207。
graph TD
UserStop["用户点击停止 [Chat.svelte]"]
StopAPI["stopTask API 调用 [src/lib/apis/index.ts]"]
TaskSvc["stop_task() [tasks.py]"]
SocketSio["sio.emit('events', {type: 'task:stop'})"]
Handler["chatEventHandler [前端]"]
UserStop --> StopAPI
StopAPI --> TaskSvc
TaskSvc --> SocketSio
SocketSio --> Handler
来源: backend/open_webui/socket/main.py:173-207,src/lib/apis/index.ts:244-274,src/lib/components/chat/Chat.svelte:88-91。
---
会话和使用量跟踪
SESSION_POOL 和 USAGE_POOL
后端使用 RedisDict(一个提供类似字典访问 Redis 哈希的包装器)维护活跃用户和模型消耗的实时可见性 backend/open_webui/socket/main.py:116-134。
- SESSION_POOL: 将
sid(Socket ID)映射到用户元数据和last_seen_at时间戳backend/open_webui/socket/main.py:123-128。 - USAGE_POOL: 跟踪每个会话的模型消耗指标
backend/open_webui/socket/main.py:129-134。 - 清理:
periodic_session_pool_cleanup从SESSION_POOL中回收孤立的会话,如果它们在SESSION_POOL_TIMEOUT(120 秒)内未更新心跳backend/open_webui/socket/main.py:173-194。
来源: backend/open_webui/socket/main.py:123-134,backend/open_webui/socket/main.py:173-194,backend/open_webui/socket/main.py:101。
用于协作笔记的 YdocManager
对于实时笔记编辑,YDOC_MANAGER 处理文档同步并将更新持久化到 Redis backend/open_webui/socket/main.py:167-170。
| 方法 | 用途 | 实现 |
|---|---|---|
YdocManager | 管理 Pycrdt 文档 | 由 Redis 支持,使用特定键前缀 backend/open_webui/socket/main.py:167-170。 |
heartbeat | 保持会话活跃 | 前端每 30 秒发送一次 src/routes/+layout.svelte:159-164。 |
来源: backend/open_webui/socket/main.py:167-170,src/routes/+layout.svelte:159-164。