异步操作(中文译文)
原始 DeepWiki 页面:https://deepwiki.com/mem0ai/mem0/3.5-asynchronous-operations
翻译时间:2026-05-27T08:44:53.432Z
翻译模型:deepseek-chat
原文字符数:8915
项目:Mem0 (mem0)
---
异步操作
相关源文件
以下文件被用作生成此维基页面的上下文:
docs/api-reference/entities/delete-user.mdxdocs/api-reference/entities/get-users.mdxdocs/api-reference/events/get-event.mdxdocs/api-reference/events/get-events.mdxdocs/openapi.jsondocs/platform/quickstart.mdxmem0/__init__.pymem0/client/main.pymem0/configs/prompts.pymem0/memory/main.pymem0/memory/storage.pymem0/memory/utils.pytests/configs/test_prompts.pytests/memory/test_main.pytests/test_chatty_llm_parsing.pytests/test_main.pytests/test_memory.pytests/test_proxy.py
本文档介绍 Mem0 在 Python 和 TypeScript SDK 中对非阻塞异步操作的支持。异步操作支持并发内存管理,可提升 I/O 密集型应用的吞吐量和资源利用率。
概述
Mem0 提供异步 API,允许应用在不阻塞主执行线程的情况下执行内存操作。这在以下场景中尤为有价值:
- Web 服务器和 API:并发处理多个内存操作。
- 批量处理:并行处理大量内存操作。
- 实时应用:管理内存的同时保持响应能力。
- 高吞吐系统:通过并发请求最大化 I/O 利用率。
异步支持在不同平台上的实现方式不同:
- Python:提供
AsyncMemory和AsyncMemoryClient类,使用async/await语法。 - TypeScript:所有客户端方法默认使用 Promise 实现异步。
来源:mem0/__init__.py:5-7, mem0/memory/main.py:1-41, mem0/client/main.py:1-25
Python 异步架构
下图将自然语言层面的"内存操作"映射到实现异步操作的代码实体。
内存类到代码实体的映射
graph TB
subgraph "同步类"
Memory["Memory<br/>mem0/memory/main.py"]
MemoryClient["MemoryClient<br/>mem0/client/main.py"]
end
subgraph "异步类"
AsyncMemory["AsyncMemory<br/>mem0/memory/main.py"]
AsyncMemoryClient["AsyncMemoryClient<br/>mem0/client/main.py"]
end
subgraph "共享组件工厂"
VectorStoreFactory["VectorStoreFactory<br/>mem0/utils/factory.py"]
LlmFactory["LlmFactory<br/>mem0/utils/factory.py"]
EmbedderFactory["EmbedderFactory<br/>mem0/utils/factory.py"]
SQLiteManager["SQLiteManager<br/>mem0/memory/storage.py"]
end
Memory --> VectorStoreFactory
Memory --> LlmFactory
Memory --> SQLiteManager
AsyncMemory --> VectorStoreFactory
AsyncMemory --> LlmFactory
AsyncMemory --> SQLiteManager
MemoryClient --> AsyncMemoryClient
来源:mem0/memory/main.py:24-41, mem0/client/main.py:10-135, mem0/memory/storage.py:11-19, mem0/__init__.py:5-7
AsyncMemory 类(自托管)
AsyncMemory 类为自托管部署提供了所有核心内存操作的 async/await 版本。它继承自 MemoryBase,并镜像了 Memory 类的 API,但使用协程实现。
初始化
from mem0 import AsyncMemory
from mem0.configs.base import MemoryConfig
# 使用配置初始化
config = MemoryConfig(
vector_store={
"provider": "qdrant",
"config": {
"host": "localhost",
"port": 6333,
}
}
)
memory = AsyncMemory(config=config)
关键方法
同步 Memory 类的所有方法在 AsyncMemory 中都有对应的异步版本:
| 同步方法 | 异步方法 | 描述 |
|---|---|---|
add() | await add() | 从消息中添加记忆(使用大语言模型进行提取) |
search() | await search() | 按查询搜索记忆(语义 + 关键词) |
get() | await get() | 按 ID 检索记忆 |
get_all() | await get_all() | 列出所有记忆并支持过滤 |
update() | await update() | 更新现有记忆 |
delete() | await delete() | 按 ID 删除记忆 |
delete_all() | await delete_all() | 基于过滤条件批量删除 |
history() | await history() | 检索指定记忆 ID 的历史记录 |
来源:mem0/memory/main.py:118-124, tests/memory/test_main.py:120-124, tests/test_memory.py:25-31
AsyncMemoryClient 类(平台)
AsyncMemoryClient 类为托管的 Mem0 平台 API 提供异步操作。
初始化
from mem0 import AsyncMemoryClient
client = AsyncMemoryClient(
api_key="your-api-key"
)
并发操作示例
使用 asyncio.gather 可以实现高并发的内存入库。
import asyncio
from mem0 import AsyncMemory
async def batch_operations():
memory = AsyncMemory()
tasks = [
memory.add(
messages=[{"role": "user", "content": f"Message {i}"}],
user_id=f"user_{i}"
)
for i in range(5)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
来源:mem0/client/main.py:108-135, mem0/client/main.py:164-185
平台异步模式
async_mode 参数控制内存操作在平台端是同步执行还是异步执行。
平台执行流程
graph LR
ClientAdd["AsyncMemoryClient.add()"]
AsyncModeCheck{"async_mode<br/>参数"}
SyncExecution["平台同步执行<br/>等待处理完成"]
AsyncExecution["平台异步执行<br/>返回事件 ID"]
ClientAdd --> AsyncModeCheck
AsyncModeCheck -->|"False"| SyncExecution
AsyncModeCheck -->|"True(默认)"| AsyncExecution
async_mode=True(默认):平台异步处理请求,并立即返回事件 ID。async_mode=False:平台同步处理请求,等待处理完成后返回最终结果。
来源:mem0/client/main.py:164-186, docs/openapi.json:123-144
性能考量
并发 vs 阻塞
异步操作通过重叠 I/O 等待时间,显著减少批量任务的总耗时。
graph LR
subgraph "同步流程(Memory)"
S1["add(msg1)<br/>等待大语言模型/数据库"]
S2["add(msg2)<br/>等待大语言模型/数据库"]
STotal["总耗时:S1 + S2"]
end
subgraph "异步流程(AsyncMemory)"
A1["await add(msg1)"]
A2["await add(msg2)"]
ATotal["asyncio.gather(A1, A2)<br/>总耗时:max(A1, A2)"]
end
S1 --> S2 --> STotal
A1 -. 并发 .-> ATotal
A2 -. 并发 .-> ATotal
内部实现细节
同步 Memory 类在内部使用 concurrent.futures.ThreadPoolExecutor 来并行化某些操作,同时保持阻塞的外部 API。例如,在 add 操作期间,事实提取和图更新可以并行触发。
相比之下,AsyncMemory 对这些任务使用原生 asyncio 模式,以避免阻塞事件循环。
来源:mem0/memory/main.py:1-11, tests/memory/test_main.py:126-153
最佳实践
- 生命周期管理:将
AsyncMemory包装在异步上下文管理器或 FastAPI 启动/关闭钩子中,以确保正确管理向量存储连接和SQLiteManager中的数据库句柄。 - 弹性:在使用
AsyncMemoryClient时,为网络绑定的异步操作实现重试机制。 - 事件循环感知:切勿在
async def函数之外调用AsyncMemory方法,以避免出现RuntimeError: no running event loop错误。 - 元数据保留:使用
update()时,确保正确传递元数据以保留上下文信息。
# 健壮的异步更新示例
# 来自 tests/memory/test_main.py:141-153
async def robust_update(memory_instance, mem_id, text, metadata):
result = await memory_instance.update(
memory_id=mem_id,
data=text,
metadata=metadata
)
return result
来源:mem0/memory/storage.py:11-19, tests/memory/test_main.py:141-153, mem0/client/main.py:164-186