agentic_huge_data_base / wiki
页面 Open WebUI · 4.5 WebSocket 事件处理·DeepWiki 中文全文译文

4.5 · WebSocket 事件处理(WebSocket Event Handling)

多模型对话工作台与知识应用入口 · 本章是 Open WebUI DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Open WebUI 章节4.5 状态全文译文 模块频道、笔记与协作、接口与服务契约、界面与交互、系统架构
源码线索
  • backend/open_webui/socket/main.py
  • backend/open_webui/socket/utils.py
  • backend/open_webui/tasks.py
  • backend/open_webui/test/util/test_redis.py
  • backend/open_webui/utils/rate_limit.py
  • backend/open_webui/utils/redis.py
  • src/lib/apis/index.ts
  • src/lib/components/chat/Chat.svelte
  • src/lib/components/chat/MessageInput.svelte
  • src/lib/components/chat/Messages.svelte
模块标签
  • 频道、笔记与协作
  • 接口与服务契约
  • 界面与交互
  • 系统架构
  • 认证、权限与安全

中文译文

WebSocket 事件处理(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/open-webui/open-webui/4.5-websocket-event-handling
翻译时间:2026-06-09T16:08:11.844Z
翻译模型:deepseek-chat
原文字符数:10685
项目:Open WebUI (open-webui)

---

WebSocket 事件处理

相关源文件

以下文件为本 wiki 页面的生成上下文:

  • backend/open_webui/socket/main.py
  • backend/open_webui/socket/utils.py
  • backend/open_webui/tasks.py
  • backend/open_webui/test/util/test_redis.py
  • backend/open_webui/utils/rate_limit.py
  • backend/open_webui/utils/redis.py
  • src/lib/apis/index.ts
  • src/lib/components/chat/Chat.svelte
  • src/lib/components/chat/MessageInput.svelte
  • src/lib/components/chat/Messages.svelte
  • src/lib/components/chat/Messages/Message.svelte
  • src/lib/components/chat/Messages/MultiResponseMessages.svelte
  • src/lib/components/chat/Messages/ResponseMessage.svelte
  • src/lib/components/chat/Messages/UserMessage.svelte
  • src/lib/stores/index.ts
  • src/lib/utils/index.ts
  • src/routes/+layout.svelte

目的与范围

本文档介绍 Open WebUI 中的实时通信系统,该系统使用 Socket.IO 为聊天响应、状态变更和协作编辑提供实时更新。WebSocket 事件系统支持 AI 响应流式传输、实时通知以及跨浏览器标签页和服务器实例的多用户文档协作。

关于更广泛的实时通信架构和 Redis 同步,请参见 2.5 实时通信架构。关于协作文档编辑的详细信息,请参见 9.3 协作编辑

---

架构概览

WebSocket 事件系统由三个主要层次组成:

  1. Socket.IO 服务器 - 后端事件路由器,可选配 Redis pub/sub 以支持分布式部署 backend/open_webui/socket/main.py:65-96
  2. 事件处理器 - 前端消费者,处理传入事件并更新 UI src/lib/components/chat/Chat.svelte:179-247
  3. 分布式状态 - 基于 Redis 的存储,用于会话管理和使用量跟踪 backend/open_webui/socket/main.py:116-142
WebSocket 组件交互
graph TB
    subgraph "前端组件"
        ChatSvelte["Chat.svelte [src/lib/components/chat/Chat.svelte]"]
        MessageInput["MessageInput.svelte [src/lib/components/chat/MessageInput.svelte]"]
        ResponseMsg["ResponseMessage.svelte [src/lib/components/chat/Messages/ResponseMessage.svelte]"]
    end

    subgraph "Socket.IO 层 [backend/open_webui/socket/main.py]"
        SocketServer["sio = socketio.AsyncServer"]
        EventsHandler["@sio.on('events')"]
        UserJoin["@sio.on('user-join')"]
        Usage["@sio.on('usage')"]
    end

    subgraph "Redis 状态(可选)"
        SessionPool["SESSION_POOL [RedisDict]"]
        UsagePool["USAGE_POOL [RedisDict]"]
        YdocMgr["YDOC_MANAGER [YdocManager]"]
        RedisPubSub["mgr = socketio.AsyncRedisManager"]
    end

    subgraph "后端服务"
        ChatAPI["聊天 API"]
        TaskMgr["tasks.py [backend/open_webui/tasks.py]"]
    end

    ChatSvelte -->|"$socket?.on('events')"| SocketServer

    SocketServer --> EventsHandler
    SocketServer --> UserJoin
    SocketServer --> Usage

    EventsHandler -->|存储会话| SessionPool
    Usage -->|跟踪使用量| UsagePool

    SocketServer <-->|分布式事件| RedisPubSub

    ChatAPI -->|"sio.emit('events')"| SocketServer
    TaskMgr -->|"sio.emit('events')"| SocketServer

来源: backend/open_webui/socket/main.py:63-170src/lib/components/chat/Chat.svelte:30-31src/routes/+layout.svelte:115-207

---

事件消息结构

所有事件都遵循标准化的负载结构,通过 events 通道发送。前端 chatEventHandler 根据 chat_idmessage_id 进行路由。

字段类型描述
chat_idstring用于路由的聊天标识符。
message_idstring目标消息 ID。
data.typestring事件类型标识符(例如 chat:message:delta)。
data.dataobject类型特定的负载。

来源: src/lib/components/chat/Chat.svelte:179-247backend/open_webui/socket/main.py:650-674

---

事件类型

聊天流式事件
事件类型用途数据字段
status在生成过程中显示状态更新doneactiondescriptionurlsquery
chat:completion表示生成完成donecontenttitle
chat:message:delta流式传输消息内容块content
chat:message替换整个消息内容content
chat:message:error报告生成错误error 对象

来源: src/lib/components/chat/Messages/ResponseMessage.svelte:67-116src/lib/components/chat/Chat.svelte:179-247

引用和代码执行事件

引用和代码执行结果作为 history.messages 结构中的特殊消息更新进行处理。

graph LR
    Source["'source' 或 'code_execution' 事件"]
    TypeCheck{"data.type === 'code_execution'?"}
    CodeArray["message.code_executions[]"]
    SourceArray["message.sources[]"]

    Source --> TypeCheck
    TypeCheck -->|是| CodeArray
    TypeCheck -->|否| SourceArray

来源: src/lib/components/chat/Messages/ResponseMessage.svelte:89-101src/lib/components/chat/Chat.svelte:166-169

---

前端事件处理

聊天组件处理器

主要的事件消费者是 Chat.svelte 中定义的监听器。它响应式地管理本地 history 存储。

sequenceDiagram
    participant Backend as 后端 [socket/main.py]
    participant Socket as $socket 存储
    participant Handler as chatEventHandler [Chat.svelte]
    participant History as history.messages

    Backend->>Socket: emit('events', payload)
    Socket->>Handler: 触发监听器
    Handler->>Handler: 验证 chat_id

    alt 事件类型: chat:message:delta
        Handler->>History: 将内容追加到 message_id
    else 事件类型: status
        Handler->>History: 更新 statusHistory 数组
    else 事件类型: chat:completion
        Handler->>Handler: chatCompletionEventHandler()
    end

关键实现细节:

  • 响应性:history.messages 对象的更新会触发 Messages.svelteResponseMessage.svelte 的重新渲染 src/lib/components/chat/Messages/ResponseMessage.svelte:124-136
  • 节流:Messages.svelte 中,handleHistoryChange 函数使用 requestAnimationFrame 在活跃流式传输期间将消息列表重建节流为每动画帧一次,以保持性能 src/lib/components/chat/Messages.svelte:107-130
  • 结构更新:history.currentId 发生变化时(例如导航到新聊天),列表会立即重建,不进行节流 src/lib/components/chat/Messages.svelte:116-120

来源: src/lib/components/chat/Chat.svelte:179-247src/lib/components/chat/Messages.svelte:107-132src/lib/components/chat/Messages.svelte:85-103

---

后端事件发送

Socket.IO 服务器配置

服务器同时支持 WebSocket 和轮询传输。在分布式环境中,它使用 AsyncRedisManager 跨多个节点广播事件 backend/open_webui/socket/main.py:65-72

# backend/open_webui/socket/main.py:65-84
if WEBSOCKET_MANAGER == 'redis':
    mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL, ...)
    sio = socketio.AsyncServer(
        client_manager=mgr,
        async_mode='asgi',
        # ...
    )

来源: backend/open_webui/socket/main.py:65-84backend/open_webui/utils/redis.py:179-191

分布式任务管理

系统协调跨多个后端实例的任务取消。periodic_session_pool_cleanupperiodic_usage_pool_cleanup 函数管理活跃会话和任务的生命周期 backend/open_webui/socket/main.py:173-207

graph TD
    UserStop["用户点击停止 [Chat.svelte]"]
    StopAPI["stopTask API 调用 [src/lib/apis/index.ts]"]
    TaskSvc["stop_task() [tasks.py]"]
    SocketSio["sio.emit('events', {type: 'task:stop'})"]
    Handler["chatEventHandler [前端]"]

    UserStop --> StopAPI
    StopAPI --> TaskSvc
    TaskSvc --> SocketSio
    SocketSio --> Handler

来源: backend/open_webui/socket/main.py:173-207src/lib/apis/index.ts:244-274src/lib/components/chat/Chat.svelte:88-91

---

会话和使用量跟踪

SESSION_POOL 和 USAGE_POOL

后端使用 RedisDict(一个提供类似字典访问 Redis 哈希的包装器)维护活跃用户和模型消耗的实时可见性 backend/open_webui/socket/main.py:116-134

  • SESSION_POOL:sid(Socket ID)映射到用户元数据和 last_seen_at 时间戳 backend/open_webui/socket/main.py:123-128
  • USAGE_POOL: 跟踪每个会话的模型消耗指标 backend/open_webui/socket/main.py:129-134
  • 清理: periodic_session_pool_cleanupSESSION_POOL 中回收孤立的会话,如果它们在 SESSION_POOL_TIMEOUT(120 秒)内未更新心跳 backend/open_webui/socket/main.py:173-194

来源: backend/open_webui/socket/main.py:123-134backend/open_webui/socket/main.py:173-194backend/open_webui/socket/main.py:101

用于协作笔记的 YdocManager

对于实时笔记编辑,YDOC_MANAGER 处理文档同步并将更新持久化到 Redis backend/open_webui/socket/main.py:167-170

方法用途实现
YdocManager管理 Pycrdt 文档由 Redis 支持,使用特定键前缀 backend/open_webui/socket/main.py:167-170
heartbeat保持会话活跃前端每 30 秒发送一次 src/routes/+layout.svelte:159-164

来源: backend/open_webui/socket/main.py:167-170src/routes/+layout.svelte:159-164