agentic_huge_data_base / wiki
页面 Open WebUI · 2.5 实时通信架构·DeepWiki 中文全文译文

2.5 · 实时通信架构(Real-time Communication Architecture)

多模型对话工作台与知识应用入口 · 本章是 Open WebUI DeepWiki 中文译文的独立章节页,保留原始链接、源码锚点、模块标签和章节层级。

项目Open WebUI 章节2.5 状态全文译文 模块系统架构、接口与服务契约、界面与交互、频道、笔记与协作
源码线索
  • backend/open_webui/socket/main.py
  • backend/open_webui/socket/utils.py
  • backend/open_webui/tasks.py
  • backend/open_webui/test/util/test_redis.py
  • backend/open_webui/utils/rate_limit.py
  • backend/open_webui/utils/redis.py
  • src/lib/apis/index.ts
  • src/lib/stores/index.ts
模块标签
  • 系统架构
  • 接口与服务契约
  • 界面与交互
  • 频道、笔记与协作
  • 检索、召回与知识系统

中文译文

实时通信架构(中文译文)

原始 DeepWiki 页面:https://deepwiki.com/open-webui/open-webui/2.5-real-time-communication-architecture
翻译时间:2026-06-09T16:07:41.330Z
翻译模型:deepseek-chat
原文字符数:10255
项目:Open WebUI (open-webui)

---

实时通信架构

相关源文件

本 wiki 页面基于以下文件生成:

  • backend/open_webui/socket/main.py
  • backend/open_webui/socket/utils.py
  • backend/open_webui/tasks.py
  • backend/open_webui/test/util/test_redis.py
  • backend/open_webui/utils/rate_limit.py
  • backend/open_webui/utils/redis.py
  • src/lib/apis/index.ts
  • src/lib/stores/index.ts
  • src/routes/+layout.svelte

目的与范围

本文档描述了 Open WebUI 中的实时通信基础设施,该设施支持跨分布式服务器部署的实时更新、协作编辑和多用户交互。系统基于 Socket.IO 构建,并采用 Redis 进行状态管理。

有关更广泛的后端架构信息,请参阅后端架构。有关数据存储层的详细信息,请参阅数据与存储层

概述

Open WebUI 实现了一个双向实时通信系统,支持以下功能:

  • 流式 AI 响应:在聊天补全过程中实时输出。
  • 实时频道消息:包含输入状态指示和已读回执。
  • 协作文档编辑:基于 CRDT(Yjs)实现。
  • 多实例同步:通过 Redis pub/sub 实现。
  • 分布式会话管理:跨服务器副本管理会话。

该架构由三个主要层组成:

  1. 传输层:Socket.IO 服务器,支持 WebSocket 和 HTTP 长轮询回退。
  2. 状态层:基于 Redis 的分布式数据结构。
  3. 事件层:针对不同功能的类型化事件处理器。
核心组件架构

下图展示了前端组件与后端实体及存储机制之间的关联。

graph TB
    subgraph "前端 (SvelteKit)"
        LayoutSvelte["src/routes/+layout.svelte<br/>setupSocket()"]
        SocketStore["src/lib/stores/index.ts<br/>socket: Writable&lt;Socket&gt;"]
    end

    subgraph "后端 Socket.IO 服务器"
        SocketMain["backend/open_webui/socket/main.py<br/>sio: AsyncServer"]
        EventHandlers["事件处理器<br/>on('connect')<br/>on('user-join')<br/>on('heartbeat')"]
    end

    subgraph "Redis 状态管理"
        SessionPool["SESSION_POOL<br/>RedisDict<br/>sid → 用户数据"]
        UsagePool["USAGE_POOL<br/>RedisDict<br/>model_id → 连接数"]
        YdocManager["YDOC_MANAGER<br/>YdocManager<br/>文档状态"]
        AsyncRedisManager["socketio.AsyncRedisManager<br/>跨实例 pub/sub"]
    end

    subgraph "持久化 (SQL/Redis)"
        ChatsDB["open_webui.models.chats.Chats"]
        NotesDB["open_webui.models.notes.Notes"]
        ChannelsDB["open_webui.models.channels.Channels"]
    end

    LayoutSvelte -->|"io() 连接"| SocketMain
    LayoutSvelte -->|"存储 socket"| SocketStore

    SocketMain -->|"管理"| EventHandlers
    EventHandlers -->|"读写"| SessionPool
    EventHandlers -->|"读写"| UsagePool
    EventHandlers -->|"读写"| YdocManager

    SocketMain -->|"使用"| AsyncRedisManager
    AsyncRedisManager -->|"pub/sub"| SessionPool
    AsyncRedisManager -->|"pub/sub"| UsagePool
    AsyncRedisManager -->|"pub/sub"| YdocManager

    EventHandlers -.->|"更新"| ChatsDB
    EventHandlers -.->|"更新"| NotesDB
    EventHandlers -.->|"更新"| ChannelsDB

来源backend/open_webui/socket/main.py:65-170src/lib/stores/index.ts:31-34src/routes/+layout.svelte:115-125backend/open_webui/models/chats.py:14-14backend/open_webui/models/notes.py:15-15backend/open_webui/models/channels.py:13-13

Socket.IO 服务器配置

Socket.IO 服务器在 backend/open_webui/socket/main.py backend/open_webui/socket/main.py:73-96 中初始化,并根据 WEBSOCKET_MANAGER 环境变量 backend/open_webui/env.py:28-28 支持两种部署模式。

单机模式

对于单实例部署,服务器使用内存管理器 backend/open_webui/socket/main.py:86-96

Redis 支持模式

对于分布式部署,使用 socketio.AsyncRedisManager 在多个节点间同步事件 backend/open_webui/socket/main.py:65-84。此模式支持 Redis Sentinel 和 Redis Cluster 配置 backend/open_webui/socket/main.py:66-72

环境变量用途默认值
WEBSOCKET_MANAGER设置为 "redis" 以启用分布式模式None
ENABLE_WEBSOCKET_SUPPORT启用 WebSocket 传输True
WEBSOCKET_SERVER_PING_INTERVAL心跳间隔(秒)25
WEBSOCKET_SERVER_PING_TIMEOUT连接超时(秒)60
WEBSOCKET_REDIS_URLRedis 连接 URLredis://localhost:6379/0

来源backend/open_webui/socket/main.py:65-96backend/open_webui/env.py:27-41

连接管理

后端连接处理

当客户端连接时,服务器执行身份验证并填充 SESSION_POOL backend/open_webui/socket/main.py:314-370。前端通过根布局中的 setupSocket 发起连接,并在 auth 负载中发送 JWT 令牌 src/routes/+layout.svelte:115-124

sequenceDiagram
    participant Client as "浏览器客户端"
    participant SocketIO as "sio: AsyncServer"
    participant Auth as "decode_token()"
    participant SessionPool as "SESSION_POOL (RedisDict)"
    participant DB as "用户模型"

    Client->>SocketIO: connect(auth={token})
    SocketIO->>Auth: decode_token(token)
    Auth-->>SocketIO: user_id
    SocketIO->>DB: Users.get_user_by_id(user_id)
    DB-->>SocketIO: user_model
    SocketIO->>SessionPool: __setitem__(sid, user_data)
    SocketIO->>SocketIO: enter_room(sid, f"user:{user_id}")

    Note over Client, SocketIO: 每 30 秒心跳
    Client->>SocketIO: emit('heartbeat')

来源backend/open_webui/socket/main.py:314-370src/routes/+layout.svelte:158-164backend/open_webui/socket/utils.py:54-56backend/open_webui/models/users.py:12-12

分布式会话池

SESSION_POOLRedisDict 的一个实例,它封装了 Redis 哈希操作(hsethget),为分布式状态提供了类似字典的接口 backend/open_webui/socket/utils.py:44-52

过期的会话由 periodic_session_pool_cleanup 清理,该函数使用 RedisLock 确保只有一个集群节点执行清理操作 backend/open_webui/socket/main.py:173-194SESSION_POOL_TIMEOUT 设置为 120 秒 backend/open_webui/socket/main.py:101-101

事件系统架构

系统针对不同的功能集使用特定的频道。

事件类型与路由
频道处理器(后端)用途
events__event_emitter__聊天补全和状态更新 backend/open_webui/socket/main.py:710-724
events:channelchannel_events实时消息、输入状态指示 backend/open_webui/socket/main.py:438-471
ydoc:document:*ydoc_document_join使用 Yjs 进行协作笔记编辑 backend/open_webui/socket/main.py:473-500
usageusage跟踪活跃的模型连接 backend/open_webui/socket/main.py:300-312
协作编辑(Yjs)

YdocManager 负责在 Redis 中持久化二进制 CRDT 更新 backend/open_webui/socket/utils.py:124-136。它包含一个滚动压缩机制,当更新数量超过 COMPACTION_THRESHOLD(500)时,会压缩更新 backend/open_webui/socket/utils.py:125-125backend/open_webui/socket/utils.py:152-166

graph LR
    subgraph "Yjs 事件流"
        UpdateEvent["'ydoc:document:update'"]
        YdocMgr["YdocManager.append_to_updates()"]
        RedisList["Redis: rpush updates"]
        Compaction["_compact_updates_redis()"]
    end

    UpdateEvent --> YdocMgr
    YdocMgr --> RedisList
    RedisList -->|达到阈值| Compaction

来源backend/open_webui/socket/utils.py:124-166backend/open_webui/socket/main.py:616-660

多节点同步

分布式任务协调

Open WebUI 使用 Redis Pub/Sub 频道 REDIS_PUBSUB_CHANNEL 在节点间管理异步任务(如长时间运行的 AI 生成)backend/open_webui/tasks.py:22-22

redis_task_command_listener 监听 "stop" 命令,如果 task_id 存在于本地节点,则取消本地的 asyncio.Task 实例 backend/open_webui/tasks.py:25-42

Redis Sentinel 支持

对于高可用环境,SentinelRedisProxy 提供了对 Redis Sentinel 的透明封装,在同步和异步模式下均实现了自动故障转移和重试 backend/open_webui/utils/redis.py:31-147。它会拦截对主节点的调用,并最多重试 REDIS_SENTINEL_MAX_RETRY_COUNTbackend/open_webui/utils/redis.py:89-115

来源backend/open_webui/utils/redis.py:31-147backend/open_webui/tasks.py:25-42

性能与扩展

速率限制

RateLimiter 使用 Redis 桶实现了滚动窗口策略 backend/open_webui/utils/rate_limit.py:6-10。它在桶键上使用 increxpire backend/open_webui/utils/rate_limit.py:82-84,并使用 mget 聚合窗口内的计数 backend/open_webui/utils/rate_limit.py:89-90

清理机制
  • 使用池periodic_usage_pool_cleanup 定期运行,从 USAGE_POOL 中移除不活跃的模型连接 backend/open_webui/socket/main.py:196-215
  • 会话池periodic_session_pool_cleanupSESSION_POOL_TIMEOUT 后清理孤立的会话 backend/open_webui/socket/main.py:173-194

来源backend/open_webui/socket/main.py:101-101backend/open_webui/socket/main.py:173-215backend/open_webui/utils/rate_limit.py:6-92