多智能体编排
一句话总结多个各有角色的 agent 如何协作:角色定义、拓扑(主从/网状/流水线/群聊)、通信(handoff/消息/共享状态)。把一个超级 agent 拆成一个团队。概念详见 single-vs-multi-agent。
它解决什么问题
让职责分离(创意 vs 审查)、并行、可组合,代价是协调开销与误差传播。是众多框架(AutoGen、CrewAI、Swarm、MetaGPT)的核心卖点。
设计维度 / 实现谱系
- 拓扑:supervisor 层级 ↔ swarm 网状 ↔ sequential 流水线 ↔ group chat
- 通信:handoff(移交控制权)↔ 消息传递 ↔ 共享状态/黑板 ↔ 跨进程网络(hcom、A2A)
- 角色定义:role+goal+backstory(CrewAI)↔ SOP 标准流程(MetaGPT)↔ 函数式 handoff(Swarm)
- 调度:谁决定下一个发言/行动的 agent
- 终止:何时认为团队任务完成
关键要点
- 三要素:角色、拓扑、通信。
- handoff(Swarm)是极简优雅的抽象:工具返回一个 agent 即转交。
- 先把单 agent 做好再上多 agent(design-tradeoffs)。
关联
各框架实现对比
下表汇总 41 个实现了「多智能体编排」的框架(源码级阅读结论)。网站上以可展开 + 源码节选呈现。
| 框架 | 实现方式 |
|---|---|
| Aeon | 进程内非传统多 agent;靠 chains(多 workflow step 串/并行 + 输出落 .outputs/{skill}.md 注入下游)。真正”多实例”= Instance Fleet:spawn-instance fork 出专精副本登记 memory/instances.json,fleet-control/fork-fleet 管理 |
| AG2 | 多模式:①GroupChat+GroupChatManager,speaker 选择 auto/manual/random/round_robin/可调用;②新式 Group/handoff:agent.handoffs 挂 OnContextCondition(无 LLM) / OnCondition(LLM) / after-work,配 Pattern(Auto/RoundRobin/Manual/Random),由 GroupToolExecutor 执行转移;③Swarm(initiate_swarm_chat/run_swarm);④nested / sequential / 两两 chat |
| Agency Swarm | 核心卖点。①send_message 工具(同步 RPC:sender 调工具→recipient.get_response()→把 final_output 当工具返回值回灌,tools/send_message.py:314);②有向 communication_flows(a > b 经 AgentFlow+register_subagent 动态装工具);③可选 Handoff 控制权转移工具 |
| Agent-LLM (AGiXT) | ①agent 互调:extension 命令内经 self.ApiClient.prompt_agent(…) 让一个 agent 调另一个 agent(automation_helpers.py:264);②Chain 可在步骤里指定不同 agent_name 串接多 agent;③各 BotManager(Discord/Slack/Teams…)+ WorkerRegistry/BotManagerRegistry 做渠道侧编排 |
| AgentField | 核心卖点:app.call(“node.func”) 经控制平面路由(绝不直连),自动传播 workflow/session/actor 上下文并构建 DAG;版本路由用加权轮询做 canary(5%→50%→100%),回 X-Routed-Version;AgentRouter(prefix=…) 做命名空间 |
| Agentic Context Engine (ACE) | 非”多个对等 agent 协作”,而是三个固定角色的学习流水线经 Pipeline 编排;Branch 支持并行分支+合并,RR 内部可递归派生只读 sandbox 子 agent(create_readonly_sandbox) |
| AgentScope | 2.0 移除了 1.0 的 msghub/pipeline 模块(本仓不存在)。当前路径:① Agent.observe() 注入外部消息做松散协作;② FastAPI agent service 做多租户多会话承载与编排(examples/agent_service);③ MCP/A2A 把别的 agent 当工具/服务。README 称 message hub 在 roadmap |
| AgentVerse | 框架本体即多 agent 编排。两条主线:① simulation 经 5 规则(order/visibility/selector/updater/describer)调度 N 个对话 agent 的回合制交互;② task-solving 经 4 规则把 agent 按 AGENT_TYPES(ROLE_ASSIGNMENT/SOLVER/CRITIC/EXECUTION/EVALUATION/MANAGER)组织成”招募→讨论→执行→评估”流水线,讨论拓扑可选 vertical/horizontal/central/dynamic |
| Astron Agent | 靠 workflow DAG 引擎而非 agent 间直接通信:agent 节点把一个完整 agent 嵌入工作流;flow 节点可嵌套调用其它 workflow(WorkflowPlugin 让 CoT 把别的 workflow 当工具调);节点间靠 VariablePool 传值,引擎用 asyncio.create_task 并发调度依赖就绪的节点 |
| AutoGen | 核心强项。BaseGroupChat(Team) + BaseGroupChatManager:manager 通过 pub/sub 选下一发言者(select_speaker),participant 经 ChatAgentContainer 包成 runtime agent;五种内置模式:RoundRobin / Selector(LLM 选人) / Swarm(handoff 驱动) / GraphFlow(DiGraph 显式拓扑) / Magentic-One(ledger 编排) |
| Botpress | 内核无固定多 agent 协议;用 Exit 做 handoff 实现:MultiAgentOrchestrator 把每个子 agent 暴露为 handoff_ |
| ConnectOnion | 两条路:①进程内 subagents 插件经 task() 工具派生隔离子 agent;②跨网络 host() 把 agent 暴露为 HTTP+WebSocket,经 relay 做 P2P 发现,connect()/RemoteAgent 远程调用 |
| Cordum | 这是核心:Scheduler 按 Heartbeat(算力/能力/pool/labels)做容量感知路由到 worker 池或直连 worker,靠内存 TTL 注册表(无持久 DB);策略可做 pool segmentation(敏感数据只进可信池) |
| CrewAI | Process.sequential(按序执行 task) / Process.hierarchical(自动建经理 agent 持 AgentTools 委派);agent 间经 Delegate/AskQuestion 工具协作;A2A 协议跨进程 |
| Dust | run_agent MCP server 实现”agent as tool”:两种模式 run-agent(子 agent 在后台子对话执行并回传结果)与 handoff(子 agent 直接接管对话);可向子 agent 转发文件/toolset |
| hcom | 核心。①消息:hcom send @name(s) [—intent request/inform/ack] [—reply-to] [—thread],按 @mention 定向或广播,写成 events 行(commands/send.rs:1, messages.rs:13 scope 计算)。②投递:Claude 经 PostToolUse hook turn 中途注入、SessionStart/Stop 注入(hooks/claude.rs:916 handle_posttooluse, :421 handle_sessionstart);PTY 模式经 TCP inject 端口投递(delivery.rs:1)。③spawn/fork/resume/kill:hcom [N] |
| Hermes Agent | delegate_task 工具派生隔离子 AIAgent:全新对话(无父史)、独立 task_id/终端、受限 toolset(强制剥离 delegate/clarify/memory/send_message/execute_code)、单/批并行(ThreadPoolExecutor),父进程阻塞至子完成、只回看 summary |
| Hive | 核心范式:Queen 生成 worker graph,多 worker 并行(fan-out/fan-in),colony=一组 worker 的部署;session 隔离 + 共享 buffer;run_parallel_workers 横向扩同类工作 |
| Lagent | 组合式:Sequential 容器按顺序串接 agent 并可 exit_at 提前退出;AgentList/AgentDict 把 agent 当容器元素;任意 agent 赋值为属性即成递归子 agent(_agents)。无中心调度器/角色协议 |
| LangChain | create_agent(name=…) 返回的图可作为子图节点嵌入另一张 LangGraph(factory.py:803 文档);SubagentTransformer 把嵌套命名 agent 识别为 run.subagents 句柄并转发其事件流;更高层编排走 LangGraph / Deep Agents |
| LlamaIndex | AgentWorkflow 持有多个命名 agent + root_agent;自动注入 handoff 工具(return_direct)按 can_handoff_to 白名单切换 current_agent_name,共享同一 memory/state;from_tools_or_functions 据是否 function-calling 自动选 Function/ReAct agent |
| LoongFlow | 进程内多 Worker 流水线:PESAgent 以 asyncio 并发跑多个 evolution cycle(concurrency 控并发数),每 cycle 内 Planner/Executor/Summary 三 Agent 协作;岛模型把种群分到 num_islands 并定期迁移,等价于并行进化群体;无跨网络分布式 agent 协议 |
| Maestro | 核心范式:1 个 orchestrator (强模型) + N 次 sub-agent (弱模型) 的 supervisor 编排;模型分层由 3 常量配置;orchestrator 串行派发,子代理无并发、不互相通信 |
| Mastra | 三条路:①agent.network(messages, opts) 用一个 routing agent 在 sub-agents/workflows/tools 间动态路由迭代直至完成;②sub-agent:在 agents: 注册的 SubAgent 被包成工具供主 agent 调用(getToolsForExecution);③workflow 内 createStep(agent) 把 agent 当步骤静态编排;另有 A2A 协议(@mastra/core/a2a) |
| MetaGPT | 框架核心:Team.hire(roles) 把角色放进 Environment;Environment.publish_message 按 member_addrs 地址路由到角色私有队列;Environment.run 用 asyncio.gather 并发跑所有非 idle 角色;订阅靠 cause_by ∈ rc.watch 解耦。新一代 MGXEnv + TeamLeader(Mike)做动态 @ 路由与 direct_chat |
| Modus | actor 间消息传递:SendMessage(同步阻塞带 timeout)/SendMessageAsync(timeout=0 异步);agent 可在自身方法内 Start/SendMessage 其他 agent;底层 GoAkt 支持分布式(actor 可能在别的进程/机器) |
| nanobot | 进程内 SubagentManager 派生隔离子 agent(独立 ToolRegistry/workspace scope,复用 AgentRunner),结果经 bus 作为 system 消息回灌父会话;spawn 工具触发,受 max_concurrent_subagents 限流。无跨网络 agent 协议 |
| Open Multi-Agent | 核心范式:coordinator 拆 DAG → TaskQueue 拓扑解析(完成自动 unblock、失败级联) → Scheduler 自动分配(默认 dependency-first,另有 round-robin/least-busy/capability-match) → AgentPool 信号量并发执行;可选 delegate_to_agent 工具做同步子 agent 委派(带环检测/深度上限/池槽防死锁) |
| OpenClaw | 两条路:①多 agent 路由——按渠道/账号/peer 把入站消息路由到隔离 agent(独立 workspace + per-agent session);②子 agent 派生——sessions_spawn 工具派生 subagent / ACP 外部 CLI agent,受 maxSpawnDepth、maxChildren、requireAgentId 策略约束;sessions_list/sessions_history/sessions_send 做跨会话协作 |
| Pilot Protocol | 核心,但是”网络级编排”:①寻址=48 位虚拟地址 N:NNNN.HHHH.LLLL + 16 位端口 + hostname 发现;②互信=双向签名握手(端口 444,经 registry relay),节点默认私有;③发现/NAT=经 rendezvous 注册解析、STUN、打洞、relay 兜底。data flows 点对点直连 |
| Pipecat | 两路:①进程内 ParallelPipeline 并行多条管道;②多 worker 经 WorkerBus 协作——@job(name=, sequential=) 暴露 handler,调用方 async with self.job(name) / self.job_group(names) 发请求并等 JobStatus;WorkerRegistry 跟踪本地/远程 worker(pgmq/redis 可跨进程) |
| PraisonAI | Agents 容器 + Process 三模式:sequential(按序、自动传上下文)、hierarchical(manager_llm 充当 orchestrator 动态派活)、workflow(按 next_tasks/condition 走图,支持 route/parallel/loop/repeat);另有 Handoff 做 agent→agent 转交(仿 OpenAI Agents SDK)、A2A 协议 |
| Semantic Kernel | 两代并存:① 旧 AgentGroupChat/AgentChat(轮转+终止策略);② 新 AgentOrchestration<TIn,TOut> 基类下的 Concurrent / Sequential / GroupChat / Handoff / Magentic 五种模式,跑在 actor Runtime 上;GroupChat 由 GroupChatManager(SelectNextAgent/ShouldTerminate/ShouldRequestUserInput) 编排;agent 亦可作为另一 agent 的 plugin |
| smolagents | 层级式:把子 agent 放进 managed_agents,框架给它套上 name/description/inputs 使其”像工具一样可被调用”;父 agent 经 agent(task=…)(call) 调用,子 agent 跑完整 run 返回报告。注意:远程沙箱执行器不支持 managed agents |
| Strands Agents | 三模式:①Graph 确定性依赖图(支持环/嵌套,GraphBuilder 声明边);②Swarm 自治协作团队(工具化 handoff + 共享上下文);③A2A 协议(server/executor);另 agent.as_tool() 把 agent 当工具 |
| Swarm | 网状 handoff:工具返回 Agent/Result(agent=…) 即移交控制权 |
| SwarmClaw | ①进程内 subagent:spawnSubagent 派生隔离子 session,带 delegationDepth 限制(DEFAULT_DELEGATION_MAX_DEPTH);②外部委派:delegate 工具 spawn claude/codex/opencode/gemini/copilot/droid/cursor/qwen CLI 子进程,回退链 + resume id;③跨 OpenClaw gateway 路由;org-chart/team 可视化编排 |
| Swarms | 核心卖点:60+ 拓扑文件,经 SwarmRouter 按 17 种 SwarmType 统一分派;含 Sequential/Concurrent(ThreadPool)/AgentRearrange(flow DSL)/Graph(DAG 拓扑排序)/MixtureOfAgents/Hierarchical/GroupChat/MajorityVoting/Council/Debate/Heavy/RoundRobin 等;另有 handoffs 让 agent 间移交 |
| Transformers Agents | 后期支持 managed agents(一个 agent 调用另一个),整体仍偏单 agent |
| Upsonic | Team 三种模式 sequential/coordinate(leader 协调)/route(router 分派),ask_other_team_members 自动互为工具;Graph 做带 State 的 DAG/链工作流(可 parallel_execution);agent 亦可作为工具被另一 agent 调用 |
| VoltAgent | Supervisor/Sub-agent:SubAgentManager 经 delegate_task 工具把任务 handoffTask 给子 agent,handoffToMultiple 并行委派多个;另有 A2A server 协议跨进程协作 |
各框架实现对比 · 源码级
Aeon yaml 进程内非传统多 agent;靠 chains(多 workflow step 串/并行 + 输出落 .outputs/{skill}.md 注入下游)。真正"多实例"= Instance Fleet:spawn-instance fork 出专精副本登记 memory/instances.json,fleet-control/fork-fleet 管理
进程内非传统多 agent;靠 chains(多 workflow step 串/并行 + 输出落 .outputs/{skill}.md 注入下游)。真正"多实例"= Instance Fleet:spawn-instance fork 出专精副本登记 memory/instances.json,fleet-control/fork-fleet 管理
查看 Aeon 完整笔记 →AG2 python 多模式:①GroupChat+GroupChatManager,speaker 选择 auto/manual/random/round_robin/可调用;②新式 Group/handoff:agent.handoffs 挂 OnContextCondition(无 LLM) / OnCondition(LLM) / after-work,配 Pattern(Auto/RoundRobin/Manual/Random),由 GroupToolExecutor 执行转移;③Swarm(initiate_swarm_chat/run_swarm);④nested / sequential / 两两 chat
多模式:①GroupChat+GroupChatManager,speaker 选择 auto/manual/random/round_robin/可调用;②新式 Group/handoff:agent.handoffs 挂 OnContextCondition(无 LLM) / OnCondition(LLM) / after-work,配 Pattern(Auto/RoundRobin/Manual/Random),由 GroupToolExecutor 执行转移;③Swarm(initiate_swarm_chat/run_swarm);④nested / sequential / 两两 chat
agentchat/groupchat.py:147agentchat/group/handoffs.py:16group/patterns/auto.py:19group/multi_agent_chat.py:44 max_round: int = 10
admin_name: str = "Admin"
func_call_filter: bool = True
speaker_selection_method: Literal["auto", "manual", "random", "round_robin"] | Callable[..., Any] = "auto"
max_retries_for_selecting_speaker: int = 2
allow_repeat_speaker: bool | list[Agent] | None = None
allowed_or_disallowed_speaker_transitions: dict[str, Any] | None = None
speaker_transitions_type: Literal["allowed", "disallowed", None] = None
enable_clear_history: bool = False
send_introductions: bool = False
select_speaker_message_template: str = """You are in a role play game. The following roles are available:
{roles}.
Read the following conversation. Agency Swarm python 核心卖点。①send_message 工具(同步 RPC:sender 调工具→recipient.get_response()→把 final_output 当工具返回值回灌,tools/send_message.py:314);②有向 communication_flows(a > b 经 AgentFlow+register_subagent 动态装工具);③可选 Handoff 控制权转移工具
核心卖点。①send_message 工具(同步 RPC:sender 调工具→recipient.get_response()→把 final_output 当工具返回值回灌,tools/send_message.py:314);②有向 communication_flows(a > b 经 AgentFlow+register_subagent 动态装工具);③可选 Handoff 控制权转移工具
tools/send_message.py:38agency/setup.py:178agent/subagents.py:34agent/agent_flow.py:14logger = logging.getLogger(__name__)
class SendMessage(FunctionTool):
"""
Use this tool to facilitate direct, synchronous communication between specialized agents within your agency.
When you send a message using this tool, you receive a response exclusively from the designated recipient
agent. To continue the dialogue, invoke this tool again with the desired recipient agent and your follow-up
message. Remember, communication here is synchronous; the recipient agent won't perform any tasks post-response.
You are responsible for relaying the recipient agent's responses back to the user, as the user does not have
direct access to these replies. Keep engaging with the tool for continuous interaction until the task is fully Agent-LLM (AGiXT) python ①agent 互调:extension 命令内经 self.ApiClient.prompt_agent(...) 让一个 agent 调另一个 agent(automation_helpers.py:264);②Chain 可在步骤里指定不同 agent_name 串接多 agent;③各 BotManager(Discord/Slack/Teams…)+ WorkerRegistry/BotManagerRegistry 做渠道侧编排
①agent 互调:extension 命令内经 self.ApiClient.prompt_agent(...) 让一个 agent 调另一个 agent(automation_helpers.py:264);②Chain 可在步骤里指定不同 agent_name 串接多 agent;③各 BotManager(Discord/Slack/Teams…)+ WorkerRegistry/BotManagerRegistry 做渠道侧编排
automation_helpers.py:264XT.py:1004 fields = output_model.model_fields
field_descriptions = [f"{field}: {fields[field]}" for field in fields]
schema = "\n".join(field_descriptions)
response = self.ApiClient.prompt_agent(
agent_id=self.agent_id,
prompt_name="Convert to JSON",
prompt_args={
"user_input": input_string,
"schema": schema,
"conversation_name": "AGiXT Terminal",
},
)
response = str(response).split("```json")[1].split("```")[0].strip() AgentField go 核心卖点:app.call("node.func") 经控制平面路由(绝不直连),自动传播 workflow/session/actor 上下文并构建 DAG;版本路由用加权轮询做 canary(5%→50%→100%),回 X-Routed-Version;AgentRouter(prefix=...) 做命名空间
核心卖点:app.call("node.func") 经控制平面路由(绝不直连),自动传播 workflow/session/actor 上下文并构建 DAG;版本路由用加权轮询做 canary(5%→50%→100%),回 X-Routed-Version;AgentRouter(prefix=...) 做命名空间
agent.py:3710control-plane/internal/handlers/execute.go:1567execute.go:310 **kwargs,
)
async def call(self, target: str, *args, **kwargs) -> dict:
"""
Initiates a cross-agent call to another reasoner or skill via the AgentField execution gateway.
This method allows agents to seamlessly communicate and utilize reasoners/skills
deployed on other agent nodes within the AgentField ecosystem. It properly propagates
workflow tracking headers and maintains execution context for DAG building.
**Return Type**: Always returns JSON/dict objects, similar to calling any REST API.
No automatic schema conversion is performed - developers can convert to Pydantic Agentic Context Engine (ACE) python 非"多个对等 agent 协作",而是三个固定角色的学习流水线经 Pipeline 编排;Branch 支持并行分支+合并,RR 内部可递归派生只读 sandbox 子 agent(create_readonly_sandbox)
非"多个对等 agent 协作",而是三个固定角色的学习流水线经 Pipeline 编排;Branch 支持并行分支+合并,RR 内部可递归派生只读 sandbox 子 agent(create_readonly_sandbox)
ace/runners/ace.py:80ace/core/sandbox.py:749 """
skillbook = skillbook or Skillbook()
steps: list[StepProtocol[ACEStepContext]] = [
AgentStep(agent, skillbook),
EvaluateStep(environment),
*learning_tail(
reflector,
skill_manager,
skillbook,
dedup_manager=dedup_manager,
dedup_interval=dedup_interval,
checkpoint_dir=checkpoint_dir,
checkpoint_interval=checkpoint_interval, AgentScope python 2.0 移除了 1.0 的 msghub/pipeline 模块(本仓不存在)。当前路径:① Agent.observe() 注入外部消息做松散协作;② FastAPI agent service 做多租户多会话承载与编排(examples/agent_service);③ MCP/A2A 把别的 agent 当工具/服务。README 称 message hub 在 roadmap
2.0 移除了 1.0 的 msghub/pipeline 模块(本仓不存在)。当前路径:① Agent.observe() 注入外部消息做松散协作;② FastAPI agent service 做多租户多会话承载与编排(examples/agent_service);③ MCP/A2A 把别的 agent 当工具/服务。README 称 message hub 在 roadmap
agent/_agent.py:254mcp/_mcp_client.py:22 finally:
pass
async def observe(self, msgs: Msg | list[Msg] | None = None) -> None:
"""Receive external observation message(s) and save them into
context."""
await self._handle_incoming_messages(msgs)
async def compress_context(
self,
context_config: ContextConfig | None = None,
) -> None:
"""Compress the agent's context if the token count exceeds the AgentVerse python 框架本体即多 agent 编排。两条主线:① simulation 经 5 规则(order/visibility/selector/updater/describer)调度 N 个对话 agent 的回合制交互;② task-solving 经 4 规则把 agent 按 AGENT_TYPES(ROLE_ASSIGNMENT/SOLVER/CRITIC/EXECUTION/EVALUATION/MANAGER)组织成"招募→讨论→执行→评估"流水线,讨论拓扑可选 vertical/horizontal/central/dynamic
框架本体即多 agent 编排。两条主线:① simulation 经 5 规则(order/visibility/selector/updater/describer)调度 N 个对话 agent 的回合制交互;② task-solving 经 4 规则把 agent 按 AGENT_TYPES(ROLE_ASSIGNMENT/SOLVER/CRITIC/EXECUTION/EVALUATION/MANAGER)组织成"招募→讨论→执行→评估"流水线,讨论拓扑可选 vertical/horizontal/central/dynamic
environments/simulation_env/rules/base.py:32environments/tasksolving_env/rules/base.py:71
# class Rule(BaseModel):
class SimulationRule(BaseRule):
"""
Rule for the environment. It controls the speaking order of the agents
and maintain the set of visible agents for each agent.
"""
order: BaseOrder
visibility: BaseVisibility
selector: BaseSelector
updater: BaseUpdater Astron Agent python 靠 workflow DAG 引擎而非 agent 间直接通信:agent 节点把一个完整 agent 嵌入工作流;flow 节点可嵌套调用其它 workflow(WorkflowPlugin 让 CoT 把别的 workflow 当工具调);节点间靠 VariablePool 传值,引擎用 asyncio.create_task 并发调度依赖就绪的节点
靠 workflow DAG 引擎而非 agent 间直接通信:agent 节点把一个完整 agent 嵌入工作流;flow 节点可嵌套调用其它 workflow(WorkflowPlugin 让 CoT 把别的 workflow 当工具调);节点间靠 VariablePool 传值,引擎用 asyncio.create_task 并发调度依赖就绪的节点
dsl_engine.py:786 return DefaultNodeExecutionStrategy()
class WorkflowEngine(BaseModel):
"""
Main workflow execution engine.
Orchestrates the execution of workflow nodes using depth-first search,
manages error handling, retry mechanisms, and provides various execution
strategies for different node types.
"""
engine_ctx: WorkflowEngineCtx = None # type: ignore AutoGen python 核心强项。BaseGroupChat(Team) + BaseGroupChatManager:manager 通过 pub/sub 选下一发言者(select_speaker),participant 经 ChatAgentContainer 包成 runtime agent;五种内置模式:RoundRobin / Selector(LLM 选人) / Swarm(handoff 驱动) / GraphFlow(DiGraph 显式拓扑) / Magentic-One(ledger 编排)
核心强项。BaseGroupChat(Team) + BaseGroupChatManager:manager 通过 pub/sub 选下一发言者(select_speaker),participant 经 ChatAgentContainer 包成 runtime agent;五种内置模式:RoundRobin / Selector(LLM 选人) / Swarm(handoff 驱动) / GraphFlow(DiGraph 显式拓扑) / Magentic-One(ledger 编排)
_group_chat/_base_group_chat_manager.py:25_round_robin_group_chat.py:72_selector_group_chat.py:152_swarm_group_chat.py:82from ._sequential_routed_agent import SequentialRoutedAgent
class BaseGroupChatManager(SequentialRoutedAgent, ABC):
"""Base class for a group chat manager that manages a group chat with multiple participants.
It is the responsibility of the caller to ensure:
- All participants must subscribe to the group chat topic and each of their own topics.
- The group chat manager must subscribe to the group chat topic.
- The agent types of the participants must be unique.
- For each participant, the agent type must be the same as the topic type.
Without the above conditions, the group chat will not function correctly. Botpress typescript 内核无固定多 agent 协议;用 Exit 做 handoff 实现:MultiAgentOrchestrator 把每个子 agent 暴露为 handoff_<name> Exit,onExit 钩子里切换 currentAgent 并防环路;可扩展到上百子 agent
内核无固定多 agent 协议;用 Exit 做 handoff 实现:MultiAgentOrchestrator 把每个子 agent 暴露为 handoff_<name> Exit,onExit 钩子里切换 currentAgent 并防环路;可扩展到上百子 agent
packages/llmz/examples/08_chat_multi_agent/orchestrator.ts:19orchestrator.ts:72orchestrator.ts:110 * This system can scale to hundreds of sub-agents, each with dozens or even hundreds of tools.
* It is designed to handle complex inquiries by allowing agents to hand off requests to other agents.
*/
export class MultiAgentOrchestrator {
private _agents: SubAgent[]
private _currentAgentName: string
private _previousHandoffs: Array<{
from: string
to: string
reason: string
}> = [] ConnectOnion python 两条路:①进程内 subagents 插件经 task() 工具派生隔离子 agent;②跨网络 host() 把 agent 暴露为 HTTP+WebSocket,经 relay 做 P2P 发现,connect()/RemoteAgent 远程调用
两条路:①进程内 subagents 插件经 task() 工具派生隔离子 agent;②跨网络 host() 把 agent 暴露为 HTTP+WebSocket,经 relay 做 P2P 发现,connect()/RemoteAgent 远程调用
network/host/server.py:280network/connect.py:565DEFAULT_RELAY_URL = "wss://oo.openonion.ai"
def host(
create_agent: Callable,
port: int = None,
trust: Union[str, "Agent"] = None,
result_ttl: int = None,
workers: int = None,
reload: bool = None,
*,
relay_url: str = DEFAULT_RELAY_URL,
blacklist: list | None = None, Cordum go 这是核心:Scheduler 按 Heartbeat(算力/能力/pool/labels)做容量感知路由到 worker 池或直连 worker,靠内存 TTL 注册表(无持久 DB);策略可做 pool segmentation(敏感数据只进可信池)
这是核心:Scheduler 按 Heartbeat(算力/能力/pool/labels)做容量感知路由到 worker 池或直连 worker,靠内存 TTL 注册表(无持久 DB);策略可做 pool segmentation(敏感数据只进可信池)
DESIGN.md:124This keeps bus payloads small, preserves durability, and allows audit tooling
to dereference pointers when needed (`GET /api/v1/memory?ptr=...`).
## 5) Control Plane Flow (High Level)
1. Gateway writes context JSON to Redis, sets `context_ptr`, publishes
`BusPacket{JobRequest}` to `sys.job.submit`.
2. Scheduler receives `JobRequest`, records state, calls Safety Kernel, and
applies `PolicyConstraints`.
3. Scheduler selects a pool and/or direct worker subject using heartbeats
and publishes to `job.*` or `worker.<id>.jobs`.
4. Worker fetches `context_ptr`, executes, writes result to Redis, publishes
`BusPacket{JobResult}` with `result_ptr` and `artifact_ptrs`. CrewAI python Process.sequential(按序执行 task) / Process.hierarchical(自动建经理 agent 持 AgentTools 委派);agent 间经 Delegate/AskQuestion 工具协作;A2A 协议跨进程
Process.sequential(按序执行 task) / Process.hierarchical(自动建经理 agent 持 AgentTools 委派);agent 间经 Delegate/AskQuestion 工具协作;A2A 协议跨进程
crewai/process.py:4crewai/crew.py:1464crewai/tools/agent_tools/delegate_work_tool.py:16from enum import Enum
class Process(str, Enum):
"""
Class representing the different processes that can be used to tackle tasks
"""
sequential = "sequential"
hierarchical = "hierarchical"
# TODO: consensual = 'consensual' Dust typescript run_agent MCP server 实现"agent as tool":两种模式 run-agent(子 agent 在后台子对话执行并回传结果)与 handoff(子 agent 直接接管对话);可向子 agent 转发文件/toolset
run_agent MCP server 实现"agent as tool":两种模式 run-agent(子 agent 在后台子对话执行并回传结果)与 handoff(子 agent 直接接管对话);可向子 agent 转发文件/toolset
front/lib/api/actions/servers/run_agent/metadata.ts:18// The actual tool name is dynamic: `run_<agent_name>`.
export const RUN_AGENT_PLACEHOLDER_TOOL_NAME = "run_agent" as const;
export const RUN_AGENT_CONFIGURABLE_PROPERTIES = {
executionMode: z
.object({
options: z
.union([
z
.object({
value: z.literal("run-agent"),
label: z.literal("Agent runs in the background"),
}) hcom rust 核心。①消息:hcom send @name(s) [--intent request|inform|ack] [--reply-to] [--thread],按 @mention 定向或广播,写成 events 行(commands/send.rs:1, messages.rs:13 scope 计算)。②投递:Claude 经 PostToolUse hook turn 中途注入、SessionStart/Stop 注入(hooks/claude.rs:916 handle_posttooluse, :421 handle_sessionstart);PTY 模式经 TCP inject 端口投递(delivery.rs:1)。③spawn/fork/resume/kill:hcom [N] <tool> 起真实终端或 headless(commands/launch.rs:1, launcher.rs:1),hcom f/r/kill(commands/fork.rs, resume.rs, kill.rs)。④订阅/反应:hcom events sub <filters>,订阅存 kv 的 events_sub: 行,命中可自动 on_hit_text 回消息(db/subscriptions.rs:1)。⑤碰撞检测默认开:两 agent 30s 内改同一文件双方收通知(README:101)
核心。①消息:hcom send @name(s) [--intent request|inform|ack] [--reply-to] [--thread],按 @mention 定向或广播,写成 events 行(commands/send.rs:1, messages.rs:13 scope 计算)。②投递:Claude 经 PostToolUse hook turn 中途注入、SessionStart/Stop 注入(hooks/claude.rs:916 handle_posttooluse, :421 handle_sessionstart);PTY 模式经 TCP inject 端口投递(delivery.rs:1)。③spawn/fork/resume/kill:hcom [N] <tool> 起真实终端或 headless(commands/launch.rs:1, launcher.rs:1),hcom f/r/kill(commands/fork.rs, resume.rs, kill.rs)。④订阅/反应:hcom events sub <filters>,订阅存 kv 的 events_sub: 行,命中可自动 on_hit_text 回消息(db/subscriptions.rs:1)。⑤碰撞检测默认开:两 agent 30s 内改同一文件双方收通知(README:101)
hooks/claude.rs:916}
/// Parent PostToolUse: bootstrap, messages, vanilla binding.
fn handle_posttooluse(
db: &HcomDb,
ctx: &HcomContext,
payload: &HookPayload,
instance_name: &str,
instance_data: &InstanceRow,
updates: &serde_json::Map<String, Value>,
) -> (i32, String) {
let tool_name = payload.tool_name.as_str();
let mut outputs: Vec<Value> = Vec::new(); Hermes Agent python delegate_task 工具派生隔离子 AIAgent:全新对话(无父史)、独立 task_id/终端、受限 toolset(强制剥离 delegate/clarify/memory/send_message/execute_code)、单/批并行(ThreadPoolExecutor),父进程阻塞至子完成、只回看 summary
delegate_task 工具派生隔离子 AIAgent:全新对话(无父史)、独立 task_id/终端、受限 toolset(强制剥离 delegate/clarify/memory/send_message/execute_code)、单/批并行(ThreadPoolExecutor),父进程阻塞至子完成、只回看 summary
tools/delegate_tool.py:1#!/usr/bin/env python3
"""
Delegate Tool -- Subagent Architecture
Spawns child AIAgent instances with isolated context, restricted toolsets,
and their own terminal sessions. Supports single-task and batch (parallel)
modes. The parent blocks until all children complete.
Each child gets:
- A fresh conversation (no parent history) Hive python 核心范式:Queen 生成 worker graph,多 worker 并行(fan-out/fan-in),colony=一组 worker 的部署;session 隔离 + 共享 buffer;run_parallel_workers 横向扩同类工作
核心范式:Queen 生成 worker graph,多 worker 并行(fan-out/fan-in),colony=一组 worker 的部署;session 隔离 + 共享 buffer;run_parallel_workers 横向扩同类工作
orchestrator/orchestrator.py:115orchestrator/edge.py:416 branch_timeout_seconds: float = 300.0
class Orchestrator:
"""
Executes agent graphs.
Example:
executor = GraphExecutor(
runtime=runtime,
llm=llm,
tools=tools,
tool_executor=my_tool_executor, Lagent python 组合式:Sequential 容器按顺序串接 agent 并可 exit_at 提前退出;AgentList/AgentDict 把 agent 当容器元素;任意 agent 赋值为属性即成递归子 agent(_agents)。无中心调度器/角色协议
组合式:Sequential 容器按顺序串接 agent 并可 exit_at 提前退出;AgentList/AgentDict 把 agent 当容器元素;任意 agent 赋值为属性即成递归子 agent(_agents)。无中心调度器/角色协议
agents/agent.py:409agents/agent.py:550agents/agent.py:114 pass
class Sequential(Agent):
"""Sequential is an agent container that forwards messages to each agent
in the order they are added."""
def __init__(self, *agents: Union[Agent, Iterable], **kwargs):
super().__init__(**kwargs)
self._agents = OrderedDict()
if not agents:
raise ValueError('At least one agent should be provided')
if isinstance(agents[0], Iterable) and not isinstance(agents[0], Agent): LangChain python create_agent(name=...) 返回的图可作为子图节点嵌入另一张 LangGraph(factory.py:803 文档);SubagentTransformer 把嵌套命名 agent 识别为 run.subagents 句柄并转发其事件流;更高层编排走 LangGraph / Deep Agents
create_agent(name=...) 返回的图可作为子图节点嵌入另一张 LangGraph(factory.py:803 文档);SubagentTransformer 把嵌套命名 agent 识别为 run.subagents 句柄并转发其事件流;更高层编排走 LangGraph / Deep Agents
agents/_subagent_transformer.py:1factory.py:1681factory.py:803"""Surface nested named agents as typed `run.subagents` handles.
Detects subagents via the `lc_agent_name` transition that langgraph's base
`_TasksLifecycleBase` now computes. `create_agent(name=...)` binds
`lc_agent_name` into the run config; the base transformer records, per
namespace, the `lc_agent_name` seen on each task start (first-write-wins).
A subagent boundary is a nested run whose `lc_agent_name` is set *and* differs
from its parent namespace's `lc_agent_name`. Plain subgraphs inherit the
parent's name (so they compare equal and are excluded); unnamed agents have LlamaIndex python AgentWorkflow 持有多个命名 agent + root_agent;自动注入 handoff 工具(return_direct)按 can_handoff_to 白名单切换 current_agent_name,共享同一 memory/state;from_tools_or_functions 据是否 function-calling 自动选 Function/ReAct agent
AgentWorkflow 持有多个命名 agent + root_agent;自动注入 handoff 工具(return_direct)按 can_handoff_to 白名单切换 current_agent_name,共享同一 memory/state;from_tools_or_functions 据是否 function-calling 自动选 Function/ReAct agent
agent/workflow/multi_agent_workflow.py:99 """Metaclass for AgentWorkflow that inherits from WorkflowMeta."""
class AgentWorkflow(Workflow, PromptMixin, metaclass=AgentWorkflowMeta):
"""A workflow for managing multiple agents with handoffs."""
def __init__(
self,
agents: List[BaseWorkflowAgent],
initial_state: Optional[Dict] = None,
root_agent: Optional[str] = None,
handoff_prompt: Optional[Union[str, BasePromptTemplate]] = None,
handoff_output_prompt: Optional[Union[str, BasePromptTemplate]] = None, LoongFlow python 进程内多 Worker 流水线:PESAgent 以 asyncio 并发跑多个 evolution cycle(concurrency 控并发数),每 cycle 内 Planner/Executor/Summary 三 Agent 协作;岛模型把种群分到 num_islands 并定期迁移,等价于并行进化群体;无跨网络分布式 agent 协议
进程内多 Worker 流水线:PESAgent 以 asyncio 并发跑多个 evolution cycle(concurrency 控并发数),每 cycle 内 Planner/Executor/Summary 三 Agent 协作;岛模型把种群分到 num_islands 并定期迁移,等价于并行进化群体;无跨网络分布式 agent 协议
framework/pes/pes_agent.py:377agentsdk/memory/evolution/in_memory.py:1057 exc_info=True,
)
async def _try_start_new_cycle(self) -> bool:
"""
Atomically checks conditions and starts a new evolution cycle if possible.
"""
async with self._iteration_lock:
if (
len(self._running_tasks) < self.max_workers
and self._current_iteration < self.max_iterations
and not self._stop_event.is_set()
): Maestro python 核心范式:1 个 orchestrator (强模型) + N 次 sub-agent (弱模型) 的 supervisor 编排;模型分层由 3 常量配置;orchestrator 串行派发,子代理无并发、不互相通信
核心范式:1 个 orchestrator (强模型) + N 次 sub-agent (弱模型) 的 supervisor 编排;模型分层由 3 常量配置;orchestrator 串行派发,子代理无并发、不互相通信
maestro.py:19maestro.py:42# Claude 3 Haiku claude-3-haiku-20240307
# Claude 3.5 Sonnet claude-3-5-sonnet-20240620
ORCHESTRATOR_MODEL = "claude-3-5-sonnet-20240620"
SUB_AGENT_MODEL = "claude-3-5-sonnet-20240620"
REFINER_MODEL = "claude-3-5-sonnet-20240620"
def calculate_subagent_cost(model, input_tokens, output_tokens):
# Pricing information per model
pricing = {
"claude-3-opus-20240229": {"input_cost_per_mtok": 15.00, "output_cost_per_mtok": 75.00},
"claude-3-haiku-20240307": {"input_cost_per_mtok": 0.25, "output_cost_per_mtok": 1.25},
"claude-3-sonnet-20240229": {"input_cost_per_mtok": 3.00, "output_cost_per_mtok": 15.00}, Mastra typescript 三条路:①agent.network(messages, opts) 用一个 routing agent 在 sub-agents/workflows/tools 间动态路由迭代直至完成;②sub-agent:在 agents: 注册的 SubAgent 被包成工具供主 agent 调用(getToolsForExecution);③workflow 内 createStep(agent) 把 agent 当步骤静态编排;另有 A2A 协议(@mastra/core/a2a)
三条路:①agent.network(messages, opts) 用一个 routing agent 在 sub-agents/workflows/tools 间动态路由迭代直至完成;②sub-agent:在 agents: 注册的 SubAgent 被包成工具供主 agent 调用(getToolsForExecution);③workflow 内 createStep(agent) 把 agent 当步骤静态编排;另有 A2A 协议(@mastra/core/a2a)
agent/agent.ts:6377agent/subagent.ts:43loop/network/index.ts:297workflows/workflow.ts:430 * }
* ```
*/
async network(
messages: MessageListInput,
options?: MultiPrimitiveExecutionOptions<undefined>,
): Promise<MastraAgentNetworkStream<undefined>>;
async network<OUTPUT extends {}>(
messages: MessageListInput,
options?: MultiPrimitiveExecutionOptions<OUTPUT>,
): Promise<MastraAgentNetworkStream<OUTPUT>>;
async network<OUTPUT = undefined>(messages: MessageListInput, options?: MultiPrimitiveExecutionOptions<OUTPUT>) {
const requestContextToUse = options?.requestContext || new RequestContext(); MetaGPT python 框架核心:Team.hire(roles) 把角色放进 Environment;Environment.publish_message 按 member_addrs 地址路由到角色私有队列;Environment.run 用 asyncio.gather 并发跑所有非 idle 角色;订阅靠 cause_by ∈ rc.watch 解耦。新一代 MGXEnv + TeamLeader(Mike)做动态 @ 路由与 direct_chat
框架核心:Team.hire(roles) 把角色放进 Environment;Environment.publish_message 按 member_addrs 地址路由到角色私有队列;Environment.run 用 asyncio.gather 并发跑所有非 idle 角色;订阅靠 cause_by ∈ rc.watch 解耦。新一代 MGXEnv + TeamLeader(Mike)做动态 @ 路由与 direct_chat
metagpt/team.py:32metagpt/environment/base_env.py:175base_env.py:197role.py:399)
class Team(BaseModel):
"""
Team: Possesses one or more roles (agents), SOP (Standard Operating Procedures), and a env for instant messaging,
dedicated to env any multi-agent activity, such as collaboratively writing executable code.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
env: Optional[Environment] = None
investment: float = Field(default=10.0) Modus go actor 间消息传递:SendMessage(同步阻塞带 timeout)/SendMessageAsync(timeout=0 异步);agent 可在自身方法内 Start/SendMessage 其他 agent;底层 GoAkt 支持分布式(actor 可能在别的进程/机器)
actor 间消息传递:SendMessage(同步阻塞带 timeout)/SendMessageAsync(timeout=0 异步);agent 可在自身方法内 Start/SendMessage 其他 agent;底层 GoAkt 支持分布式(actor 可能在别的进程/机器)
sdk/go/pkg/agents/agents.go:337runtime/actors/agents.go:201// The message is sent synchronously and the function will block until a response is received or the timeout is reached.
// The timeout can be set using the WithTimeout option. If there is no timeout set, it will wait indefinitely until a
// response is received, or until calling function is cancelled.
func SendMessage(agentId, msgName string, options ...messageOption) (*string, error) {
m := &message{
name: msgName,
timeout: math.MaxInt64, // default to no timeout (wait indefinitely)
}
for _, opt := range options {
opt(m)
} nanobot python 进程内 SubagentManager 派生隔离子 agent(独立 ToolRegistry/workspace scope,复用 AgentRunner),结果经 bus 作为 system 消息回灌父会话;spawn 工具触发,受 max_concurrent_subagents 限流。无跨网络 agent 协议
进程内 SubagentManager 派生隔离子 agent(独立 ToolRegistry/workspace scope,复用 AgentRunner),结果经 bus 作为 system 消息回灌父会话;spawn 工具触发,受 max_concurrent_subagents 限流。无跨网络 agent 协议
agent/subagent.py:74agent/loop.py:275 self._status.error = str(context.error)
class SubagentManager:
"""Manages background subagent execution."""
def __init__(
self,
provider: LLMProvider,
workspace: Path,
bus: MessageBus,
max_tool_result_chars: int,
model: str | None = None, Open Multi-Agent typescript 核心范式:coordinator 拆 DAG → TaskQueue 拓扑解析(完成自动 unblock、失败级联) → Scheduler 自动分配(默认 dependency-first,另有 round-robin/least-busy/capability-match) → AgentPool 信号量并发执行;可选 delegate_to_agent 工具做同步子 agent 委派(带环检测/深度上限/池槽防死锁)
核心范式:coordinator 拆 DAG → TaskQueue 拓扑解析(完成自动 unblock、失败级联) → Scheduler 自动分配(默认 dependency-first,另有 round-robin/least-busy/capability-match) → AgentPool 信号量并发执行;可选 delegate_to_agent 工具做同步子 agent 委派(带环检测/深度上限/池槽防死锁)
src/orchestrator/orchestrator.ts:1070src/task/queue.ts:55src/orchestrator/scheduler.ts:96src/tool/built-in/delegate.ts:24 * @param team - A team created via {@link createTeam} (or `new Team(...)`).
* @param goal - High-level natural-language goal for the team.
*/
async runTeam(
team: Team,
goal: string,
options?: RunTeamOptions,
): Promise<TeamRunResult> {
const agentConfigs = team.getAgents()
const coordinatorOverrides = options?.coordinator
// ------------------------------------------------------------------
// Short-circuit: skip coordinator for simple, single-action goals. OpenClaw typescript 两条路:①多 agent 路由——按渠道/账号/peer 把入站消息路由到隔离 agent(独立 workspace + per-agent session);②子 agent 派生——sessions_spawn 工具派生 subagent / ACP 外部 CLI agent,受 maxSpawnDepth、maxChildren、requireAgentId 策略约束;sessions_list/sessions_history/sessions_send 做跨会话协作
两条路:①多 agent 路由——按渠道/账号/peer 把入站消息路由到隔离 agent(独立 workspace + per-agent session);②子 agent 派生——sessions_spawn 工具派生 subagent / ACP 外部 CLI agent,受 maxSpawnDepth、maxChildren、requireAgentId 策略约束;sessions_list/sessions_history/sessions_send 做跨会话协作
src/agents/acp-spawn.ts:1src/agents/agent-tools.policy.ts:70/** Implements ACP subagent/session spawning, binding, limits, and parent-stream setup. */
import crypto from "node:crypto";
import fs from "node:fs/promises";
import {
resolveAcpSessionCwd,
resolveAcpThreadSessionDetailLines,
} from "@openclaw/acp-core/runtime/session-identifiers";
import type { AcpRuntimeSessionMode } from "@openclaw/acp-core/runtime/types";
import {
normalizeOptionalLowercaseString, Pilot Protocol go 核心,但是"网络级编排":①寻址=48 位虚拟地址 N:NNNN.HHHH.LLLL + 16 位端口 + hostname 发现;②互信=双向签名握手(端口 444,经 registry relay),节点默认私有;③发现/NAT=经 rendezvous 注册解析、STUN、打洞、relay 兜底。data flows 点对点直连
核心,但是"网络级编排":①寻址=48 位虚拟地址 N:NNNN.HHHH.LLLL + 16 位端口 + hostname 发现;②互信=双向签名握手(端口 444,经 registry relay),节点默认私有;③发现/NAT=经 rendezvous 注册解析、STUN、打洞、relay 兜底。data flows 点对点直连
README.md:163cmd/daemon/main.go:233pkg/daemon/routing/routing.go:21<tr>
<td width="50%" valign="top">
**Addressing**
- 48-bit virtual addresses (`N:NNNN.HHHH.LLLL`)
- 16-bit ports with well-known assignments
- Hostname-based discovery
**Transport**
- Reliable streams (TCP-equivalent)
- Sliding window, SACK, congestion control (AIMD)
- Flow control (advertised receive window)
- Nagle coalescing, auto segmentation, zero-window probing Pipecat python 两路:①进程内 ParallelPipeline 并行多条管道;②多 worker 经 WorkerBus 协作——@job(name=, sequential=) 暴露 handler,调用方 async with self.job(name) / self.job_group(names) 发请求并等 JobStatus;WorkerRegistry 跟踪本地/远程 worker(pgmq/redis 可跨进程)
两路:①进程内 ParallelPipeline 并行多条管道;②多 worker 经 WorkerBus 协作——@job(name=, sequential=) 暴露 handler,调用方 async with self.job(name) / self.job_group(names) 发请求并等 JobStatus;WorkerRegistry 跟踪本地/远程 worker(pgmq/redis 可跨进程)
pipeline/parallel_pipeline.py:24from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
class ParallelPipeline(BasePipeline):
"""Pipeline that processes frames through multiple sub-pipelines concurrently.
Creates multiple parallel processing branches from the provided processor lists,
coordinating frame flow and ensuring proper synchronization of lifecycle events
like EndFrames. Each branch runs independently while system frames are handled
specially to maintain pipeline coordination.
"""
def __init__(self, *args): PraisonAI python Agents 容器 + Process 三模式:sequential(按序、自动传上下文)、hierarchical(manager_llm 充当 orchestrator 动态派活)、workflow(按 next_tasks/condition 走图,支持 route/parallel/loop/repeat);另有 Handoff 做 agent→agent 转交(仿 OpenAI Agents SDK)、A2A 协议
Agents 容器 + Process 三模式:sequential(按序、自动传上下文)、hierarchical(manager_llm 充当 orchestrator 动态派活)、workflow(按 next_tasks/condition 走图,支持 route/parallel/loop/repeat);另有 Handoff 做 agent→agent 转交(仿 OpenAI Agents SDK)、A2A 协议
agents/agents.py:1439process/process.py:19 max_iter=self.max_iter
)
if self.process == "workflow":
for task_id in process.workflow():
self.run_task(task_id)
elif self.process == "sequential":
for task_id in process.sequential():
self.run_task(task_id)
elif self.process == "hierarchical":
for task_id in process.hierarchical():
if isinstance(task_id, Task):
task_id = self.add_task(task_id) Semantic Kernel csharp 两代并存:① 旧 AgentGroupChat/AgentChat(轮转+终止策略);② 新 AgentOrchestration<TIn,TOut> 基类下的 Concurrent / Sequential / GroupChat / Handoff / Magentic 五种模式,跑在 actor Runtime 上;GroupChat 由 GroupChatManager(SelectNextAgent/ShouldTerminate/ShouldRequestUserInput) 编排;agent 亦可作为另一 agent 的 plugin
两代并存:① 旧 AgentGroupChat/AgentChat(轮转+终止策略);② 新 AgentOrchestration<TIn,TOut> 基类下的 Concurrent / Sequential / GroupChat / Handoff / Magentic 五种模式,跑在 actor Runtime 上;GroupChat 由 GroupChatManager(SelectNextAgent/ShouldTerminate/ShouldRequestUserInput) 编排;agent 亦可作为另一 agent 的 plugin
dotnet/src/Agents/Orchestration/AgentOrchestration.cs:39dotnet/src/Agents/Orchestration/GroupChat/GroupChatManager.cs:30dotnet/src/Agents/Orchestration/Handoff/HandoffOrchestration.cs:17/// </summary>
/// <typeparam name="TInput">The type of the input to the orchestration.</typeparam>
/// <typeparam name="TOutput">The type of the result output by the orchestration.</typeparam>
public abstract partial class AgentOrchestration<TInput, TOutput>
{
/// <summary>
/// Initializes a new instance of the <see cref="AgentOrchestration{TInput, TOutput}"/> class.
/// </summary>
/// <param name="members">Specifies the member agents or orchestrations participating in this orchestration.</param>
protected AgentOrchestration(params Agent[] members)
{
// Capture orchestration root name without generic parameters for use in
// agent type and topic formatting as well as logging. smolagents python 层级式:把子 agent 放进 managed_agents,框架给它套上 name/description/inputs 使其"像工具一样可被调用";父 agent 经 agent(task=...)(__call__) 调用,子 agent 跑完整 run 返回报告。注意:远程沙箱执行器不支持 managed agents
层级式:把子 agent 放进 managed_agents,框架给它套上 name/description/inputs 使其"像工具一样可被调用";父 agent 经 agent(task=...)(__call__) 调用,子 agent 跑完整 run 返回报告。注意:远程沙箱执行器不支持 managed agents
agents.py:369agents.py:868agents.py:1608Strands Agents python 三模式:①Graph 确定性依赖图(支持环/嵌套,GraphBuilder 声明边);②Swarm 自治协作团队(工具化 handoff + 共享上下文);③A2A 协议(server/executor);另 agent.as_tool() 把 agent 当工具
三模式:①Graph 确定性依赖图(支持环/嵌套,GraphBuilder 声明边);②Swarm 自治协作团队(工具化 handoff + 共享上下文);③A2A 协议(server/executor);另 agent.as_tool() 把 agent 当工具
multiagent/graph.py:429multiagent/swarm.py:237agent/agent.py:697 logger.warning("Graph without execution limits may run indefinitely if cycles exist")
class Graph(MultiAgentBase):
"""Directed Graph multi-agent orchestration with configurable revisit behavior."""
def __init__(
self,
nodes: dict[str, GraphNode],
edges: set[GraphEdge],
entry_points: set[GraphNode],
max_node_executions: int | None = None,
execution_timeout: float | None = None, Swarm python 网状 handoff:工具返回 Agent/Result(agent=...) 即移交控制权
网状 handoff:工具返回 Agent/Result(agent=...) 即移交控制权
core.py:76 case Result() as result:
return result
case Agent() as agent:
return Result(
value=json.dumps({"assistant": agent.name}),
agent=agent,
)
case _:
try:
return Result(value=str(result))
except Exception as e:
error_message = f"Failed to cast response to string: {result}. Make sure agent functions return a string or Result object. Error: {str(e)}" SwarmClaw typescript ①进程内 subagent:spawnSubagent 派生隔离子 session,带 delegationDepth 限制(DEFAULT_DELEGATION_MAX_DEPTH);②外部委派:delegate 工具 spawn claude/codex/opencode/gemini/copilot/droid/cursor/qwen CLI 子进程,回退链 + resume id;③跨 OpenClaw gateway 路由;org-chart/team 可视化编排
①进程内 subagent:spawnSubagent 派生隔离子 session,带 delegationDepth 限制(DEFAULT_DELEGATION_MAX_DEPTH);②外部委派:delegate 工具 spawn claude/codex/opencode/gemini/copilot/droid/cursor/qwen CLI 子进程,回退链 + resume id;③跨 OpenClaw gateway 路由;org-chart/team 可视化编排
agents/subagent-runtime.ts:217session-tools/delegate.ts:28agents/delegation-jobs.ts:50openclaw/gateway.ts:110// Core: Spawn a Native Subagent
// ---------------------------------------------------------------------------
export function spawnSubagent(
input: SpawnSubagentInput,
context: SubagentContext,
): Promise<SubagentHandle> {
return spawnSubagentImpl(input, context)
}
async function spawnSubagentImpl(
input: SpawnSubagentInput,
context: SubagentContext, Swarms python 核心卖点:60+ 拓扑文件,经 SwarmRouter 按 17 种 SwarmType 统一分派;含 Sequential/Concurrent(ThreadPool)/AgentRearrange(flow DSL)/Graph(DAG 拓扑排序)/MixtureOfAgents/Hierarchical/GroupChat/MajorityVoting/Council/Debate/Heavy/RoundRobin 等;另有 handoffs 让 agent 间移交
核心卖点:60+ 拓扑文件,经 SwarmRouter 按 17 种 SwarmType 统一分派;含 Sequential/Concurrent(ThreadPool)/AgentRearrange(flow DSL)/Graph(DAG 拓扑排序)/MixtureOfAgents/Hierarchical/GroupChat/MajorityVoting/Council/Debate/Heavy/RoundRobin 等;另有 handoffs 让 agent 间移交
structs/swarm_router.py:118swarm_router.py:48structs/sequential_workflow.py:52structs/concurrent_workflow.py:23 pass
class SwarmRouter:
"""
A class that dynamically routes tasks to different swarm types based on user selection or automatic matching.
The SwarmRouter enables flexible task execution by either using a specified swarm type or automatically determining
the most suitable swarm type for a given task. It handles task execution while managing logging, type validation,
and metadata capture.
Args:
name (str, optional): Name identifier for the SwarmRouter instance. Defaults to "swarm-router". Transformers Agents python 后期支持 managed agents(一个 agent 调用另一个),整体仍偏单 agent
后期支持 managed agents(一个 agent 调用另一个),整体仍偏单 agent
查看 Transformers Agents 完整笔记 →Upsonic python Team 三种模式 sequential/coordinate(leader 协调)/route(router 分派),ask_other_team_members 自动互为工具;Graph 做带 State 的 DAG/链工作流(可 parallel_execution);agent 亦可作为工具被另一 agent 调用
Team 三种模式 sequential/coordinate(leader 协调)/route(router 分派),ask_other_team_members 自动互为工具;Graph 做带 State 的 DAG/链工作流(可 parallel_execution);agent 亦可作为工具被另一 agent 调用
src/upsonic/team/team.py:20graph/graph.py:392from .result_combiner import ResultCombiner
class Team:
"""A callable class for multi-agent operations using the Upsonic client.
Supports both Agent and Team instances as entities, enabling
hierarchical multi-agent workflows with nested teams.
Streaming (stream/astream) is text-only; no events are yielded.
"""
def __init__(self,
entities: Optional[List[Union[Agent, "Team"]]] = None, VoltAgent typescript Supervisor/Sub-agent:SubAgentManager 经 delegate_task 工具把任务 handoffTask 给子 agent,handoffToMultiple 并行委派多个;另有 A2A server 协议跨进程协作
Supervisor/Sub-agent:SubAgentManager 经 delegate_task 工具把任务 handoffTask 给子 agent,handoffToMultiple 并行委派多个;另有 A2A server 协议跨进程协作
agent/subagent/index.ts:55/**
* SubAgentManager - Manages sub-agents and delegation functionality for an Agent
*/
export class SubAgentManager {
/**
* The name of the agent that owns this sub-agent manager
*/
private agentName: string;
/**
* Sub-agent configurations that the parent agent can delegate tasks to
*/
private subAgentConfigs: SubAgentConfig[] = [];