实时通信(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/open-webui/open-webui/15-real-time-communication
翻译时间:2026-06-09T16:11:34.412Z
翻译模型:deepseek-chat
原文字符数:9676
项目:Open WebUI (open-webui)
---
实时通信
相关源文件
以下文件为本 wiki 页面的生成提供了上下文:
backend/open_webui/socket/main.pybackend/open_webui/socket/utils.pybackend/open_webui/tasks.pybackend/open_webui/test/util/test_redis.pybackend/open_webui/utils/rate_limit.pybackend/open_webui/utils/redis.pysrc/lib/apis/index.tssrc/lib/stores/index.tssrc/routes/+layout.svelte
目的与范围
本文档描述了 Open WebUI 中的实时通信系统。该系统使用 Socket.IO 实现前后端双向通信,并可选配 Redis 以支持分布式部署。系统支持实时聊天流式传输、协作文档编辑、会话管理以及分布式任务协调。
关于聊天消息处理与流式传输,请参见聊天系统。关于后端配置与环境设置,请参见环境配置。
架构概览
实时通信层通过 Socket.IO 将 SvelteKit 前端与 FastAPI 后端连接起来,Redis 则为多实例部署提供分布式状态管理。
系统组件
graph TB
subgraph "前端(浏览器)"
LayoutComponent["布局组件<br/>+layout.svelte"]
SocketStore["Socket 存储<br/>stores/index.ts:socket"]
EventHandlers["事件处理器<br/>chatEventHandler<br/>channelEventHandler"]
end
subgraph "后端 Socket.IO 服务器"
SocketIOServer["Socket.IO 服务器<br/>socket/main.py:sio"]
ConnectHandler["@sio.event<br/>connect()"]
UserJoinHandler["@sio.on('user-join')<br/>user_join()"]
HeartbeatHandler["@sio.on('heartbeat')<br/>heartbeat()"]
EventEmitter["get_event_emitter()<br/>__event_emitter__"]
EventCaller["get_event_call()<br/>__event_caller__"]
end
subgraph "状态管理"
SessionPool["SESSION_POOL<br/>RedisDict 或 dict"]
UsagePool["USAGE_POOL<br/>RedisDict 或 dict"]
YdocManager["YDOC_MANAGER<br/>YdocManager"]
end
subgraph "Redis 后端"
RedisServer["Redis 服务器<br/>Sentinel 或 Cluster"]
RedisDict["RedisDict<br/>socket/utils.py"]
RedisLock["RedisLock<br/>socket/utils.py"]
RedisPubSub["Redis Pub/Sub<br/>tasks.py"]
end
LayoutComponent -->|"io.connect()"| SocketIOServer
SocketIOServer -->|"emit events"| EventHandlers
EventHandlers -->|"emit commands"| SocketIOServer
ConnectHandler --> SessionPool
UserJoinHandler --> SessionPool
HeartbeatHandler --> SessionPool
EventEmitter -->|"emit('events')"| SocketIOServer
EventCaller -->|"call('events')"| SocketIOServer
SessionPool -->|"若 WEBSOCKET_MANAGER='redis'"| RedisDict
UsagePool -->|"若 WEBSOCKET_MANAGER='redis'"| RedisDict
YdocManager -->|"若 REDIS"| RedisServer
RedisDict --> RedisServer
RedisLock --> RedisServer
RedisPubSub --> RedisServer
来源: backend/open_webui/socket/main.py:63-171、backend/open_webui/socket/utils.py:9-131、backend/open_webui/tasks.py:25-42
部署模式
系统根据 WEBSOCKET_MANAGER 环境变量支持两种部署模式 backend/open_webui/env.py:28:
| 模式 | 管理器 | 使用场景 | 状态存储 |
|---|---|---|---|
| 单机 | None | 单服务器实例 | 内存中的 Python 字典 backend/open_webui/socket/main.py:158-162 |
| 分布式 | redis | 多服务器实例 | Redis 支持的 RedisDict 和 AsyncRedisManager backend/open_webui/socket/main.py:65-84 |
来源: backend/open_webui/socket/main.py:65-96、backend/open_webui/socket/main.py:105-166
详细信息请参见 WebSocket 架构。
WebSocket 连接生命周期
连接建立
后端的 connect 处理器通过 JWT 对用户进行身份验证,并在 SESSION_POOL 中初始化其会话 backend/open_webui/socket/main.py:302-359。前端在根布局中使用 socket.io-client 发起连接,并传递身份验证令牌 src/routes/+layout.svelte:116-124。
sequenceDiagram
participant Browser
participant SocketIO as Socket.IO 服务器
participant Auth as decode_token()
participant DB as 用户模型
participant Redis as SESSION_POOL
Browser->>SocketIO: io.connect({auth: {token: ...}})
activate SocketIO
SocketIO->>SocketIO: @sio.event connect()
SocketIO->>Auth: decode_token(token)
Auth-->>SocketIO: {id: user_id}
SocketIO->>DB: get_user_by_id(user_id)
DB-->>SocketIO: 用户对象
SocketIO->>Redis: SESSION_POOL[sid] = user
SocketIO->>SocketIO: enter_room(sid, "user:{user_id}")
SocketIO-->>Browser: 连接已建立
deactivate SocketIO
来源: src/routes/+layout.svelte:116-124、backend/open_webui/socket/main.py:302-359、backend/open_webui/utils/auth.py:42-42
房间管理
客户端被组织到不同的房间中,以实现定向事件广播。这一过程在 connect 和 user-join 事件中处理 backend/open_webui/socket/main.py:315-349。
| 房间模式 | 用途 | 成员 |
|---|---|---|
user:{user_id} | 用户特定事件 | 属于特定用户的所有会话 |
channel:{channel_id} | 群组消息 | 特定频道的所有成员 |
doc_{document_id} | 协同编辑 | 当前正在编辑文档的所有用户 |
来源: backend/open_webui/socket/main.py:315-349
事件处理系统
系统使用统一的 events 和 events:channel 通道来路由各种实时更新。
后端事件发射
后端提供了 get_event_emitter 工厂函数,用于简化向特定用户发送更新的操作,常用于长时间运行的 LLM 生成过程中 backend/open_webui/socket/main.py:695-813。
sequenceDiagram
participant Logic as 后端逻辑
participant Factory as get_event_emitter()
participant Emitter as __event_emitter__()
participant SocketIO as sio.emit()
Logic->>Factory: get_event_emitter(request_info)
Factory-->>Logic: __event_emitter__ 函数
Logic->>Emitter: {type: "status", data: "处理中..."}
Emitter->>SocketIO: emit('events', data, room="user:{user_id}")
来源: backend/open_webui/socket/main.py:695-813
事件类型
系统处理以下几类事件:
- 聊天事件:
chat:completion、chat:title、chat:tags。 - 执行事件:
execute:python、execute:tool。 - 频道事件:
typing、message、channel:created。 - Yjs 事件:
ydoc:document:update、ydoc:awareness:update。
详细信息请参见事件处理系统。
多节点同步
在分布式环境中,Redis 充当会话数据和任务协调的唯一真实来源。
分布式任务管理
当用户请求停止生成任务时,该命令通过 Redis Pub/Sub 广播到所有节点 backend/open_webui/tasks.py:22-42。create_task 函数用于注册任务 backend/open_webui/tasks.py:104-124,而 stop_task 则使用 Redis Pub/Sub 跨实例发送取消信号 backend/open_webui/tasks.py:145-163。
graph LR
subgraph "节点 A"
API["API: stop_task(task_id)"]
LocalTasksA["本地 tasks{}"]
end
subgraph "Redis"
PubSub["频道: tasks:commands"]
end
subgraph "节点 B"
Listener["redis_task_command_listener()"]
LocalTasksB["本地 tasks{task_id: Task}"]
end
API -->|"redis_send_command()"| PubSub
PubSub --> Listener
Listener -->|"task.cancel()"| LocalTasksB
来源: backend/open_webui/tasks.py:25-42、backend/open_webui/tasks.py:145-174
使用 RedisDict 共享状态
RedisDict 工具允许不同的服务器实例像访问本地 Python 字典一样访问共享的 SESSION_POOL 和 USAGE_POOL backend/open_webui/socket/utils.py:44-122。这确保了分布式设置中所有节点状态的一致性。periodic_session_pool_cleanup 使用分布式锁定期清理孤立会话 backend/open_webui/socket/main.py:173-193。
来源: backend/open_webui/socket/utils.py:44-122、backend/open_webui/socket/main.py:173-193
详细信息请参见多节点同步。
协同编辑(Yjs)
Open WebUI 使用 Yjs 实现笔记和文档的无冲突协同编辑。YdocManager 负责将这些文档更新持久化到 Redis 中 backend/open_webui/socket/utils.py:124-136。
- 存储:更新存储在 Redis 列表(
RPUSH)中,并定期压缩为单个快照,以优化内存和加载时间backend/open_webui/socket/utils.py:137-145。 - 压缩:当文档达到
COMPACTION_THRESHOLD(500 次更新)时,_compact_updates_redis方法会压缩最旧的一半更新backend/open_webui/socket/utils.py:152-166。
来源: backend/open_webui/socket/utils.py:124-176、backend/open_webui/socket/main.py:448-510