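State and Persistence
In one sentence: how an agent's state (message history, variables, plan progress, sub-agent state) is represented, saved, and restored. Persistence lets an agent resume from where it stopped, tolerate failures, run for long periods, and support human-in-the-loop pause, approve, and continue.
What problem it solves
Agents may run for a long time, fail, or need human intervention. Only by making state explicit and persisting it can a run be recovered, audited, and paused/resumed instead of starting over.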
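Design dimensions / implementation spectrum
- State representation: implicit (message list) ↔ explicit state object (graph-style frameworks) ↔ blackboard
- Persistence backend: in-memory ↔ file ↔ database/KV ↔ checkpointer abstraction
- Granularity: whole-run snapshot ↔ per-step checkpoint (enables time travel/replay)
- Resumption: restart after failure, pause-resume (closely tied to human-in-the-loop)
- Long-running operation: heartbeats, scheduling, cross-process (SwarmClaw, Aeon)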
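The dimensions above can be made concrete with a small sketch: an explicit state object, a checkpointer abstraction, and a per-step checkpoint that a restarted run picks up. All names here (`AgentState`, `Checkpointer`, `FileCheckpointer`, `run`) are hypothetical and not taken from any framework in this comparison.

```python
import json
import tempfile
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Optional, Protocol


@dataclass
class AgentState:
    """Explicit execution state: everything needed to resume a run."""
    messages: list = field(default_factory=list)
    variables: dict = field(default_factory=dict)
    step: int = 0  # plan progress


class Checkpointer(Protocol):
    """Backend-agnostic interface; memory, file, or DB backends can implement it."""
    def save(self, run_id: str, state: AgentState) -> None: ...
    def load_latest(self, run_id: str) -> Optional[AgentState]: ...


class FileCheckpointer:
    """Writes one JSON file per step, so earlier steps stay available for replay."""

    def __init__(self, root: Path) -> None:
        self.root = root

    def save(self, run_id: str, state: AgentState) -> None:
        run_dir = self.root / run_id
        run_dir.mkdir(parents=True, exist_ok=True)
        path = run_dir / f"step-{state.step:06d}.json"
        path.write_text(json.dumps(asdict(state)), encoding="utf-8")

    def load_latest(self, run_id: str) -> Optional[AgentState]:
        run_dir = self.root / run_id
        if not run_dir.is_dir():
            return None
        files = sorted(run_dir.glob("step-*.json"))
        if not files:
            return None
        return AgentState(**json.loads(files[-1].read_text(encoding="utf-8")))


def run(run_id: str, plan: list, cp: Checkpointer,
        fail_at: Optional[int] = None) -> AgentState:
    """Executes the plan step by step, checkpointing after every step."""
    state = cp.load_latest(run_id) or AgentState()  # resume if a checkpoint exists
    while state.step < len(plan):
        if fail_at is not None and state.step == fail_at:
            raise RuntimeError("simulated crash")
        state.messages.append(f"done: {plan[state.step]}")
        state.step += 1
        cp.save(run_id, state)
    return state
```

Calling `run` again with the same `run_id` after a crash continues from the last saved step rather than from step 0. Keeping one file per step is what makes replay possible; a whole-run snapshot would overwrite a single file instead.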
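Key points
- Explicit state + checkpoints are the production-grade advantage of graph-style frameworks (LangGraph/Mastra).
- Persistence is the foundation of long-lived autonomous agents.
- Difference from memory: state is the live execution context; memory is long-term knowledge.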
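As an illustration of why persisted state enables human-in-the-loop pause, approve, and continue, here is a minimal sketch. The `STORE` dict stands in for any durable backend, and the step format and the `start`/`resume` functions are assumptions made for this example only.

```python
import json
from typing import Dict

STORE: Dict[str, str] = {}  # stand-in for a file, KV store, or database


def _advance(run_id: str, state: dict) -> dict:
    """Runs steps until the plan ends or a step requires approval, then persists."""
    while state["cursor"] < len(state["steps"]):
        step = state["steps"][state["cursor"]]
        if step.get("needs_approval"):
            state["status"] = "awaiting_approval"
            break
        state["log"].append(step["name"])
        state["cursor"] += 1
    else:
        state["status"] = "done"
    STORE[run_id] = json.dumps(state)  # the process may exit after this point
    return state


def start(run_id: str, steps: list) -> dict:
    state = {"steps": steps, "cursor": 0, "status": "running", "log": []}
    return _advance(run_id, state)


def resume(run_id: str, approved: bool) -> dict:
    """Reloads the paused run from storage and applies the human decision."""
    state = json.loads(STORE[run_id])
    if state["status"] != "awaiting_approval":
        raise ValueError("run is not waiting for approval")
    if not approved:
        state["status"] = "rejected"
        STORE[run_id] = json.dumps(state)
        return state
    state["log"].append(state["steps"][state["cursor"]]["name"])
    state["cursor"] += 1
    state["status"] = "running"
    return _advance(run_id, state)
```

Because the paused run lives entirely in `STORE`, the approval can arrive minutes or days later and in a different process.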
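Framework implementation comparison
The table below summarizes the 47 frameworks that implement "state / persistence" (conclusions from source-level reading). On the website, each row is expandable and shows a source excerpt.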
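| Framework | Implementation |
|---|---|
| Aeon | All state is committed as files to the Git main branch; the runner ends with git commit + pull --rebase + push, with 5 retries and automatic conflict resolution; concurrent workflows rely on concurrency.group to serialize ticks, while messages run in parallel under unique groups; inside the sandbox, .pending-notify/ buffers notifications for redelivery post-run |
| AG2 | Session state = each Agent's _oai_messages (in-process); cache/ persists the LLM response cache (disk/redis/cosmos/in-memory), reused by seed/cache_seed; conversation history can be cleared via clear_history or trimmed to the last N messages; RAG/Teachability persist through a vector store; no built-in cross-process session recovery |
| Agency Swarm | Agency(load_threads_callback=, save_threads_callback=) injects persistence callbacks: ThreadManager loads on initialization, and each add_message / end of run saves via PersistenceHooks.on_run_end (a flat message list with agent/callerAgent/timestamp metadata). Storing to a DB or file is left to the user |
| Agent-LLM (AGiXT) | All state goes into SQL: SQLAlchemy models such as Agent/Conversation/Message/Chain/ChainStep/ChainStepResponse/Memory/TaskItem (DB.py); either SQLite or Postgres; scheduled/recurring tasks are persisted and scheduled by Task+TaskMonitor (scheduled/due_date/cron-style recurrence) |
| AgentDock | Session isolation via `SessionManager<T>` (generic, keyed by sessionId, with TTL); orchestration state (activeStep/sequenceIndex/recentlyUsedTools/tokenUsage) is persisted through OrchestrationStateManager; Storage Abstraction: a unified StorageProvider interface + many KV/vector adapters (Memory/Redis/Vercel KV/SQLite/Postgres/Mongo/DynamoDB/S3/Pinecone/Qdrant/Chroma…) + migration tools |
| AgentField | Unified persistence layer in the control plane: local = SQLite+BoltDB / cloud = PostgreSQL (goose migrations); execution records, workflow executions, the four memory scopes, config storage (POST /api/v1/configs/:key), and the payload store are all persisted to the database; identities and VC chains are persisted and verifiable offline |
| Agentic Context Engine (ACE) | Skillbook serializes to JSON (v2 schema) + an embedding sidecar (.embeddings.npz); save_to_file/load_from_file; CheckpointStep archives at intervals, and PersistStep writes the target file per sample (e.g. the project's CLAUDE.md); SimilarityDecision (KEEP) persists dedup decisions |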
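| AgentScope | All runtime state converges into a single pydantic AgentState (session_id/context/summary/reply_id/cur_iter/permission_context/tool_context/tasks_context) that can be serialized and restored as a whole; on the service side, app/storage/ provides RedisStorage + SessionRecord/AgentRecord/UserRecord etc. for multi-session persistence; the file-read cache has mtime invalidation and LRU eviction |
| Agentset | Prisma + Postgres is the authoritative state (schemas for org/namespace/document/ingest-job/webhook etc., packages/db/prisma/schema/, with 40+ migrations); vector data lives in Pinecone/Turbopuffer; Redis handles batching/rate limiting; files go to S3-compatible storage |
| AgentVerse | Runtime state is entirely in memory: the environment's cnt_turn/last_messages/rule_params and each agent's memory; reset() clears everything and starts over. Only results are written to disk: task-solving save_result() writes `./results/<task>.txt` (plan/result/spend, tasksolving.py:84), and logs go to logs/. No session recovery or checkpoint mechanism |
| Ailoy | Session state is not persisted (messages are managed by the caller); what is persisted are model artifacts: the cache module uses a manifest + filesystem cache to store weights/rt./tokenizer downloaded from S3, with checksum verification and download/remove (src/cache/mod.rs, src/model/local/local_language_model.rs:95,113); the WASM side caches with OPFS (FileSystem API) (Cargo.toml:114 web-sys FileSystem) |
| Astron Agent | Runtime state: workflow VariablePool + WorkflowEngineCtx (node status/chains); the DAG engine uses pickle serialization to pass data across nodes; persistence: MySQL (structured), Redis (cache/sessions/EventRegistry registry), MinIO (files), the memory service (session DB); workflow uses alembic to manage schema versions |
| AutoGen | End-to-end save_state()/load_state() → Mapping: agents, ChatCompletionContext, group chat managers (each with its own ManagerState), and Team (TeamState aggregates each agent's state) are all serializable; CancellationToken controls interruption |
| Botpress | Snapshot pause/resume: throwing SnapshotSignal inside a tool serializes the current execution state → Snapshot.toJSON() is stored → a later execute({snapshot}) resumes from the breakpoint (suited to long flows/human approval); variables persist across iterations |
| ConnectOnion | Local current_session (runtime-only) + .co/ on disk (logs/evals/uploads); input(session=...) can restore a stateless session; host() does server-side session persistence and merging via session/storage.py |
| Cordum | Redis stores workflow state, job metadata, and pointer payloads; job lifecycle state machine (Succeeded/Approval/Denied… see engine.go setJobState); approvals are stored in Redis (core/edge/approval_store_redis.go); safety decisions are written to jobStore (engine.go:2213 SetSafetyDecision, with JobHash to prevent replay of stale requests); the audit hash chain's head pointer is persisted in Redis via CAS |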
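| Cortex Memory | Core feature. Hybrid persistence: the cortex:// virtual filesystem (markdown as source of truth, filesystem/operations.rs, filesystem/uri.rs:170) + a Qdrant vector index (vector_store/qdrant.rs); VectorSyncManager keeps the two consistent; MemoryIndex tracks versions/metadata |
| CrewAI | Flow structured state (Pydantic BaseModel) + @persist/persistence, written to SQLite by default, supporting resumption; on the Crew side, CheckpointConfig+apply_checkpoint provide task-level checkpoint recovery |
| Dust | Everything in Postgres (Sequelize ORM + a Resource abstraction layer) persists conversations/agents/actions; Temporal persists workflow state (resumable); on the core side, Run/Block results are stored in stores (Postgres), documents/vectors in Qdrant, analytics in Elasticsearch; Redis handles streaming events |
| E2B | The sandbox filesystem is the state; persistence relies on pause/resume + snapshot: pause() pauses the sandbox so a later Sandbox.connect(id) resumes it automatically; createSnapshot() freezes the current filesystem + state into a snapshot, and Sandbox.create(snapshotId) derives a new sandbox from it (snapshots survive deletion of the sandbox) |
| Haystack | ① Structural persistence: Pipeline.to_dict/from_dict + dumps/loads (YAML) store and load the whole graph; ② runtime persistence = Breakpoint/Snapshot: breakpoints are set at component/chat_generator/tool_invoker, and when one triggers, inputs+component_visits+state are saved as a PipelineSnapshot/AgentSnapshot (JSON) from which execution can resume; Agent State serialization schema |
| hcom | All state lives in a single SQLite database (WAL mode, db/mod.rs:106) at ~/.hcom/hcom.db (can be isolated per project via HCOM_DIR). The schema is versioned with migrations (db/mod.rs:39, :41). events is append-only and doubles as the relay replication source. The session/process binding tables map OS processes/sessions to stable agent identities; reset archives and replaces the DB file, and long-lived connections reconnect via inode detection (db/mod.rs:123) |
| Hermes Agent | hermes_state.py = SQLite session store (messages + FTS5/trigram full-text index + checkpoint), continuous across sessions/platforms; MEMORY.md/USER.md are persisted as files; profiles isolate config/sessions/skills/memory across multiple instances; tools/checkpoint_manager.py file snapshots can be rolled back |
| Hive | Checkpoint-based crash recovery: CheckpointStore + CheckpointConfig; execute(session_state=...) can resume from paused_at / resume_from_checkpoint; session_store/conversation_store write through to disk; ~/.hive/ stores encrypted credentials |
| Lagent | state_dict()/load_state_dict() mimic PyTorch, recursively exporting/loading the memory of each (sub-)agent, with keys carrying `__model_spec__` to rebuild AgentMessage subclasses; the HTTP server exposes session state via /memory/{session_id}. The on-disk format is up to the caller (no built-in DB) |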
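| LangChain | State = TypedDict AgentState (+ the schema merged from middleware, factory.py:1043); checkpointer (per-thread sessions) + store (cross-thread) are provided by LangGraph and passed through to compile(); jump_to is an EphemeralValue and is not persisted |
| Llama Agentic System (llama-stack-apps) | Server-side session persistence (enable_session_persistence=True); agent_id/session_id are assigned by the server and reused; vector DBs persist once registered; the client side only caches chat_history/context (in-memory dict) |
| LlamaIndex | Runtime state is stored in the Workflow Context.store (KV entries such as memory/state/scratchpad/num_iterations); initial_state is deep-copied into the store; on the RAG side, StorageContext.persist() writes docstore/index_store/vector_store to disk and load_index_from_storage restores them; conversation history goes through SQLAlchemyChatStore (in-memory sqlite by default, swappable for a persistent DB) |
| LoongFlow | ① Checkpoint: the evolution database is periodically written to checkpoint-iter-{iter}-{count} directories (solutions/.json + metadata.json + best_solution.json), and completion_count and the population can be restored from a checkpoint (pes_agent.py:348, in_memory.py:298,377); ② the evolution memory backend can be in-memory or Redis (MemoryFactory); ③ Workspace writes each round's planner/executor/summarizer/evaluator outputs to disk, structured as {task_id}/{iter}/ |
| Maestro | Runtime state lives only in in-memory lists and is lost when the process exits; the only persistence is writing the full exchange log to {timestamp}_{objective}.md at the end, plus the generated code project on disk |
| Mastra | Pluggable storage (MastraStorage base + composite store + filesystem/in-memory/external DB adapters), split into stores by domain (agents/skills/workspaces/mcp-clients/scorer-definitions…), persisting threads, messages, memory, and workflow snapshots; workflow snapshots support resumeStream(); request-context/di manage runtime context |
| MetaGPT | SerializationMixin + Team.serialize/deserialize save the whole team (including context/roles/memory) as team.json to support resumption (recover_path); Environment.history (Memory) retains all messages for debugging; LongTermMemory.persist persists vector memory to disk |
| Modus | Agent state is managed automatically by the Runtime: GetState serializes → WriteAgentState writes to Postgres or the built-in modusDB (modusgraph); suspend/resume save and restore automatically, and after idle passivation the actor can be rebuilt from the DB; the agent state table has id/name/status/data/updated |
| nanobot | SessionManager keeps per-session JSONL history (atomic writes + fsync, auto-repair); a TTL triggers AutoCompact idle compaction; mid-turn, _emit_checkpoint writes a runtime checkpoint to disk, recoverable after a crash or /stop; memory files + optional git versioning (GitStore/dulwich); persistent goal state is stored in session metadata |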
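| Open Multi-Agent | Runtime state is entirely in memory: TaskQueue holds the task lifecycle, SharedMemory holds cross-agent KV, and AgentPool is temporary per run (no cross-run state); the only serializable artifact is PlanArtifact (plain JSON; createPlanArtifact→runFromPlan replays the same DAG). No built-in durable checkpoint (stated explicitly in the README) |
| OpenClaw | Session transcripts are persisted as JSONL (harness/session/jsonl-storage.ts, plus an in-memory memory-storage implementation); cron jobs/state/run history are persisted in a shared SQLite state DB (the old jobs.json is migrated via doctor --fix); session/binding/memory files live in the state dir (~/.openclaw/); the session binding service maintains the channel↔session mapping |
| Pilot Protocol | Protocol-level state is written atomically to ~/.pilot/: config.json, the Ed25519 identity (--identity gives a stable identity across restarts), trust.json (mutual-trust records, loaded/saved only when IdentityPath is non-empty), and the beacon cache; on the registry side, hot-standby replication + WAL (README.md:189). Pitfall: when running daemon directly instead of pilotctl daemon start, if ~/.pilot/config.json is not auto-loaded, an empty IdentityPath silently drops trust persistence (fixed in cmd/daemon/main.go:96-111) |
| Pipecat | Runtime state lives in LLMContext (messages) + worker-internal state; EndFrame/StopFrame are uninterruptible (not lost even on interruption); serialization mainly targets wire transport: FrameSerializer.serialize/deserialize (Twilio/Plivo/Vonage/Telnyx/Exotel/Genesys/protobuf) convert frames to telephony/WebSocket protocols; cross-worker state travels as BusMessage on the bus |
| PraisonAI | Session (session.py:24) manages short-term session state (save_state); db=db(database_url=…) connects to 20+ backends such as PostgreSQL/MySQL/SQLite/MongoDB/Redis and automatically persists messages/runs/traces; CLI auto_save="proj" + Shadow Git Checkpoints (automatic rollback on failure) + snapshot/ |
| Semantic Kernel | Session state lives in AgentThread (e.g. ChatHistoryAgentThread, with OnSuspendAsync/OnResumeAsync lifecycle hooks); the legacy AgentChat uses AgentChatSerializer to serialize/restore an entire multi-agent conversation; ChatCompletionAgent.RestoreChannelAsync restores a channel from JSON; the Process framework has KernelProcessStateMetadata checkpoints |
| smolagents | Runtime state = the agent.state dict (additional_args inject sandbox variables); reset=False carries memory across runs; serialization via to_dict/from_dict/save/from_hub/push_to_hub writes agent+tools+prompt to disk or the Hub; AGENT_REGISTRY restricts which classes can be deserialized, to prevent RCE |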
| Strands Agents | agent.state = JSON-serializable KV (agent/state.py); the SessionManager ABC automatically persists messages/state/conversation_manager_state via hooks, with FileSessionManager/S3SessionManager/RepositorySessionManager; take_snapshot/load_snapshot for in-memory snapshots; checkpointing pauses at cycle boundaries and is resumable |
| SwarmClaw | Local better-sqlite3 database, one (id,data) table per collection, load-modify-save + a bulk-delete guard (saveCollection); session_messages is a separate table (slimmed transcript); storage-normalization migrates old records at load time, filling in defaults; LangGraph checkpoint persistence; main-loop / delegation / queue / run-ledger each have their own repository; module-level state uses hmrSingleton to survive Next.js HMR |
| Swarms | autosave serializes to_dict() state to disk (agent.py:3456, background thread); Conversation.save_as_json/export (conversation.py:812,895); v12 MEMORY.md persists across processes (keyed by agent_name); conversations are in-memory by default, with no DB backend |
| Upsonic | Unified storage interface over multiple backends: In-Memory / JSON / SQLite / Redis / PostgreSQL / MongoDB / mem0 (src/upsonic/storage/), carrying session/memory/user-profile; the db= parameter can take over entirely (agent.py:234); Task-level cache (vector_search/llm_call, tasks.py:49) |
| vectara-agentic | The Agent is serializable as a whole: dumps/loads and to_dict/from_dict (agent.py:1103) go through serialize_agent_to_dict (serialization.py:252) to persist config+tools+memory, using cloudpickle for custom function tools. session_id (default topic:date, agent.py:169) + Memory provide session-scoped state; includes fallback agent config switching (agent.py:480) |
| VoltAgent | Memory persists messages/sessions/working memory through StorageAdapter; memory-persist-queue writes to disk asynchronously; Workflow has WorkflowStateStore/checkpoint (restartable after suspend); observability's LocalStorage persists traces; resumable-streams support resuming a stream after a disconnect |
Framework implementation comparison · source level
Aeon (YAML)
All state is committed as files to the Git main branch; the runner ends with git commit + pull --rebase + push, with 5 retries and automatic conflict resolution; concurrent workflows rely on concurrency.group to serialize ticks, while messages run in parallel under unique groups; inside the sandbox, .pending-notify/ buffers notifications for redelivery post-run
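The commit, rebase, and push loop with bounded retries can be sketched as follows. This is not Aeon's workflow code: `commit_and_push`, the injectable `run` callable, and the backoff are assumptions for illustration, and the automatic conflict resolution mentioned above is not reproduced (a failed rebase is simply aborted and retried).

```python
import subprocess
import time
from typing import Callable, Sequence

Runner = Callable[[Sequence[str]], int]


def git(args: Sequence[str]) -> int:
    """Runs a git subcommand and returns its exit code."""
    return subprocess.run(["git", *args], check=False).returncode


def commit_and_push(message: str, run: Runner = git,
                    attempts: int = 5, delay: float = 0.0) -> bool:
    """Commits all changes, then retries pull --rebase + push up to `attempts` times."""
    run(["add", "-A"])
    if run(["diff", "--cached", "--quiet"]) == 0:
        return True  # exit code 0 means nothing is staged, so nothing to persist
    if run(["commit", "-m", message]) != 0:
        return False
    for attempt in range(1, attempts + 1):
        if run(["pull", "--rebase"]) == 0 and run(["push"]) == 0:
            return True
        run(["rebase", "--abort"])  # fails harmlessly when no rebase is in progress
        time.sleep(delay * attempt)
    return False
```

Passing the runner in as a parameter keeps the retry logic testable without a real repository.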
Sources: github/workflows/aeon.yml:818, github/workflows/aeon.yml:887, github/workflows/aeon.yml:70

                ./"$script" || echo "::notice::$(basename "$script") failed (non-fatal)"
              done
          - name: Commit results
            if: steps.work.outputs.mode != ''
            run: |
              git config user.name "aeonframework"
              git config user.email "aeonframework@proton.me"
              rm -f ./notify .notify-sent-hashes  # Remove generated notify script + dedup log before committing
              CURRENT_BRANCH=$(git branch --show-current)
              LABEL="${{ steps.work.outputs.label }}"

AG2 (Python)
Session state = each Agent's _oai_messages (in-process); cache/ persists the LLM response cache (disk/redis/cosmos/in-memory), reused by seed/cache_seed; conversation history can be cleared via clear_history or trimmed to the last N messages; RAG/Teachability persist through a vector store; no built-in cross-process session recovery
Source: conversable_agent.py:272

            # a dictionary of conversations, default value is list
            if chat_messages is None:
                self._oai_messages = defaultdict(list)
            else:
                self._oai_messages = chat_messages
            self._oai_system_message = [{"content": system_message, "role": "system"}]
            self._description = description if description is not None else system_message
            self._is_termination_msg = (
                is_termination_msg
                if is_termination_msg is not None
                else (lambda x: content_str(x.get("content")) == "TERMINATE")

Agency Swarm (Python)
Agency(load_threads_callback=, save_threads_callback=) injects persistence callbacks: ThreadManager loads on initialization, and each add_message / end of run saves via PersistenceHooks.on_run_end (a flat message list with agent/callerAgent/timestamp metadata). Storing to a DB or file is left to the user
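Since storage is left to the user, a file-backed callback pair might look like the sketch below. The callback signatures (a zero-argument loader returning a flat message list, a saver taking that list) are an assumption for illustration, as is the `make_file_callbacks` helper; check the framework's current documentation for the exact signatures before wiring these into `Agency`.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple

Message = Dict[str, Any]


def make_file_callbacks(
    path: Path,
) -> Tuple[Callable[[], List[Message]], Callable[[List[Message]], None]]:
    """Builds a (load, save) pair that stores the flat message list as JSON."""

    def load_threads() -> List[Message]:
        if not path.exists():
            return []  # first run: start with an empty history
        return json.loads(path.read_text(encoding="utf-8"))

    def save_threads(messages: List[Message]) -> None:
        # Write to a temp file in the same directory, then swap it in atomically
        # so a crash mid-write never leaves a truncated history file.
        fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(messages, f)
        os.replace(tmp, path)

    return load_threads, save_threads
```

The same pair could target a database row instead of a file; only the two inner functions would change.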
Sources: hooks.py:12, utils/thread.py:234, agency/core.py:91

    # --- Persistence Hooks ---
    class PersistenceHooks(RunHooks[MasterContext]):  # type: ignore[misc]
        """Custom `RunHooks` implementation for loading and saving `ThreadManager` state.
        This class integrates with the `agents.Runner` lifecycle to automatically
        save message history at the end of a run using user-provided callback
        functions. Loading relies on the `ThreadManager` initialization, which
        invokes the same callbacks to seed the in-memory store.
        Note:
            The signatures for `load_threads_callback` and `save_threads_callback` now

Agent-LLM (AGiXT) (Python)
All state goes into SQL: SQLAlchemy models such as Agent/Conversation/Message/Chain/ChainStep/ChainStepResponse/Memory/TaskItem (DB.py); either SQLite or Postgres; scheduled/recurring tasks are persisted and scheduled by Task+TaskMonitor (scheduled/due_date/cron-style recurrence)
Sources: DB.py:1594, DB.py:1939, DB.py:2169, DB.py:2725

        timestamp = Column(DateTime, server_default=func.now())

    class Agent(Base):
        __tablename__ = "agent"
        id = Column(
            UUID(as_uuid=True) if DATABASE_TYPE != "sqlite" else String,
            primary_key=True,
            default=get_new_id if DATABASE_TYPE == "sqlite" else uuid.uuid4,
        )
        name = Column(Text, nullable=False)
        provider_id = Column(
            UUID(as_uuid=True) if DATABASE_TYPE != "sqlite" else String,

AgentDock (TypeScript)
Session isolation via SessionManager<T> (generic, keyed by sessionId, with TTL); orchestration state (activeStep/sequenceIndex/recentlyUsedTools/tokenUsage) is persisted through OrchestrationStateManager; Storage Abstraction: a unified StorageProvider interface + many KV/vector adapters (Memory/Redis/Vercel KV/SQLite/Postgres/Mongo/DynamoDB/S3/Pinecone/Qdrant/Chroma…) + migration tools
Source: session/index.ts:33

     * Session manager that provides isolation between concurrent sessions
     * Generic over the type of state stored for each session
     */
    export class SessionManager<T extends SessionState> {
      /** Storage provider for persistence */
      private storage: StorageProvider;
      /** Default state generator function */
      private defaultStateGenerator: (sessionId: SessionId) => T;
      /** Namespace for storage keys */
      private storageNamespace: string;

AgentField (Go)
Unified persistence layer in the control plane: local = SQLite+BoltDB / cloud = PostgreSQL (goose migrations); execution records, workflow executions, the four memory scopes, config storage (POST /api/v1/configs/:key), and the payload store are all persisted to the database; identities and VC chains are persisted and verifiable offline
Agentic Context Engine (ACE) (Python)
Skillbook serializes to JSON (v2 schema) + an embedding sidecar (.embeddings.npz); save_to_file/load_from_file; CheckpointStep archives at intervals, and PersistStep writes the target file per sample (e.g. the project's CLAUDE.md); SimilarityDecision (KEEP) persists dedup decisions
Sources: ace/core/skillbook.py:663, ace/steps/persist.py:12

                raise ValueError("Skillbook serialization must be a JSON object.")
            return cls.from_dict(payload)

        def save_to_file(self, path: str, exclude_embeddings: bool = False) -> None:
            file_path = Path(path)
            sidecar_path = _embedding_sidecar_path(file_path)
            file_path.parent.mkdir(parents=True, exist_ok=True)
            with file_path.open("w", encoding="utf-8") as f:
                f.write(self.dumps(exclude_embeddings=True))
            if exclude_embeddings:
                return

AgentScope (Python)
All runtime state converges into a single pydantic AgentState (session_id/context/summary/reply_id/cur_iter/permission_context/tool_context/tasks_context) that can be serialized and restored as a whole; on the service side, app/storage/ provides RedisStorage + SessionRecord/AgentRecord/UserRecord etc. for multi-session persistence; the file-read cache has mtime invalidation and LRU eviction
Sources: state/_state.py:140, state/_state.py:23

        """The task context."""

    class AgentState(BaseModel):
        """The agent state that should be saved and loaded from storage."""
        session_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
        """The session id of the agent. Normally, each session will maintain one
        independent agent state for each agent."""
        summary: str | list[TextBlock | DataBlock] = ""
        """The compressed summary of the context, which will be prepended to the
        context when feed into the LLM."""

Agentset (TypeScript)
Prisma + Postgres is the authoritative state (schemas for org/namespace/document/ingest-job/webhook etc., packages/db/prisma/schema/, with 40+ migrations); vector data lives in Pinecone/Turbopuffer; Redis handles batching/rate limiting; files go to S3-compatible storage
Sources: process-document.ts:130, engine/src/partition/index.ts:60

          throw e;
        }
      },
      run: async ({ documentId, ingestJob, cleanup: shouldCleanup }) => {
        const db = getDb();
        // Update document status to processing and get document configuration
        const document = await db.document.update({
          where: { id: documentId },
          data: {
            status: DocumentStatus.PROCESSING,
            processingAt: new Date(),
          },

AgentVerse (Python)
Runtime state is entirely in memory: the environment's cnt_turn/last_messages/rule_params and each agent's memory; reset() clears everything and starts over. Only results are written to disk: task-solving save_result() writes ./results/<task>.txt (plan/result/spend, tasksolving.py:84), and logs go to logs/. No session recovery or checkpoint mechanism
Sources: tasksolving.py:84, environments/base.py:36, agents/base.py:25

        def reset(self):
            self.environment.reset()

        def save_result(self, plan: str, result: str, spend: float):
            """Save the result to the result file"""
            result_file_path = "./results/" + self.task + ".txt"
            os.makedirs(os.path.dirname(result_file_path), exist_ok=True)
            with open(result_file_path, "w") as f:
                f.write("[Final Plan]\n" + plan + "\n\n")
                f.write("[Result]\n" + result)
                f.write(f"[Spent]\n${spend}")

Ailoy (Rust)
Session state is not persisted (messages are managed by the caller); what is persisted are model artifacts: the cache module uses a manifest + filesystem cache to store weights/rt./tokenizer downloaded from S3, with checksum verification and download/remove (src/cache/mod.rs, src/model/local/local_language_model.rs:95,113); the WASM side caches with OPFS (FileSystem API) (Cargo.toml:114 web-sys FileSystem)
Source: src/model/local/local_language_model.rs:64

    }

    impl LocalLangModel {
        pub async fn try_new(
            model: impl Into<String>,
            config: Option<LocalLangModelConfig>,
        ) -> anyhow::Result<Self> {
            let mut strm = Self::try_new_stream(model, config);
            while let Some(v) = strm.next().await {
                if let Some(result) = v?.result {
                    return Ok(result);
                }
            }

Astron Agent (Python)
Runtime state: workflow VariablePool + WorkflowEngineCtx (node status/chains); the DAG engine uses pickle serialization to pass data across nodes; persistence: MySQL (structured), Redis (cache/sessions/EventRegistry registry), MinIO (files), the memory service (session DB); workflow uses alembic to manage schema versions
Source: dsl_engine.py:49

    from workflow.infra.providers.llm.iflytek_spark.schemas import StreamOutputMsg

    class WorkflowEngineCtx(BaseModel):
        """
        Workflow engine execution context.
        Contains all necessary state and configuration for workflow execution,
        including variable pool, node status, dependencies, and execution chains.
        """
        # Variable pool for storing and passing variables during execution
        variable_pool: VariablePool

AutoGen (Python)
End-to-end save_state()/load_state() → Mapping: agents, ChatCompletionContext, group chat managers (each with its own ManagerState), and Team (TeamState aggregates each agent's state) are all serializable; CancellationToken controls interruption
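The aggregation idea (a team state that nests each member's state) can be illustrated without the library. The classes below are hypothetical stand-ins that only borrow the `save_state`/`load_state` method names; they are not AutoGen's classes and the `agent_states` key is an assumption.

```python
import asyncio
import json
from typing import Any, Dict, List, Mapping


class SketchAgent:
    """Stand-in agent whose whole state is its message list."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.messages: List[str] = []

    async def save_state(self) -> Mapping[str, Any]:
        return {"messages": list(self.messages)}

    async def load_state(self, state: Mapping[str, Any]) -> None:
        self.messages = list(state["messages"])


class SketchTeam:
    """Stand-in team that aggregates member states under their names."""

    def __init__(self, agents: List[SketchAgent]) -> None:
        self.agents: Dict[str, SketchAgent] = {a.name: a for a in agents}

    async def save_state(self) -> Mapping[str, Any]:
        return {
            "agent_states": {
                name: await agent.save_state() for name, agent in self.agents.items()
            }
        }

    async def load_state(self, state: Mapping[str, Any]) -> None:
        for name, agent_state in state["agent_states"].items():
            await self.agents[name].load_state(agent_state)
```

Because the saved state is a plain mapping, it can be passed through `json.dumps` and stored in whatever backend the caller chooses.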
Sources: autogen-core/src/autogen_core/model_context/_chat_completion_context.py:66, _group_chat/_base_group_chat.py:748, _swarm_group_chat.py:100

            """Clear the context."""
            self._messages = []

        async def save_state(self) -> Mapping[str, Any]:
            return ChatCompletionContextState(messages=self._messages).model_dump()

        async def load_state(self, state: Mapping[str, Any]) -> None:
            self._messages = ChatCompletionContextState.model_validate(state).messages

    class ChatCompletionContextState(BaseModel):
        messages: List[LLMMessage] = Field(default_factory=list)

Botpress (TypeScript)
Snapshot pause/resume: throwing SnapshotSignal inside a tool serializes the current execution state → Snapshot.toJSON() is stored → a later execute({snapshot}) resumes from the breakpoint (suited to long flows/human approval); variables persist across iterations
Sources: packages/llmz/src/snapshots.ts:110, snapshots.ts:154, packages/llmz/src/errors.ts:112, packages/llmz/src/llmz.ts:384

     *
     * @see {@link https://github.com/botpress/botpress/blob/master/packages/llmz/examples/14_worker_snapshot/index.ts} Example usage
     */
    export class Snapshot implements Serializable<Snapshot.JSON> {
      public readonly id: string
      public readonly reason?: string
      public readonly stack: string
      public readonly toolCall?: ToolCall
      public variables: Variable[]
      #status: SnapshotStatus
      /**
       * Gets the current status of the snapshot.

ConnectOnion (Python)
Local current_session (runtime-only) + .co/ on disk (logs/evals/uploads); input(session=...) can restore a stateless session; host() does server-side session persistence and merging via session/storage.py
Source: core/agent.py:247

            if self.logger.console:
                self.logger.console.print_task(prompt)

            # Session restoration: if session passed, restore it (stateless API continuation)
            if session is not None:
                self.current_session = {
                    'session_id': session.get('session_id'),
                    'messages': list(session.get('messages', [])),
                    'trace': list(session.get('trace', [])),
                    'turn': session.get('turn', 0)
                }
                # Start YAML session logging with session_id for thread safety
                self.logger.start_session(self.system_prompt, session_id=session.get('session_id'))

Cordum (Go)
Redis stores workflow state, job metadata, and pointer payloads; job lifecycle state machine (Succeeded/Approval/Denied… see engine.go setJobState); approvals are stored in Redis (core/edge/approval_store_redis.go); safety decisions are written to jobStore (engine.go:2213 SetSafetyDecision, with JobHash to prevent replay of stale requests); the audit hash chain's head pointer is persisted in Redis via CAS
Sources: DESIGN.md:31, scheduler/engine.go:2210, core/audit/chain.go:245

    - **Workflow Engine**: Emits job steps as `JobRequest` and advances runs.
    - **Workers**: Subscribe to job subjects, read context pointers, execute, emit
      `JobResult` with result/artifact pointers, and send heartbeats.
    - **Redis**: Stores workflow state, job metadata, and pointer payloads.
    - **NATS**: Transport for CAP v2 packets (with optional JetStream durability).

    ## 3) CAP v2 Wire Contracts (selected fields)

    CAP v2 is the canonical protocol; Cordum does not duplicate these definitions.

    ### 3.1 BusPacket (Envelope)

    `BusPacket` carries all bus traffic:

Cortex Memory (Rust)
Core feature. Hybrid persistence: the cortex:// virtual filesystem (markdown as source of truth, filesystem/operations.rs, filesystem/uri.rs:170) + a Qdrant vector index (vector_store/qdrant.rs); VectorSyncManager keeps the two consistent; MemoryIndex tracks versions/metadata
Source: filesystem/uri.rs:170

        /// assert_eq!(uri.dimension, cortex_mem_core::Dimension::Session);
        /// assert_eq!(uri.category, "abc123");
        /// ```
        pub fn parse(uri: &str) -> Result<CortexUri> {
            // 1. Validate scheme
            if !uri.starts_with("cortex://") {
                return Err(Error::InvalidScheme);
            }
            // 2. Split path and query
            let uri_without_scheme = &uri[9..]; // Skip "cortex://"
            let (path_part, query_part) = uri_without_scheme
                .split_once('?')

CrewAI (Python)
Flow structured state (Pydantic BaseModel) + @persist/persistence, written to SQLite by default, supporting resumption; on the Crew side, CheckpointConfig+apply_checkpoint provide task-level checkpoint recovery
Sources: crewai/flow/runtime.py:244, crewai/flow/persistence/decorators.py:163, crewai/state/checkpoint_config.py:159

        return value

    class FlowState(BaseModel):
        """Base model for all flow states, ensuring each state has a unique ID."""
        id: str = Field(
            default_factory=lambda: str(uuid4()),
            description="Unique identifier for the flow state",
        )

    T = TypeVar("T", bound=dict[str, Any] | BaseModel)

Dust (TypeScript)
Everything in Postgres (Sequelize ORM + a Resource abstraction layer) persists conversations/agents/actions; Temporal persists workflow state (resumable); on the core side, Run/Block results are stored in stores (Postgres), documents/vectors in Qdrant, analytics in Elasticsearch; Redis handles streaming events
E2B (TypeScript)
The sandbox filesystem is the state; persistence relies on pause/resume + snapshot: pause() pauses the sandbox so a later Sandbox.connect(id) resumes it automatically; createSnapshot() freezes the current filesystem + state into a snapshot, and Sandbox.create(snapshotId) derives a new sandbox from it (snapshots survive deletion of the sandbox)
Sources: js-sdk/src/sandbox/index.ts:531, sandboxApi.ts:886

       * await sandbox.pause()
       * ```
       */
      async pause(opts?: ConnectionOpts): Promise<boolean> {
        return await SandboxApi.pause(this.sandboxId, this.resolveApiOpts(opts))
      }
      /**
       * @deprecated Use {@link Sandbox.pause} instead.
       */
      async betaPause(opts?: ConnectionOpts): Promise<boolean> {
        return await SandboxApi.betaPause(this.sandboxId, this.resolveApiOpts(opts))
      }

Haystack (Python)
① Structural persistence: Pipeline.to_dict/from_dict + dumps/loads (YAML) store and load the whole graph; ② runtime persistence = Breakpoint/Snapshot: breakpoints are set at component/chat_generator/tool_invoker, and when one triggers, inputs+component_visits+state are saved as a PipelineSnapshot/AgentSnapshot (JSON) from which execution can resume; Agent State serialization schema
Sources: core/pipeline/base.py:150, dataclasses/breakpoints.py:13, pipeline.py:340

            return res

        def to_dict(self) -> dict[str, Any]:
            """
            Serializes the pipeline to a dictionary.
            This is meant to be an intermediate representation but it can be also used to save a pipeline to file.
            :returns:
                Dictionary with serialized data.
            """
            components = {}

hcom (Rust)
All state lives in a single SQLite database (WAL mode, db/mod.rs:106) at ~/.hcom/hcom.db (can be isolated per project via HCOM_DIR). The schema is versioned with migrations (db/mod.rs:39, :41). events is append-only and doubles as the relay replication source. The session/process binding tables map OS processes/sessions to stable agent identities; reset archives and replaces the DB file, and long-lived connections reconnect via inode detection (db/mod.rs:123)
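The combination of WAL mode and versioned migrations can be sketched with Python's standard `sqlite3` module. This is an illustration, not hcom's Rust code: tracking the version in `PRAGMA user_version` is an assumption (the source does not say where hcom records it), and the table and column names in `MIGRATIONS` are made up.

```python
import sqlite3
import tempfile
from pathlib import Path

# Ordered (version, SQL) pairs; each entry runs once per database file.
MIGRATIONS = [
    (1, "CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT, body TEXT NOT NULL);"),
    (2, "ALTER TABLE events ADD COLUMN kind TEXT DEFAULT '';"),
]


def open_db(path: str) -> sqlite3.Connection:
    """Opens the database in WAL mode and applies any pending migrations."""
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")
    current = conn.execute("PRAGMA user_version").fetchone()[0]
    for version, sql in MIGRATIONS:
        if version > current:
            conn.executescript(sql)
            # PRAGMA values cannot be bound as parameters; version is a trusted int.
            conn.execute(f"PRAGMA user_version = {version}")
    conn.commit()
    return conn
```

Reopening an already migrated file is a no-op, because every migration version is at or below the stored `user_version`.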
Source: db/mod.rs:39

    pub use instances::InstanceStatus;

    /// Schema version - bump on any schema change.
    const SCHEMA_VERSION: i32 = 17;
    pub const DEV_ROOT_KV_KEY: &str = "config:dev_root";
    const MIGRATIONS: &[(i32, &str)] = &[(
        17,
        "ALTER TABLE instances ADD COLUMN terminal_preset_requested TEXT DEFAULT '';
         ALTER TABLE instances ADD COLUMN terminal_preset_effective TEXT DEFAULT '';
         UPDATE instances
         SET terminal_preset_effective = json_extract(launch_context, '$.terminal_preset')
         WHERE launch_context != '' AND json_valid(launch_context) AND json_extract(launch_context, '$.terminal_preset') IS NOT NULL;",
    )];

Hermes Agent (Python)
hermes_state.py = SQLite session store (messages + FTS5/trigram full-text index + checkpoint), continuous across sessions/platforms; MEMORY.md/USER.md are persisted as files; profiles isolate config/sessions/skills/memory across multiple instances; tools/checkpoint_manager.py file snapshots can be rolled back
Sources: hermes_state.py:321, tools/memory_tool.py:1

    """

    FTS_SQL = """
    CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5(
        content
    );
    CREATE TRIGGER IF NOT EXISTS messages_fts_insert AFTER INSERT ON messages BEGIN
        INSERT INTO messages_fts(rowid, content) VALUES (
            new.id,
            COALESCE(new.content, '') || ' ' || COALESCE(new.tool_name, '') || ' ' || COALESCE(new.tool_calls, '')
        );
    END;

Hive (Python)
Checkpoint-based crash recovery: CheckpointStore + CheckpointConfig; execute(session_state=...) can resume from paused_at / resume_from_checkpoint; session_store/conversation_store write through to disk; ~/.hive/ stores encrypted credentials
Source: orchestrator/orchestrator.py:620

            )

            # Determine entry point (may differ if resuming)
            # Check if resuming from checkpoint
            if session_state and session_state.get("resume_from_checkpoint") and checkpoint_store:
                checkpoint_id = session_state["resume_from_checkpoint"]
                try:
                    checkpoint = await checkpoint_store.load_checkpoint(checkpoint_id)
                    if checkpoint:
                        self.logger.info(f"🔄 Resuming from checkpoint: {checkpoint_id} (node: {checkpoint.current_node})")
                        checkpoint_run_id = checkpoint.run_id or LEGACY_RUN_ID
                        self._run_id = checkpoint_run_id

Lagent (Python)
state_dict()/load_state_dict() mimic PyTorch, recursively exporting/loading the memory of each (sub-)agent, with keys carrying __model_spec__ to rebuild AgentMessage subclasses; the HTTP server exposes session state via /memory/{session_id}. The on-disk format is up to the caller (no built-in DB)
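The PyTorch-style recursion with dotted key prefixes can be shown in a few lines. `SketchAgent` is a hypothetical class for illustration only; it leaves out Lagent's `session_id` dimension and the `__model_spec__` entries used to rebuild message subclasses.

```python
from typing import Dict, List, Optional


class SketchAgent:
    """Stand-in agent with its own memory and named sub-agents."""

    def __init__(self, **children: "SketchAgent") -> None:
        self.memory: List[dict] = []
        self._agents: Dict[str, "SketchAgent"] = dict(children)

    def state_dict(self, prefix: str = "", destination: Optional[dict] = None) -> dict:
        """Flattens the agent tree into one dict keyed by dotted paths."""
        if destination is None:
            destination = {}
        destination[prefix + "memory"] = [dict(m) for m in self.memory]
        for name, child in self._agents.items():
            child.state_dict(prefix=f"{prefix}{name}.", destination=destination)
        return destination

    def load_state_dict(self, state: dict, prefix: str = "") -> None:
        """Restores each agent's memory from the matching dotted key."""
        self.memory = [dict(m) for m in state.get(prefix + "memory", [])]
        for name, child in self._agents.items():
            child.load_state_dict(state, prefix=f"{prefix}{name}.")
```

The flat dict is JSON-friendly as long as the memory entries are, which is why the on-disk format can be left to the caller.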
Sources: agents/agent.py:121, agents/agent.py:135, memory/base_memory.py:60

                super().__setattr__('_agents', _agents)
            super().__setattr__(__name, __value)

        def state_dict(self, session_id=None, prefix='', destination=None) -> Dict:
            if destination is None:
                destination = {}
            if self.memory is not None:
                if session_id not in self.memory.memory_map:
                    warnings.warn(f'No session id {session_id} in {prefix}memory')
                memory = self.memory.get(session_id)
                saved_memory = memory and memory.save() or []
                destination.update({prefix + 'memory': saved_memory})
            for name, agent in getattr(self, '_agents', {}).items():

LangChain (Python)
State = TypedDict AgentState (+ the schema merged from middleware, factory.py:1043); checkpointer (per-thread sessions) + store (cross-thread) are provided by LangGraph and passed through to compile(); jump_to is an EphemeralValue and is not persisted
Sources: agents/factory.py:1037, factory.py:1671, middleware/types.py:357

            ]
            awrap_model_call_handler = _chain_async_model_call_handlers(async_handlers)

        base_state = state_schema if state_schema is not None else AgentState
        # Build an ordered list: middleware schemas first (in registration order),
        # base_state last so it wins any field conflict. This lets the caller's
        # explicit state_schema override middleware annotations — e.g. passing
        # a DeltaChannel-annotated schema wins over BinaryOperatorAggregate from
        # AgentState without requiring a post-compilation patch.
        state_schemas: list[type] = [*(m.state_schema for m in middleware), base_state]
        resolved_state_schema, input_schema, output_schema = _resolve_schemas(state_schemas)

Llama Agentic System (llama-stack-apps) (Python)
Server-side session persistence (enable_session_persistence=True); agent_id/session_id are assigned by the server and reused; vector DBs persist once registered; the client side only caches chat_history/context (in-memory dict)
examples/agent_store/api.py:131, examples/agents/simple_chat.py:53, examples/agent_store/app.py:18
                instructions="",
                sampling_params={"strategy": {"type": "greedy"}},
                toolgroups=toolgroups,
                enable_session_persistence=True,
            )
        elif agent_type == AgentChoice.Memory:
            vector_db_ids = agent_params.get("vector_db_ids", [])
            toolgroups = [
                {
                    "name": "builtin::rag",
                    "args": {
                        "vector_db_ids": vector_db_ids,
                        "query_config": QueryConfig(
LlamaIndex (python)
Runtime state lives in the Workflow Context.store, a KV store holding memory, state, scratchpad, num_iterations and so on. initial_state is deep-copied into the store. On the RAG side, StorageContext.persist() writes docstore/index_store/vector_store to disk and load_index_from_storage restores them. Chat history goes through SQLAlchemyChatStore (in-memory SQLite by default, swappable for a persistent DB).
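Why the deep copy matters can be shown with a toy async KV store. The sketch below is an assumption-laden stand-in: `KVStore` and `init_context` are hypothetical and only imitate the "initialize once, copy the initial state" pattern, not LlamaIndex's Context.

```python
import asyncio
import copy


class KVStore:
    """Hypothetical async key-value store standing in for a workflow context store."""

    def __init__(self):
        self._data = {}

    async def get(self, key, default=None):
        return self._data.get(key, default)

    async def set(self, key, value):
        self._data[key] = value


async def init_context(store: KVStore, initial_state: dict) -> None:
    # Initialize only once; deep-copy so runs never mutate the shared template.
    if not await store.get("state", default=None):
        await store.set("state", copy.deepcopy(initial_state))


async def demo():
    initial = {"counter": 0, "notes": []}
    store = KVStore()
    await init_context(store, initial)
    state = await store.get("state")
    state["notes"].append("x")          # mutate the per-run state
    await init_context(store, initial)  # second call leaves the existing state alone
    return initial, await store.get("state")


initial_after, stored = asyncio.run(demo())
```

Without the deep copy, the nested `notes` list would be shared between the template and every run that starts from it.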
agent/workflow/base_agent.py:284, storage/chat_store/sql.py:31
        return self._ensure_tools_are_async(cast(List[BaseTool], tools))

    async def _init_context(self, ctx: Context, ev: AgentWorkflowStartEvent) -> None:
        """Initialize the context once, if needed."""
        if not await ctx.store.get("memory", default=None):
            default_memory = ev.get("memory", default=None)
            default_memory = default_memory or ChatMemoryBuffer.from_defaults(
                llm=self.llm or Settings.llm
            )
            await ctx.store.set("memory", default_memory)
        if not await ctx.store.get("state", default=None):
            await ctx.store.set("state", copy.deepcopy(self.initial_state))
LoongFlow (python)
① Checkpoint: the evolution database (solutions/.json + metadata.json + best_solution.json) is written periodically to checkpoint-iter-{iter}-{count} directories, and completion_count and the population can be restored from a checkpoint (pes_agent.py:348, in_memory.py:298, 377). ② The evolution memory backend is either in-memory or Redis (MemoryFactory). ③ Workspace writes each round's planner/executor/summarizer/evaluator artifacts to disk under {task_id}/{iter}/.
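The directory naming convention and the "restore from the latest checkpoint" step can be sketched as below. Only the `checkpoint-iter-{iter}-{count}` name and `metadata.json` come from the description; `population.json`, `save_checkpoint` and `load_latest` are hypothetical names for this example, and picking the highest completion count as "latest" is an assumption.

```python
import json
import re
import tempfile
from pathlib import Path
from typing import Optional

CHECKPOINT_RE = re.compile(r"^checkpoint-iter-(\d+)-(\d+)$")


def save_checkpoint(root: Path, iteration_id: int, completion_count: int,
                    population: list) -> Path:
    """Write one checkpoint directory named after iteration and completion count."""
    ckpt = root / f"checkpoint-iter-{iteration_id}-{completion_count}"
    ckpt.mkdir(parents=True, exist_ok=True)
    meta = {"iteration_id": iteration_id, "completion_count": completion_count}
    (ckpt / "metadata.json").write_text(json.dumps(meta), encoding="utf-8")
    # population.json is a made-up file name for this sketch.
    (ckpt / "population.json").write_text(json.dumps(population), encoding="utf-8")
    return ckpt


def load_latest(root: Path) -> Optional[dict]:
    """Restore from the checkpoint with the highest completion count, if any."""
    found = []
    for path in root.iterdir():
        match = CHECKPOINT_RE.match(path.name)
        if match and path.is_dir():
            found.append((int(match.group(2)), path))
    if not found:
        return None
    _, latest = max(found, key=lambda item: item[0])
    meta = json.loads((latest / "metadata.json").read_text(encoding="utf-8"))
    meta["population"] = json.loads(
        (latest / "population.json").read_text(encoding="utf-8"))
    return meta


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    save_checkpoint(root, iteration_id=3, completion_count=10, population=["a"])
    save_checkpoint(root, iteration_id=7, completion_count=20, population=["a", "b"])
    restored = load_latest(root)
```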
framework/pes/pes_agent.py:348, agentsdk/memory/evolution/in_memory.py:298, framework/pes/context/workspace.py:39
            if should_save:
                await self._save_checkpoint(iteration_id, current_count)

    async def _save_checkpoint(self, iteration_id: int, completion_count: int) -> None:
        """
        Saves a checkpoint with the specific naming convention.

        Args:
            iteration_id: The iteration ID of the task that just finished.
            completion_count: The total number of completed tasks so far.
        """
        # Requirement 1: Directory format: checkpoint-iter-{iteration_id}-{completion_count}
        dir_name = f"checkpoint-iter-{iteration_id}-{completion_count}"
Maestro (python)
Runtime state lives only in in-memory lists and is lost when the process exits. The only persistence happens at the end of a run: the full exchange log is written to {timestamp}_{objective}.md and the generated code project is written to disk.
maestro.py:301, maestro.py:194
console.print(f"\n[bold]Refined Final output:[/bold]\n{refined_output}")
with open(filename, 'w') as file:
    file.write(exchange_log)
print(f"\nFull exchange log saved to {filename}")
Mastra (typescript)
Pluggable storage (MastraStorage base + composite store + filesystem/in-memory/external DB adapters), split by domain (agents/skills/workspaces/mcp-clients/scorer-definitions…), persists threads, messages, memory and workflow snapshots. Workflow snapshots support resumeStream(). request-context/di manages the runtime context.
workflows/workflow.ts:66
  SuspendOptions,
} from './step';
import { forwardAgentStreamChunk } from './stream-utils';
import type {
  DefaultEngineType,
  DynamicMapping,
  ExtractSchemaFromStep,
  ExtractSchemaType,
  PathsToStringProps,
  SerializedStep,
  SerializedStepFlowEntry,
  StepFlowEntry,
  StepResult,
MetaGPT (python)
SerializationMixin plus Team.serialize/deserialize save the whole team (context, roles, memory) to team.json, which enables resuming from a checkpoint (recover_path). Environment.history (a Memory) keeps every message for debugging. LongTermMemory.persist writes vector memory to disk.
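A whole-object snapshot to a single JSON file, followed by recovery, might look like the sketch below. `SketchTeam` and its fields are hypothetical and far simpler than MetaGPT's Team; only the `team.json` file name is taken from the description.

```python
import json
import tempfile
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import List


@dataclass
class SketchTeam:
    """Hypothetical stand-in for a team; not MetaGPT's Team class."""
    idea: str = ""
    roles: List[str] = field(default_factory=list)
    history: List[str] = field(default_factory=list)

    def serialize(self, stg_path: Path) -> Path:
        # Dump the whole object into <stg_path>/team.json.
        stg_path.mkdir(parents=True, exist_ok=True)
        target = stg_path / "team.json"
        target.write_text(json.dumps(asdict(self)), encoding="utf-8")
        return target

    @classmethod
    def deserialize(cls, stg_path: Path) -> "SketchTeam":
        # Rebuild the object from the saved file to resume where it stopped.
        data = json.loads((stg_path / "team.json").read_text(encoding="utf-8"))
        return cls(**data)


with tempfile.TemporaryDirectory() as tmp:
    team = SketchTeam(idea="write a CLI", roles=["pm", "engineer"],
                      history=["pm: spec ready"])
    team.serialize(Path(tmp) / "team")
    recovered = SketchTeam.deserialize(Path(tmp) / "team")
```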
metagpt/team.py:59, team.py:67, metagpt/environment/base_env.py:134, metagpt/memory/longterm_memory.py:69
        if "env_desc" in data:
            self.env.desc = data["env_desc"]

    def serialize(self, stg_path: Path = None):
        stg_path = SERDESER_PATH.joinpath("team") if stg_path is None else stg_path
        team_info_path = stg_path.joinpath("team.json")
        serialized_data = self.model_dump()
        serialized_data["context"] = self.env.context.serialize()
        write_json_file(team_info_path, serialized_data)

    @classmethod
    def deserialize(cls, stg_path: Path, context: Context = None) -> "Team":
Modus (go)
Agent state is managed automatically by the Runtime: GetState serializes it and WriteAgentState writes it to Postgres or the built-in modusDB (modusgraph). suspend/resume save and restore automatically, and an actor passivated after idling can be rebuilt from the DB. The agent state table has id/name/status/data/updated.
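Passivation (evicting an idle actor after saving its state, then rebuilding it on the next access) can be illustrated in a few lines. This Python sketch is not Modus code: `AgentRuntime`, the dict used as a database and the explicit `now` argument are all assumptions chosen to keep the example deterministic.

```python
import json
from typing import Dict


class AgentRuntime:
    """Hypothetical runtime: live actors in memory, state rows in a dict 'DB'."""

    def __init__(self, idle_seconds: float):
        self.idle_seconds = idle_seconds
        self.db: Dict[str, dict] = {}    # stands in for the agent state table
        self.live: Dict[str, dict] = {}  # agent id -> {"state", "last_used"}

    def get(self, agent_id: str, now: float) -> dict:
        actor = self.live.get(agent_id)
        if actor is None:
            # Not in memory: rebuild the actor from its persisted row, if any.
            row = self.db.get(agent_id)
            state = json.loads(row["data"]) if row else {}
            actor = {"state": state, "last_used": now}
            self.live[agent_id] = actor
        actor["last_used"] = now
        return actor["state"]

    def passivate_idle(self, now: float) -> list:
        # Persist and evict every actor that has been idle for too long.
        evicted = []
        for agent_id, actor in list(self.live.items()):
            if now - actor["last_used"] >= self.idle_seconds:
                self.db[agent_id] = {
                    "id": agent_id,
                    "status": "suspended",
                    "data": json.dumps(actor["state"]),
                    "updated": now,
                }
                del self.live[agent_id]
                evicted.append(agent_id)
        return evicted


runtime = AgentRuntime(idle_seconds=60)
runtime.get("counter-1", now=0.0)["count"] = 5
evicted = runtime.passivate_idle(now=120.0)
rebuilt = runtime.get("counter-1", now=121.0)
```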
runtime/db/agentstate.go:24, runtime/actors/agents.go:208, sdk/go/pkg/agents/agents.go:232
var ErrAgentNotFound = errors.New("agent not found")

type AgentState struct {
	Gid       uint64 `json:"gid,omitempty"`
	Id        string `json:"id" db:"constraint=unique"`
	Name      string `json:"name"`
	Status    string `json:"status"`
	Data      string `json:"data,omitempty"`
	UpdatedAt string `json:"updated"`
}

func WriteAgentState(ctx context.Context, state AgentState) error {
nanobot (python)
SessionManager keeps a JSONL history per session (atomic write + fsync, with automatic repair). A TTL triggers AutoCompact to compress idle sessions. During a turn, _emit_checkpoint writes a runtime checkpoint to disk so the turn can be recovered after a crash or /stop. Memory lives in files with optional git versioning (GitStore/dulwich). Persistent goal state is stored in session metadata.
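The "atomic write + fsync, with repair" idea can be sketched with the standard library alone. `save_history` and `load_history` are hypothetical helpers, and treating "repair" as "skip lines that fail to parse" is an assumption about what repair means here, not nanobot's actual behavior.

```python
import json
import os
import tempfile
from pathlib import Path


def save_history(path: Path, messages: list) -> None:
    """Write to a temp file, fsync it, then atomically swap it into place."""
    fd, tmp_name = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        for message in messages:
            handle.write(json.dumps(message, ensure_ascii=False) + "\n")
        handle.flush()
        os.fsync(handle.fileno())
    os.replace(tmp_name, path)  # readers see the old or the new file, never half


def load_history(path: Path) -> list:
    """Load JSONL, dropping lines that do not parse (e.g. a torn final line)."""
    messages = []
    if not path.exists():
        return messages
    with path.open(encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            try:
                messages.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    return messages


with tempfile.TemporaryDirectory() as tmp:
    session_file = Path(tmp) / "session.jsonl"
    save_history(session_file, [{"role": "user", "content": "hi"},
                                {"role": "assistant", "content": "hello"}])
    with session_file.open("a", encoding="utf-8") as handle:
        handle.write('{"role": "user", "cont')  # simulate a torn write
    loaded = load_history(session_file)
```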
agent/loop.py:707, agent/memory.py:370
        if not ephemeral and self._extra_hooks:
            hook = CompositeHook([loop_hook] + self._extra_hooks)

        async def _checkpoint(payload: dict[str, Any]) -> None:
            if session is None:
                return
            self._set_runtime_checkpoint(session, payload)

        async def _drain_pending(*, limit: int = _MAX_INJECTIONS_PER_TURN) -> list[dict[str, Any]]:
            """Drain follow-up messages from the pending queue.

            When no messages are immediately available but sub-agents
            spawned in this dispatch are still running, blocks until at
Open Multi-Agent (typescript)
Runtime state is entirely in memory: TaskQueue holds the task lifecycle, SharedMemory holds cross-agent KV, and AgentPool is created per run (no cross-run state). The only serializable artifact is PlanArtifact (plain JSON; createPlanArtifact→runFromPlan replays the same DAG). There is no built-in durable checkpoint (the README says so explicitly).
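Replaying a plan-only artifact as a DAG can be sketched in Python with `graphlib`. The functions below only imitate the idea; the field names (`plan_only`, `depends_on`) and both function names are hypothetical and do not reflect Open Multi-Agent's TypeScript API.

```python
import json
from graphlib import TopologicalSorter


def create_plan_artifact(result: dict) -> dict:
    """Only plan-only results become artifacts; executed runs are rejected."""
    if result.get("plan_only") is not True or not result.get("tasks"):
        raise ValueError("a plan-only result with tasks is required")
    return {
        "version": 1,
        "tasks": [
            {"id": t["id"], "description": t["description"],
             "depends_on": list(t.get("depends_on", []))}
            for t in result["tasks"]
        ],
    }


def run_from_plan(artifact: dict, execute) -> list:
    """Execute tasks in dependency order derived from the saved DAG."""
    graph = {t["id"]: set(t["depends_on"]) for t in artifact["tasks"]}
    by_id = {t["id"]: t for t in artifact["tasks"]}
    order = TopologicalSorter(graph).static_order()
    return [execute(by_id[task_id]) for task_id in order]


plan = {
    "plan_only": True,
    "tasks": [
        {"id": "a", "description": "research"},
        {"id": "b", "description": "draft", "depends_on": ["a"]},
        {"id": "c", "description": "review", "depends_on": ["b"]},
    ],
}
artifact = json.loads(json.dumps(create_plan_artifact(plan)))  # survives a JSON round trip
results = run_from_plan(artifact, lambda task: task["description"].upper())
```

The artifact carries the plan, not the execution state, so a replay reruns every task; that is the difference from a durable checkpoint.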
src/orchestrator/orchestrator.ts:1413, src/orchestrator/orchestrator.ts:1448, src/task/queue.ts:55
   * version that records task descriptions. Executed run results are rejected
   * because their task records are not a replay contract.
   */
  createPlanArtifact(result: TeamRunResult): PlanArtifact {
    if (result.planOnly !== true || !result.tasks) {
      throw new Error('createPlanArtifact requires a plan-only TeamRunResult.')
    }
    return {
      version: 1,
      ...(result.goal !== undefined ? { goal: result.goal } : {}),
      tasks: result.tasks.map((task): PlanTaskArtifact => {
        if (!task.description) {
OpenClaw (typescript)
Session transcripts are persisted as JSONL (harness/session/jsonl-storage.ts, with a memory-storage in-memory implementation alongside). Cron jobs, their state and run history are persisted in the shared SQLite state DB (the old jobs.json is migrated by doctor --fix). Session, binding and memory files live in the state dir (~/.openclaw/). The session binding service maintains the channel↔session mapping.
docs/automation/cron-jobs.md:43
## How cron works
- Cron runs **inside the Gateway** process (not inside the model).
- Job definitions, runtime state, and run history persist in OpenClaw's shared SQLite state database so restarts do not lose schedules.
- On upgrade, run `openclaw doctor --fix` to import legacy `~/.openclaw/cron/jobs.json`, `jobs-state.json`, and `runs/*.jsonl` files into SQLite and rename them with a `.migrated` suffix. Malformed job rows are skipped from runtime and copied to `jobs-quarantine.json` for later repair or review.
- `cron.store` still names the logical cron store key and doctor import path. After import, editing that JSON file no longer changes active cron jobs; use `openclaw cron add|edit|remove` or the Gateway cron RPC methods instead.
- All cron executions create [background task](/automation/tasks) records.
- On Gateway startup, overdue isolated agent-turn jobs are rescheduled out of the channel-connect window instead of replaying immediately, so Discord/Telegram startup and native-command setup stay responsive after restarts.
- One-shot jobs (`--at`) auto-delete after success by default.
- Isolated cron runs best-effort close tracked browser tabs/processes for their `cron:<jobId>` session when the run completes, so detached browser automation does not leave orphaned processes behind.
- Isolated cron runs that receive the narrow cron self-cleanup grant can still read scheduler status, a self-filtered list of their current job, and that job's run history, so status/heartbea
…
Pilot Protocol (go)
Protocol-level state is written atomically under ~/.pilot/: config.json, the Ed25519 identity (--identity gives a stable identity across restarts), trust.json (mutual trust records, loaded and saved only when IdentityPath is non-empty) and the beacon cache. On the registry side there is hot-standby replication plus a WAL (README.md:189). Pitfall: when running daemon directly instead of pilotctl daemon start, if ~/.pilot/config.json is not auto-loaded, IdentityPath is empty and trust persistence is silently lost (fixed in cmd/daemon/main.go:96-111).
cmd/daemon/main.go:62, pkg/daemon/daemon.go:1761, README.md:189
registryTLS := flag.Bool("registry-tls", false, "use TLS for registry connection")
registryFingerprint := flag.String("registry-fingerprint", "", "hex SHA-256 fingerprint of registry TLS certificate (required when -registry-trust=pinned)")
registryTrust := flag.String("registry-trust", "pinned", "trust store for -registry-tls: 'pinned' (verify cert against -registry-fingerprint) or 'system' (OS x509 root store — used for compat-mode registry on registry.pilotprotocol.network:443 with Let's Encrypt)")
identityPath := flag.String("identity", "", "path to persist Ed25519 identity (enables stable identity across restarts)")
email := flag.String("email", "", "email address for account identification and key recovery")
owner := flag.String("owner", "", "(deprecated: use -email) owner identifier for key rotation recovery")
keepalive := flag.Duration("keepalive", 0, "keepalive probe interval (default 30s)")
idleTimeout := flag.Duration("idle-timeout", 0, "idle connection timeout (default 120s)")
synRate := flag.Int("syn-rate-limit", 0, "max SYN packets per second (default 100)")
maxConnsPerPort := flag.Int("max-conns-per-port", 0, "max connections per port (default 1024)")
maxConnsTotal := flag.Int("max-conns-total", 0, "max total connections (default 4096)")
timeWait := flag.Duration("time-wait", 0, "TIME_WAIT duration (default 10s)")
public := flag.Bool("public", false, "make this node's endpoint publicly visible (default: private)")
Pipecat (python)
Runtime state lives in LLMContext (messages) plus worker-internal state. EndFrame/StopFrame are uninterruptible (an interruption does not drop them). Serialization mainly targets wire transport: FrameSerializer.serialize/deserialize (Twilio/Plivo/Vonage/Telnyx/Exotel/Genesys/protobuf) convert frames to telephony/WebSocket protocols. Cross-worker state travels as BusMessage on the bus.
serializers/base_serializer.py:23
from pipecat.utils.base_object import BaseObject

class FrameSerializer(BaseObject):
    """Abstract base class for frame serialization implementations.

    Defines the interface for converting frames to/from serialized formats
    for transmission or storage. Subclasses must implement the core
    serialize/deserialize methods.
    """

    class InputParams(BaseModel):
        """Base configuration parameters for FrameSerializer.
PraisonAI (python)
Session (session.py:24) manages short-term session state (save_state). db=db(database_url=...) connects to 20+ backends such as PostgreSQL/MySQL/SQLite/MongoDB/Redis and automatically persists messages/runs/traces. The CLI offers auto_save="proj" + Shadow Git Checkpoints (automatic rollback on failure) + snapshot/
session.py:24
from .agent import Agent

class Session:
    """
    A simple wrapper around PraisonAI's existing stateful capabilities.

    Provides a unified API for:
    - Session management with persistent state
    - Memory operations (short-term, long-term, user-specific)
    - Knowledge base operations
    - Agent state management
    - Remote agent connectivity
Semantic Kernel (csharp)
Session state lives in AgentThread (for example ChatHistoryAgentThread, with the OnSuspendAsync/OnResumeAsync lifecycle). The legacy AgentChat uses AgentChatSerializer to serialize and restore a whole multi-agent conversation. ChatCompletionAgent.RestoreChannelAsync restores a channel from JSON. The Process framework has KernelProcessStateMetadata checkpoints.
dotnet/src/Agents/Abstractions/AgentThread.cs:18, dotnet/src/Agents/Abstractions/AgentChatSerializer.cs:16, dotnet/src/Agents/Core/ChatCompletionAgent.cs:253
/// This class is used to manage the lifecycle of an agent thread.
/// The thread can be not-start, started or ended.
/// </remarks>
public abstract class AgentThread
{
    /// <summary>
    /// Gets the id of the current thread.
    /// </summary>
    public virtual string? Id { get; protected set; }

    /// <summary>
    /// Gets a value indicating whether the thread has been deleted.
    /// </summary>
smolagents (python)
Runtime state is the agent.state dict (additional_args injects sandbox variables). reset=False carries memory over across runs. Serialization goes through to_dict/from_dict/save/from_hub/push_to_hub, writing agent + tools + prompt to disk or to the Hub. AGENT_REGISTRY restricts which classes can be deserialized, to prevent RCE.
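Restricting deserialization to an allow-list of classes is a general defensive pattern. The sketch below shows the idea with a hypothetical `REGISTRY` and `ToolAgent`; it is not smolagents' AGENT_REGISTRY or its from_dict.

```python
class ToolAgent:
    """Hypothetical agent class that is allowed to be restored."""

    def __init__(self, name: str):
        self.name = name

    def to_dict(self) -> dict:
        return {"class": type(self).__name__, "name": self.name}


# Only classes listed here may ever be instantiated from saved data.
REGISTRY = {"ToolAgent": ToolAgent}


def from_dict(data: dict):
    """Look the class name up in the allow-list instead of importing it dynamically."""
    cls = REGISTRY.get(data.get("class"))
    if cls is None:
        raise ValueError(f"refusing to deserialize unknown class {data.get('class')!r}")
    return cls(name=data["name"])


restored_agent = from_dict(ToolAgent("searcher").to_dict())

try:
    from_dict({"class": "os.system", "name": "rm -rf /"})
    rejected = False
except ValueError:
    rejected = True
```

Because the class name in the payload is only ever used as a dictionary key, a tampered file cannot make the loader import or call arbitrary code.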
agents.py:331, agents.py:892, agents.py:970, agents.py:1810
Strands Agents (python)
agent.state is a JSON-serializable KV store (agent/state.py). The SessionManager ABC uses hooks to persist messages/state/conversation_manager_state automatically, with FileSessionManager/S3SessionManager/RepositorySessionManager implementations. take_snapshot/load_snapshot give in-memory snapshots. Checkpointing pauses at cycle boundaries and can be resumed.
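Hook-driven persistence means the agent never calls "save" itself: the session manager subscribes to lifecycle events and writes on each one. A minimal sketch, in which `HookRegistry`, `FileSession`, `HookedAgent` and the `message_added` event name are all hypothetical and not the Strands API:

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, Dict, List


class HookRegistry:
    """Hypothetical event registry: event name -> list of callbacks."""

    def __init__(self):
        self._callbacks: Dict[str, List[Callable]] = {}

    def add(self, event: str, callback: Callable) -> None:
        self._callbacks.setdefault(event, []).append(callback)

    def fire(self, event: str, agent) -> None:
        for callback in self._callbacks.get(event, []):
            callback(agent)


class FileSession:
    """Persists messages and state to one JSON file whenever a hook fires."""

    def __init__(self, path: Path):
        self.path = path

    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add("message_added", self.save)

    def save(self, agent) -> None:
        payload = {"messages": agent.messages, "state": agent.state}
        self.path.write_text(json.dumps(payload), encoding="utf-8")

    def load(self) -> dict:
        return json.loads(self.path.read_text(encoding="utf-8"))


class HookedAgent:
    def __init__(self, session: FileSession):
        self.messages: list = []
        self.state: dict = {}
        self.hooks = HookRegistry()
        session.register_hooks(self.hooks)

    def add_message(self, role: str, content: str) -> None:
        self.messages.append({"role": role, "content": content})
        self.hooks.fire("message_added", self)  # persistence happens here


with tempfile.TemporaryDirectory() as tmp:
    session = FileSession(Path(tmp) / "session.json")
    agent = HookedAgent(session)
    agent.state["user_id"] = "u-1"
    agent.add_message("user", "hi")
    persisted = session.load()
```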
session/session_manager.py:31, agent.py:1238
logger = logging.getLogger(__name__)

class SessionManager(HookProvider, ABC):
    """Abstract interface for managing sessions.

    A session manager is in charge of persisting the conversation and state of an agent across its interaction.
    Changes made to the agents conversation, state, or other attributes should be persisted immediately after
    they are changed. The different methods introduced in this class are called at important lifecycle events
    for an agent, and should be persisted in the session.
    """

    def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
SwarmClaw (typescript)
A local better-sqlite3 database with one (id,data) table per collection, using load-modify-save plus a bulk-deletion guard (saveCollection). session_messages is a separate table (a slimmed-down transcript). storage-normalization migrates old records on load and fills in defaults. LangGraph checkpoints are persisted. main-loop / delegation / queue / run-ledger each have their own repository. Module-level state uses hmrSingleton to survive Next.js HMR.
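The "(id, data) table per collection, load-modify-save, with a bulk-deletion guard" layout can be approximated in Python with the built-in sqlite3 module. The guard's threshold semantics (`max_delete_ratio`) and the function names are assumptions for this sketch; SwarmClaw's saveCollection may work differently.

```python
import json
import sqlite3


def open_db(path: str = ":memory:") -> sqlite3.Connection:
    db = sqlite3.connect(path)
    db.execute("PRAGMA foreign_keys = ON")
    return db


def load_collection(db: sqlite3.Connection, table: str) -> dict:
    # Table names are trusted constants here; never interpolate user input.
    db.execute(f'CREATE TABLE IF NOT EXISTS "{table}" '
               "(id TEXT PRIMARY KEY, data TEXT NOT NULL)")
    rows = db.execute(f'SELECT id, data FROM "{table}"')
    return {row[0]: json.loads(row[1]) for row in rows}


def save_collection(db: sqlite3.Connection, table: str, records: dict,
                    max_delete_ratio: float = 0.5) -> None:
    """Upsert all records; refuse a save that would drop too many existing rows."""
    existing = load_collection(db, table)
    removed = set(existing) - set(records)
    if existing and len(removed) / len(existing) > max_delete_ratio:
        raise RuntimeError(f"refusing to delete {len(removed)} of {len(existing)} rows")
    with db:  # one transaction: commit on success, roll back on error
        for record_id, record in records.items():
            db.execute(f'INSERT OR REPLACE INTO "{table}" (id, data) VALUES (?, ?)',
                       (record_id, json.dumps(record)))
        for record_id in removed:
            db.execute(f'DELETE FROM "{table}" WHERE id = ?', (record_id,))


db = open_db()
save_collection(db, "agents", {"a1": {"name": "planner"},
                               "a2": {"name": "coder"},
                               "a3": {"name": "critic"}})
agents = load_collection(db, "agents")
agents["a1"]["name"] = "lead planner"   # load-modify-save
save_collection(db, "agents", agents)
try:
    save_collection(db, "agents", {})   # would wipe the whole collection
    guard_tripped = False
except RuntimeError:
    guard_tripped = True
final = load_collection(db, "agents")
```

The guard targets the main risk of load-modify-save: a caller that loaded an empty or partial collection by mistake and then writes it back.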
storage.ts:90
// --- SQLite Database ---
const DB_PATH = IS_BUILD_BOOTSTRAP ? ':memory:' : path.join(DATA_DIR, 'swarmclaw.db')
const db = new Database(DB_PATH)
if (!IS_BUILD_BOOTSTRAP) {
  db.pragma('journal_mode = WAL')
  db.pragma('busy_timeout = 15000')
  db.pragma('synchronous = NORMAL')
  db.pragma('cache_size = -64000')
  db.pragma('mmap_size = 268435456')
}
db.pragma('foreign_keys = ON')
Swarms (python)
autosave serializes the to_dict() state to disk (background thread at agent.py:3456). Conversation.save_as_json/export (conversation.py:812, 895). In v12, MEMORY.md persists across processes (keyed by agent_name). Conversations are in-memory by default, with no DB backend.
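A background autosave loop can be sketched as follows. Unlike the excerpt, which sleeps between saves, this variant waits on a `threading.Event` so it can be stopped promptly and flushed once more on shutdown; `AutosavingAgent` and all of its methods are hypothetical, not Swarms' Agent.

```python
import json
import tempfile
import threading
from pathlib import Path


class AutosavingAgent:
    """Hypothetical agent that periodically dumps to_dict() to a JSON file."""

    def __init__(self, path: Path, interval: float):
        self.path = path
        self.interval = interval
        self.state = {"steps": 0}
        self.saves = 0
        self._stop = threading.Event()
        self._thread = None

    def to_dict(self) -> dict:
        return dict(self.state)

    def save(self) -> None:
        self.path.write_text(json.dumps(self.to_dict()), encoding="utf-8")
        self.saves += 1

    def start_autosave(self) -> None:
        def loop():
            while not self._stop.is_set():
                try:
                    self.save()
                except Exception as exc:  # keep the loop alive on a failed save
                    print(f"Autosave failed: {exc}")
                self._stop.wait(self.interval)  # sleeps, but wakes early on stop

        self._thread = threading.Thread(target=loop, daemon=True)
        self._thread.start()

    def stop_autosave(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
        self.save()  # final flush so the last state is never lost


with tempfile.TemporaryDirectory() as tmp:
    agent = AutosavingAgent(Path(tmp) / "agent_state.json", interval=0.05)
    agent.state["steps"] = 3
    agent.start_autosave()
    agent.stop_autosave()
    saved = json.loads(agent.path.read_text(encoding="utf-8"))
```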
agent.py:3456, structs/conversation.py:812, conversation.py:420
        """

        def autosave_loop():
            while self.autosave:
                try:
                    self.save()
                    if self.verbose:
                        logger.debug(
                            f"Autosaved agent state (interval: {interval}s)"
                        )
                except Exception as e:
                    logger.error(f"Autosave failed: {e}")
                time.sleep(interval)
Upsonic (python)
A unified storage interface over multiple backends: In-Memory / JSON / SQLite / Redis / PostgreSQL / MongoDB / mem0 (src/upsonic/storage/), carrying session/memory/user-profile. The db= parameter can take over storage as a whole (agent.py:234). Task-level cache (vector_search/llm_call, tasks.py:49).
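A unified storage interface lets the same agent code run against any backend. The sketch below uses a `typing.Protocol` with two toy backends; `Storage`, `InMemoryStorage`, `JSONStorage` and `remember_turn` are hypothetical names and not Upsonic's classes.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional, Protocol


class Storage(Protocol):
    """Hypothetical minimal interface every backend must satisfy."""

    def read(self, key: str) -> Optional[dict]: ...
    def write(self, key: str, value: dict) -> None: ...


class InMemoryStorage:
    def __init__(self):
        self._data = {}

    def read(self, key: str) -> Optional[dict]:
        return self._data.get(key)

    def write(self, key: str, value: dict) -> None:
        self._data[key] = value


class JSONStorage:
    def __init__(self, directory: Path):
        self.directory = directory
        directory.mkdir(parents=True, exist_ok=True)

    def _path(self, key: str) -> Path:
        return self.directory / f"{key}.json"

    def read(self, key: str) -> Optional[dict]:
        path = self._path(key)
        return json.loads(path.read_text(encoding="utf-8")) if path.exists() else None

    def write(self, key: str, value: dict) -> None:
        self._path(key).write_text(json.dumps(value), encoding="utf-8")


def remember_turn(storage: Storage, session_id: str, message: str) -> dict:
    """Backend-agnostic session update: read, append, write back."""
    session = storage.read(session_id) or {"messages": []}
    session["messages"].append(message)
    storage.write(session_id, session)
    return session


memory_backend = InMemoryStorage()
remember_turn(memory_backend, "s1", "hello")
from_memory = remember_turn(memory_backend, "s1", "again")

with tempfile.TemporaryDirectory() as tmp:
    json_backend = JSONStorage(Path(tmp) / "sessions")
    remember_turn(json_backend, "s1", "hello")
    from_json = remember_turn(json_backend, "s1", "again")
```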
pipeline/steps.py:564, tasks.py:49
        self._finalize_step_result(step_result, context)

class StorageConnectionStep(Step):
    """Setup storage connection for memory and database operations."""

    @property
    def name(self) -> str:
        return "storage_connection"

    @property
    def description(self) -> str:
        return "Setup storage connection"
vectara-agentic (python)
The Agent can be serialized as a whole: dumps/loads and to_dict/from_dict (agent.py:1103) go through serialize_agent_to_dict (serialization.py:252) to save config + tools + memory, using cloudpickle for custom function tools. session_id (default topic:date, agent.py:169) plus Memory provide per-session state. A fallback agent config switch is included (agent.py:480).
agent.py:1103, agent_core/serialization.py:252, serialization.py:285
    #
    # Serialization methods
    #
    def dumps(self) -> str:
        """Serialize the Agent instance to a JSON string."""
        return json.dumps(self.to_dict())

    @classmethod
    def loads(
        cls,
        data: str,
        agent_progress_callback: Optional[
            Callable[[AgentStatusType, dict, str], None]
VoltAgent (typescript)
Memory persists messages/conversations/working memory through a StorageAdapter. memory-persist-queue writes to disk asynchronously. Workflow has WorkflowStateStore/checkpoint (restart is possible after suspend). The observability LocalStorage persists traces. resumable-streams supports resuming a stream after a disconnect.