多智能体编排

一句话总结
多个各有角色的 agent 如何协作:角色定义、拓扑(主从/网状/流水线/群聊)、通信(handoff/消息/共享状态)。把一个超级 agent 拆成一个团队。概念详见 single-vs-multi-agent。

它解决什么问题

让职责分离(创意 vs 审查)、并行、可组合,代价是协调开销与误差传播。是众多框架(AutoGenCrewAISwarmMetaGPT)的核心卖点。

设计维度 / 实现谱系

  • 拓扑:supervisor 层级 ↔ swarm 网状 ↔ sequential 流水线 ↔ group chat
  • 通信:handoff(移交控制权)↔ 消息传递 ↔ 共享状态/黑板 ↔ 跨进程网络(hcom、A2A)
  • 角色定义:role+goal+backstory(CrewAI)↔ SOP 标准流程(MetaGPT)↔ 函数式 handoff(Swarm
  • 调度:谁决定下一个发言/行动的 agent
  • 终止:何时认为团队任务完成

关键要点

  • 三要素:角色、拓扑、通信。
  • handoff(Swarm)是极简优雅的抽象:工具返回一个 agent 即转交。
  • 先把单 agent 做好再上多 agent(design-tradeoffs)。

关联

各框架实现对比

下表汇总 41 个实现了「多智能体编排」的框架(源码级阅读结论)。网站上以可展开 + 源码节选呈现。

框架实现方式
Aeon进程内非传统多 agent;靠 chains(多 workflow step 串/并行 + 输出落 .outputs/{skill}.md 注入下游)。真正”多实例”= Instance Fleet:spawn-instance fork 出专精副本登记 memory/instances.json,fleet-control/fork-fleet 管理
AG2多模式:①GroupChat+GroupChatManager,speaker 选择 auto/manual/random/round_robin/可调用;②新式 Group/handoff:agent.handoffs 挂 OnContextCondition(无 LLM) / OnCondition(LLM) / after-work,配 Pattern(Auto/RoundRobin/Manual/Random),由 GroupToolExecutor 执行转移;③Swarm(initiate_swarm_chat/run_swarm);④nested / sequential / 两两 chat
Agency Swarm核心卖点。①send_message 工具(同步 RPC:sender 调工具→recipient.get_response()→把 final_output 当工具返回值回灌,tools/send_message.py:314);②有向 communication_flows(a > b 经 AgentFlow+register_subagent 动态装工具);③可选 Handoff 控制权转移工具
Agent-LLM (AGiXT)①agent 互调:extension 命令内经 self.ApiClient.prompt_agent(…) 让一个 agent 调另一个 agent(automation_helpers.py:264);②Chain 可在步骤里指定不同 agent_name 串接多 agent;③各 BotManager(Discord/Slack/Teams…)+ WorkerRegistry/BotManagerRegistry 做渠道侧编排
AgentField核心卖点:app.call(“node.func”) 经控制平面路由(绝不直连),自动传播 workflow/session/actor 上下文并构建 DAG;版本路由用加权轮询做 canary(5%→50%→100%),回 X-Routed-Version;AgentRouter(prefix=…) 做命名空间
Agentic Context Engine (ACE)非”多个对等 agent 协作”,而是三个固定角色的学习流水线经 Pipeline 编排;Branch 支持并行分支+合并,RR 内部可递归派生只读 sandbox 子 agent(create_readonly_sandbox)
AgentScope2.0 移除了 1.0 的 msghub/pipeline 模块(本仓不存在)。当前路径:① Agent.observe() 注入外部消息做松散协作;② FastAPI agent service 做多租户多会话承载与编排(examples/agent_service);③ MCP/A2A 把别的 agent 当工具/服务。README 称 message hub 在 roadmap
AgentVerse框架本体即多 agent 编排。两条主线:① simulation 经 5 规则(order/visibility/selector/updater/describer)调度 N 个对话 agent 的回合制交互;② task-solving 经 4 规则把 agent 按 AGENT_TYPES(ROLE_ASSIGNMENT/SOLVER/CRITIC/EXECUTION/EVALUATION/MANAGER)组织成”招募→讨论→执行→评估”流水线,讨论拓扑可选 vertical/horizontal/central/dynamic
Astron Agent靠 workflow DAG 引擎而非 agent 间直接通信:agent 节点把一个完整 agent 嵌入工作流;flow 节点可嵌套调用其它 workflow(WorkflowPlugin 让 CoT 把别的 workflow 当工具调);节点间靠 VariablePool 传值,引擎用 asyncio.create_task 并发调度依赖就绪的节点
AutoGen核心强项。BaseGroupChat(Team) + BaseGroupChatManager:manager 通过 pub/sub 选下一发言者(select_speaker),participant 经 ChatAgentContainer 包成 runtime agent;五种内置模式:RoundRobin / Selector(LLM 选人) / Swarm(handoff 驱动) / GraphFlow(DiGraph 显式拓扑) / Magentic-One(ledger 编排)
Botpress内核无固定多 agent 协议;用 Exit 做 handoff 实现:MultiAgentOrchestrator 把每个子 agent 暴露为 handoff_ Exit,onExit 钩子里切换 currentAgent 并防环路;可扩展到上百子 agent
ConnectOnion两条路:①进程内 subagents 插件经 task() 工具派生隔离子 agent;②跨网络 host() 把 agent 暴露为 HTTP+WebSocket,经 relay 做 P2P 发现,connect()/RemoteAgent 远程调用
Cordum这是核心:Scheduler 按 Heartbeat(算力/能力/pool/labels)做容量感知路由到 worker 池或直连 worker,靠内存 TTL 注册表(无持久 DB);策略可做 pool segmentation(敏感数据只进可信池)
CrewAIProcess.sequential(按序执行 task) / Process.hierarchical(自动建经理 agent 持 AgentTools 委派);agent 间经 Delegate/AskQuestion 工具协作;A2A 协议跨进程
Dustrun_agent MCP server 实现”agent as tool”:两种模式 run-agent(子 agent 在后台子对话执行并回传结果)与 handoff(子 agent 直接接管对话);可向子 agent 转发文件/toolset
hcom核心。①消息:hcom send @name(s) [—intent request/inform/ack] [—reply-to] [—thread],按 @mention 定向或广播,写成 events 行(commands/send.rs:1, messages.rs:13 scope 计算)。②投递:Claude 经 PostToolUse hook turn 中途注入、SessionStart/Stop 注入(hooks/claude.rs:916 handle_posttooluse, :421 handle_sessionstart);PTY 模式经 TCP inject 端口投递(delivery.rs:1)。③spawn/fork/resume/kill:hcom [N] 起真实终端或 headless(commands/launch.rs:1, launcher.rs:1),hcom f/r/kill(commands/fork.rs, resume.rs, kill.rs)。④订阅/反应:hcom events sub ,订阅存 kv 的 events_sub: 行,命中可自动 on_hit_text 回消息(db/subscriptions.rs:1)。⑤碰撞检测默认开:两 agent 30s 内改同一文件双方收通知(README:101)
Hermes Agentdelegate_task 工具派生隔离子 AIAgent:全新对话(无父史)、独立 task_id/终端、受限 toolset(强制剥离 delegate/clarify/memory/send_message/execute_code)、单/批并行(ThreadPoolExecutor),父进程阻塞至子完成、只回看 summary
Hive核心范式:Queen 生成 worker graph,多 worker 并行(fan-out/fan-in),colony=一组 worker 的部署;session 隔离 + 共享 buffer;run_parallel_workers 横向扩同类工作
Lagent组合式:Sequential 容器按顺序串接 agent 并可 exit_at 提前退出;AgentList/AgentDict 把 agent 当容器元素;任意 agent 赋值为属性即成递归子 agent(_agents)。无中心调度器/角色协议
LangChaincreate_agent(name=…) 返回的图可作为子图节点嵌入另一张 LangGraph(factory.py:803 文档);SubagentTransformer 把嵌套命名 agent 识别为 run.subagents 句柄并转发其事件流;更高层编排走 LangGraph / Deep Agents
LlamaIndexAgentWorkflow 持有多个命名 agent + root_agent;自动注入 handoff 工具(return_direct)按 can_handoff_to 白名单切换 current_agent_name,共享同一 memory/state;from_tools_or_functions 据是否 function-calling 自动选 Function/ReAct agent
LoongFlow进程内多 Worker 流水线:PESAgent 以 asyncio 并发跑多个 evolution cycle(concurrency 控并发数),每 cycle 内 Planner/Executor/Summary 三 Agent 协作;岛模型把种群分到 num_islands 并定期迁移,等价于并行进化群体;无跨网络分布式 agent 协议
Maestro核心范式:1 个 orchestrator (强模型) + N 次 sub-agent (弱模型) 的 supervisor 编排;模型分层由 3 常量配置;orchestrator 串行派发,子代理无并发、不互相通信
Mastra三条路:①agent.network(messages, opts) 用一个 routing agent 在 sub-agents/workflows/tools 间动态路由迭代直至完成;②sub-agent:在 agents: 注册的 SubAgent 被包成工具供主 agent 调用(getToolsForExecution);③workflow 内 createStep(agent) 把 agent 当步骤静态编排;另有 A2A 协议(@mastra/core/a2a)
MetaGPT框架核心:Team.hire(roles) 把角色放进 Environment;Environment.publish_message 按 member_addrs 地址路由到角色私有队列;Environment.run 用 asyncio.gather 并发跑所有非 idle 角色;订阅靠 cause_by ∈ rc.watch 解耦。新一代 MGXEnv + TeamLeader(Mike)做动态 @ 路由与 direct_chat
Modusactor 间消息传递:SendMessage(同步阻塞带 timeout)/SendMessageAsync(timeout=0 异步);agent 可在自身方法内 Start/SendMessage 其他 agent;底层 GoAkt 支持分布式(actor 可能在别的进程/机器)
nanobot进程内 SubagentManager 派生隔离子 agent(独立 ToolRegistry/workspace scope,复用 AgentRunner),结果经 bus 作为 system 消息回灌父会话;spawn 工具触发,受 max_concurrent_subagents 限流。无跨网络 agent 协议
Open Multi-Agent核心范式:coordinator 拆 DAG → TaskQueue 拓扑解析(完成自动 unblock、失败级联) → Scheduler 自动分配(默认 dependency-first,另有 round-robin/least-busy/capability-match) → AgentPool 信号量并发执行;可选 delegate_to_agent 工具做同步子 agent 委派(带环检测/深度上限/池槽防死锁)
OpenClaw两条路:①多 agent 路由——按渠道/账号/peer 把入站消息路由到隔离 agent(独立 workspace + per-agent session);②子 agent 派生——sessions_spawn 工具派生 subagent / ACP 外部 CLI agent,受 maxSpawnDepth、maxChildren、requireAgentId 策略约束;sessions_list/sessions_history/sessions_send 做跨会话协作
Pilot Protocol核心,但是”网络级编排”:①寻址=48 位虚拟地址 N:NNNN.HHHH.LLLL + 16 位端口 + hostname 发现;②互信=双向签名握手(端口 444,经 registry relay),节点默认私有;③发现/NAT=经 rendezvous 注册解析、STUN、打洞、relay 兜底。data flows 点对点直连
Pipecat两路:①进程内 ParallelPipeline 并行多条管道;②多 worker 经 WorkerBus 协作——@job(name=, sequential=) 暴露 handler,调用方 async with self.job(name) / self.job_group(names) 发请求并等 JobStatus;WorkerRegistry 跟踪本地/远程 worker(pgmq/redis 可跨进程)
PraisonAIAgents 容器 + Process 三模式:sequential(按序、自动传上下文)、hierarchical(manager_llm 充当 orchestrator 动态派活)、workflow(按 next_tasks/condition 走图,支持 route/parallel/loop/repeat);另有 Handoff 做 agent→agent 转交(仿 OpenAI Agents SDK)、A2A 协议
Semantic Kernel两代并存:① 旧 AgentGroupChat/AgentChat(轮转+终止策略);② 新 AgentOrchestration<TIn,TOut> 基类下的 Concurrent / Sequential / GroupChat / Handoff / Magentic 五种模式,跑在 actor Runtime 上;GroupChat 由 GroupChatManager(SelectNextAgent/ShouldTerminate/ShouldRequestUserInput) 编排;agent 亦可作为另一 agent 的 plugin
smolagents层级式:把子 agent 放进 managed_agents,框架给它套上 name/description/inputs 使其”像工具一样可被调用”;父 agent 经 agent(task=…)(call) 调用,子 agent 跑完整 run 返回报告。注意:远程沙箱执行器不支持 managed agents
Strands Agents三模式:①Graph 确定性依赖图(支持环/嵌套,GraphBuilder 声明边);②Swarm 自治协作团队(工具化 handoff + 共享上下文);③A2A 协议(server/executor);另 agent.as_tool() 把 agent 当工具
Swarm网状 handoff:工具返回 Agent/Result(agent=…) 即移交控制权
SwarmClaw①进程内 subagent:spawnSubagent 派生隔离子 session,带 delegationDepth 限制(DEFAULT_DELEGATION_MAX_DEPTH);②外部委派:delegate 工具 spawn claude/codex/opencode/gemini/copilot/droid/cursor/qwen CLI 子进程,回退链 + resume id;③跨 OpenClaw gateway 路由;org-chart/team 可视化编排
Swarms核心卖点:60+ 拓扑文件,经 SwarmRouter 按 17 种 SwarmType 统一分派;含 Sequential/Concurrent(ThreadPool)/AgentRearrange(flow DSL)/Graph(DAG 拓扑排序)/MixtureOfAgents/Hierarchical/GroupChat/MajorityVoting/Council/Debate/Heavy/RoundRobin 等;另有 handoffs 让 agent 间移交
Transformers Agents后期支持 managed agents(一个 agent 调用另一个),整体仍偏单 agent
UpsonicTeam 三种模式 sequential/coordinate(leader 协调)/route(router 分派),ask_other_team_members 自动互为工具;Graph 做带 State 的 DAG/链工作流(可 parallel_execution);agent 亦可作为工具被另一 agent 调用
VoltAgentSupervisor/Sub-agent:SubAgentManager 经 delegate_task 工具把任务 handoffTask 给子 agent,handoffToMultiple 并行委派多个;另有 A2A server 协议跨进程协作

各框架实现对比 · 源码级

41 个框架实现该组件 · 38 个附源码节选 · 点击任意框架展开看实现要点与代码

Aeon yaml 进程内非传统多 agent;靠 chains(多 workflow step 串/并行 + 输出落 .outputs/{skill}.md 注入下游)。真正"多实例"= Instance Fleet:spawn-instance fork 出专精副本登记 memory/instances.json,fleet-control/fork-fleet 管理

进程内非传统多 agent;靠 chains(多 workflow step 串/并行 + 输出落 .outputs/{skill}.md 注入下游)。真正"多实例"= Instance Fleet:spawn-instance fork 出专精副本登记 memory/instances.json,fleet-control/fork-fleet 管理

查看 Aeon 完整笔记 →
AG2 python 多模式:①GroupChat+GroupChatManager,speaker 选择 auto/manual/random/round_robin/可调用;②新式 Group/handoff:agent.handoffs 挂 OnContextCondition(无 LLM) / OnCondition(LLM) / after-work,配 Pattern(Auto/RoundRobin/Manual/Random),由 GroupToolExecutor 执行转移;③Swarm(initiate_swarm_chat/run_swarm);④nested / sequential / 两两 chat

多模式:①GroupChat+GroupChatManager,speaker 选择 auto/manual/random/round_robin/可调用;②新式 Group/handoff:agent.handoffs 挂 OnContextCondition(无 LLM) / OnCondition(LLM) / after-work,配 Pattern(Auto/RoundRobin/Manual/Random),由 GroupToolExecutor 执行转移;③Swarm(initiate_swarm_chat/run_swarm);④nested / sequential / 两两 chat

agentchat/groupchat.py:147agentchat/group/handoffs.py:16group/patterns/auto.py:19group/multi_agent_chat.py:44
autogen/agentchat/groupchat.py:147 python
    max_round: int = 10
    admin_name: str = "Admin"
    func_call_filter: bool = True
    speaker_selection_method: Literal["auto", "manual", "random", "round_robin"] | Callable[..., Any] = "auto"
    max_retries_for_selecting_speaker: int = 2
    allow_repeat_speaker: bool | list[Agent] | None = None
    allowed_or_disallowed_speaker_transitions: dict[str, Any] | None = None
    speaker_transitions_type: Literal["allowed", "disallowed", None] = None
    enable_clear_history: bool = False
    send_introductions: bool = False
    select_speaker_message_template: str = """You are in a role play game. The following roles are available:
                {roles}.
                Read the following conversation.
查看 AG2 完整笔记 →
Agency Swarm python 核心卖点。①send_message 工具(同步 RPC:sender 调工具→recipient.get_response()→把 final_output 当工具返回值回灌,tools/send_message.py:314);②有向 communication_flows(a > b 经 AgentFlow+register_subagent 动态装工具);③可选 Handoff 控制权转移工具

核心卖点。①send_message 工具(同步 RPC:sender 调工具→recipient.get_response()→把 final_output 当工具返回值回灌,tools/send_message.py:314);②有向 communication_flows(a > b 经 AgentFlow+register_subagent 动态装工具);③可选 Handoff 控制权转移工具

tools/send_message.py:38agency/setup.py:178agent/subagents.py:34agent/agent_flow.py:14
src/agency_swarm/tools/send_message.py:38 python
logger = logging.getLogger(__name__)


class SendMessage(FunctionTool):
    """
    Use this tool to facilitate direct, synchronous communication between specialized agents within your agency.

    When you send a message using this tool, you receive a response exclusively from the designated recipient
    agent. To continue the dialogue, invoke this tool again with the desired recipient agent and your follow-up
    message. Remember, communication here is synchronous; the recipient agent won't perform any tasks post-response.

    You are responsible for relaying the recipient agent's responses back to the user, as the user does not have
    direct access to these replies. Keep engaging with the tool for continuous interaction until the task is fully
查看 Agency Swarm 完整笔记 →
Agent-LLM (AGiXT) python ①agent 互调:extension 命令内经 self.ApiClient.prompt_agent(...) 让一个 agent 调另一个 agent(automation_helpers.py:264);②Chain 可在步骤里指定不同 agent_name 串接多 agent;③各 BotManager(Discord/Slack/Teams…)+ WorkerRegistry/BotManagerRegistry 做渠道侧编排

①agent 互调:extension 命令内经 self.ApiClient.prompt_agent(...) 让一个 agent 调另一个 agent(automation_helpers.py:264);②Chain 可在步骤里指定不同 agent_name 串接多 agent;③各 BotManager(Discord/Slack/Teams…)+ WorkerRegistry/BotManagerRegistry 做渠道侧编排

automation_helpers.py:264XT.py:1004
agixt/extensions/automation_helpers.py:264 python
        fields = output_model.model_fields
        field_descriptions = [f"{field}: {fields[field]}" for field in fields]
        schema = "\n".join(field_descriptions)
        response = self.ApiClient.prompt_agent(
            agent_id=self.agent_id,
            prompt_name="Convert to JSON",
            prompt_args={
                "user_input": input_string,
                "schema": schema,
                "conversation_name": "AGiXT Terminal",
            },
        )
        response = str(response).split("```json")[1].split("```")[0].strip()
查看 Agent-LLM (AGiXT) 完整笔记 →
AgentField go 核心卖点:app.call("node.func") 经控制平面路由(绝不直连),自动传播 workflow/session/actor 上下文并构建 DAG;版本路由用加权轮询做 canary(5%→50%→100%),回 X-Routed-Version;AgentRouter(prefix=...) 做命名空间

核心卖点:app.call("node.func") 经控制平面路由(绝不直连),自动传播 workflow/session/actor 上下文并构建 DAG;版本路由用加权轮询做 canary(5%→50%→100%),回 X-Routed-Version;AgentRouter(prefix=...) 做命名空间

agent.py:3710control-plane/internal/handlers/execute.go:1567execute.go:310
sdk/python/agentfield/agent.py:3710 python
            **kwargs,
        )

    async def call(self, target: str, *args, **kwargs) -> dict:
        """
        Initiates a cross-agent call to another reasoner or skill via the AgentField execution gateway.

        This method allows agents to seamlessly communicate and utilize reasoners/skills
        deployed on other agent nodes within the AgentField ecosystem. It properly propagates
        workflow tracking headers and maintains execution context for DAG building.

        **Return Type**: Always returns JSON/dict objects, similar to calling any REST API.
        No automatic schema conversion is performed - developers can convert to Pydantic
查看 AgentField 完整笔记 →
Agentic Context Engine (ACE) python 非"多个对等 agent 协作",而是三个固定角色的学习流水线经 Pipeline 编排;Branch 支持并行分支+合并,RR 内部可递归派生只读 sandbox 子 agent(create_readonly_sandbox)

非"多个对等 agent 协作",而是三个固定角色的学习流水线经 Pipeline 编排;Branch 支持并行分支+合并,RR 内部可递归派生只读 sandbox 子 agent(create_readonly_sandbox)

ace/runners/ace.py:80ace/core/sandbox.py:749
ace/runners/ace.py:80 python
        """
        skillbook = skillbook or Skillbook()
        steps: list[StepProtocol[ACEStepContext]] = [
            AgentStep(agent, skillbook),
            EvaluateStep(environment),
            *learning_tail(
                reflector,
                skill_manager,
                skillbook,
                dedup_manager=dedup_manager,
                dedup_interval=dedup_interval,
                checkpoint_dir=checkpoint_dir,
                checkpoint_interval=checkpoint_interval,
查看 Agentic Context Engine (ACE) 完整笔记 →
AgentScope python 2.0 移除了 1.0 的 msghub/pipeline 模块(本仓不存在)。当前路径:① Agent.observe() 注入外部消息做松散协作;② FastAPI agent service 做多租户多会话承载与编排(examples/agent_service);③ MCP/A2A 把别的 agent 当工具/服务。README 称 message hub 在 roadmap

2.0 移除了 1.0 的 msghub/pipeline 模块(本仓不存在)。当前路径:① Agent.observe() 注入外部消息做松散协作;② FastAPI agent service 做多租户多会话承载与编排(examples/agent_service);③ MCP/A2A 把别的 agent 当工具/服务。README 称 message hub 在 roadmap

agent/_agent.py:254mcp/_mcp_client.py:22
src/agentscope/agent/_agent.py:254 python
        finally:
            pass

    async def observe(self, msgs: Msg | list[Msg] | None = None) -> None:
        """Receive external observation message(s) and save them into
        context."""
        await self._handle_incoming_messages(msgs)

    async def compress_context(
        self,
        context_config: ContextConfig | None = None,
    ) -> None:
        """Compress the agent's context if the token count exceeds the
查看 AgentScope 完整笔记 →
AgentVerse python 框架本体即多 agent 编排。两条主线:① simulation 经 5 规则(order/visibility/selector/updater/describer)调度 N 个对话 agent 的回合制交互;② task-solving 经 4 规则把 agent 按 AGENT_TYPES(ROLE_ASSIGNMENT/SOLVER/CRITIC/EXECUTION/EVALUATION/MANAGER)组织成"招募→讨论→执行→评估"流水线,讨论拓扑可选 vertical/horizontal/central/dynamic

框架本体即多 agent 编排。两条主线:① simulation 经 5 规则(order/visibility/selector/updater/describer)调度 N 个对话 agent 的回合制交互;② task-solving 经 4 规则把 agent 按 AGENT_TYPES(ROLE_ASSIGNMENT/SOLVER/CRITIC/EXECUTION/EVALUATION/MANAGER)组织成"招募→讨论→执行→评估"流水线,讨论拓扑可选 vertical/horizontal/central/dynamic

environments/simulation_env/rules/base.py:32environments/tasksolving_env/rules/base.py:71
agentverse/environments/simulation_env/rules/base.py:32 python


# class Rule(BaseModel):
class SimulationRule(BaseRule):
    """
    Rule for the environment. It controls the speaking order of the agents
    and maintain the set of visible agents for each agent.
    """

    order: BaseOrder
    visibility: BaseVisibility
    selector: BaseSelector
    updater: BaseUpdater
查看 AgentVerse 完整笔记 →
Astron Agent python 靠 workflow DAG 引擎而非 agent 间直接通信:agent 节点把一个完整 agent 嵌入工作流;flow 节点可嵌套调用其它 workflow(WorkflowPlugin 让 CoT 把别的 workflow 当工具调);节点间靠 VariablePool 传值,引擎用 asyncio.create_task 并发调度依赖就绪的节点

靠 workflow DAG 引擎而非 agent 间直接通信:agent 节点把一个完整 agent 嵌入工作流;flow 节点可嵌套调用其它 workflow(WorkflowPlugin 让 CoT 把别的 workflow 当工具调);节点间靠 VariablePool 传值,引擎用 asyncio.create_task 并发调度依赖就绪的节点

dsl_engine.py:786
core/workflow/engine/dsl_engine.py:786 python
        return DefaultNodeExecutionStrategy()


class WorkflowEngine(BaseModel):
    """
    Main workflow execution engine.

    Orchestrates the execution of workflow nodes using depth-first search,
    manages error handling, retry mechanisms, and provides various execution
    strategies for different node types.
    """

    engine_ctx: WorkflowEngineCtx = None  # type: ignore
查看 Astron Agent 完整笔记 →
AutoGen python 核心强项。BaseGroupChat(Team) + BaseGroupChatManager:manager 通过 pub/sub 选下一发言者(select_speaker),participant 经 ChatAgentContainer 包成 runtime agent;五种内置模式:RoundRobin / Selector(LLM 选人) / Swarm(handoff 驱动) / GraphFlow(DiGraph 显式拓扑) / Magentic-One(ledger 编排)

核心强项。BaseGroupChat(Team) + BaseGroupChatManager:manager 通过 pub/sub 选下一发言者(select_speaker),participant 经 ChatAgentContainer 包成 runtime agent;五种内置模式:RoundRobin / Selector(LLM 选人) / Swarm(handoff 驱动) / GraphFlow(DiGraph 显式拓扑) / Magentic-One(ledger 编排)

_group_chat/_base_group_chat_manager.py:25_round_robin_group_chat.py:72_selector_group_chat.py:152_swarm_group_chat.py:82
python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat_manager.py:25 python
from ._sequential_routed_agent import SequentialRoutedAgent


class BaseGroupChatManager(SequentialRoutedAgent, ABC):
    """Base class for a group chat manager that manages a group chat with multiple participants.

    It is the responsibility of the caller to ensure:
    - All participants must subscribe to the group chat topic and each of their own topics.
    - The group chat manager must subscribe to the group chat topic.
    - The agent types of the participants must be unique.
    - For each participant, the agent type must be the same as the topic type.

    Without the above conditions, the group chat will not function correctly.
查看 AutoGen 完整笔记 →
Botpress typescript 内核无固定多 agent 协议;用 Exit 做 handoff 实现:MultiAgentOrchestrator 把每个子 agent 暴露为 handoff_<name> Exit,onExit 钩子里切换 currentAgent 并防环路;可扩展到上百子 agent

内核无固定多 agent 协议;用 Exit 做 handoff 实现:MultiAgentOrchestrator 把每个子 agent 暴露为 handoff_<name> Exit,onExit 钩子里切换 currentAgent 并防环路;可扩展到上百子 agent

packages/llmz/examples/08_chat_multi_agent/orchestrator.ts:19orchestrator.ts:72orchestrator.ts:110
packages/llmz/examples/08_chat_multi_agent/orchestrator.ts:19 typescript
 * This system can scale to hundreds of sub-agents, each with dozens or even hundreds of tools.
 * It is designed to handle complex inquiries by allowing agents to hand off requests to other agents.
 */
export class MultiAgentOrchestrator {
  private _agents: SubAgent[]
  private _currentAgentName: string

  private _previousHandoffs: Array<{
    from: string
    to: string
    reason: string
  }> = []
查看 Botpress 完整笔记 →
ConnectOnion python 两条路:①进程内 subagents 插件经 task() 工具派生隔离子 agent;②跨网络 host() 把 agent 暴露为 HTTP+WebSocket,经 relay 做 P2P 发现,connect()/RemoteAgent 远程调用

两条路:①进程内 subagents 插件经 task() 工具派生隔离子 agent;②跨网络 host() 把 agent 暴露为 HTTP+WebSocket,经 relay 做 P2P 发现,connect()/RemoteAgent 远程调用

network/host/server.py:280network/connect.py:565
connectonion/network/host/server.py:280 python
DEFAULT_RELAY_URL = "wss://oo.openonion.ai"


def host(
    create_agent: Callable,
    port: int = None,
    trust: Union[str, "Agent"] = None,
    result_ttl: int = None,
    workers: int = None,
    reload: bool = None,
    *,
    relay_url: str = DEFAULT_RELAY_URL,
    blacklist: list | None = None,
查看 ConnectOnion 完整笔记 →
Cordum go 这是核心:Scheduler 按 Heartbeat(算力/能力/pool/labels)做容量感知路由到 worker 池或直连 worker,靠内存 TTL 注册表(无持久 DB);策略可做 pool segmentation(敏感数据只进可信池)

这是核心:Scheduler 按 Heartbeat(算力/能力/pool/labels)做容量感知路由到 worker 池或直连 worker,靠内存 TTL 注册表(无持久 DB);策略可做 pool segmentation(敏感数据只进可信池)

DESIGN.md:124
DESIGN.md:124 markdown
This keeps bus payloads small, preserves durability, and allows audit tooling
to dereference pointers when needed (`GET /api/v1/memory?ptr=...`).

## 5) Control Plane Flow (High Level)

1. Gateway writes context JSON to Redis, sets `context_ptr`, publishes
   `BusPacket{JobRequest}` to `sys.job.submit`.
2. Scheduler receives `JobRequest`, records state, calls Safety Kernel, and
   applies `PolicyConstraints`.
3. Scheduler selects a pool and/or direct worker subject using heartbeats
   and publishes to `job.*` or `worker.<id>.jobs`.
4. Worker fetches `context_ptr`, executes, writes result to Redis, publishes
   `BusPacket{JobResult}` with `result_ptr` and `artifact_ptrs`.
查看 Cordum 完整笔记 →
CrewAI python Process.sequential(按序执行 task) / Process.hierarchical(自动建经理 agent 持 AgentTools 委派);agent 间经 Delegate/AskQuestion 工具协作;A2A 协议跨进程

Process.sequential(按序执行 task) / Process.hierarchical(自动建经理 agent 持 AgentTools 委派);agent 间经 Delegate/AskQuestion 工具协作;A2A 协议跨进程

crewai/process.py:4crewai/crew.py:1464crewai/tools/agent_tools/delegate_work_tool.py:16
lib/crewai/src/crewai/process.py:4 python
from enum import Enum


class Process(str, Enum):
    """
    Class representing the different processes that can be used to tackle tasks
    """

    sequential = "sequential"
    hierarchical = "hierarchical"
    # TODO: consensual = 'consensual'
查看 CrewAI 完整笔记 →
Dust typescript run_agent MCP server 实现"agent as tool":两种模式 run-agent(子 agent 在后台子对话执行并回传结果)与 handoff(子 agent 直接接管对话);可向子 agent 转发文件/toolset

run_agent MCP server 实现"agent as tool":两种模式 run-agent(子 agent 在后台子对话执行并回传结果)与 handoff(子 agent 直接接管对话);可向子 agent 转发文件/toolset

front/lib/api/actions/servers/run_agent/metadata.ts:18
front/lib/api/actions/servers/run_agent/metadata.ts:18 typescript
// The actual tool name is dynamic: `run_<agent_name>`.
export const RUN_AGENT_PLACEHOLDER_TOOL_NAME = "run_agent" as const;

export const RUN_AGENT_CONFIGURABLE_PROPERTIES = {
  executionMode: z
    .object({
      options: z
        .union([
          z
            .object({
              value: z.literal("run-agent"),
              label: z.literal("Agent runs in the background"),
            })
查看 Dust 完整笔记 →
hcom rust 核心。①消息:hcom send @name(s) [--intent request|inform|ack] [--reply-to] [--thread],按 @mention 定向或广播,写成 events 行(commands/send.rs:1, messages.rs:13 scope 计算)。②投递:Claude 经 PostToolUse hook turn 中途注入、SessionStart/Stop 注入(hooks/claude.rs:916 handle_posttooluse, :421 handle_sessionstart);PTY 模式经 TCP inject 端口投递(delivery.rs:1)。③spawn/fork/resume/kill:hcom [N] <tool> 起真实终端或 headless(commands/launch.rs:1, launcher.rs:1),hcom f/r/kill(commands/fork.rs, resume.rs, kill.rs)。④订阅/反应:hcom events sub <filters>,订阅存 kv 的 events_sub: 行,命中可自动 on_hit_text 回消息(db/subscriptions.rs:1)。⑤碰撞检测默认开:两 agent 30s 内改同一文件双方收通知(README:101)

核心。①消息:hcom send @name(s) [--intent request|inform|ack] [--reply-to] [--thread],按 @mention 定向或广播,写成 events 行(commands/send.rs:1, messages.rs:13 scope 计算)。②投递:Claude 经 PostToolUse hook turn 中途注入、SessionStart/Stop 注入(hooks/claude.rs:916 handle_posttooluse, :421 handle_sessionstart);PTY 模式经 TCP inject 端口投递(delivery.rs:1)。③spawn/fork/resume/kill:hcom [N] <tool> 起真实终端或 headless(commands/launch.rs:1, launcher.rs:1),hcom f/r/kill(commands/fork.rs, resume.rs, kill.rs)。④订阅/反应:hcom events sub <filters>,订阅存 kv 的 events_sub: 行,命中可自动 on_hit_text 回消息(db/subscriptions.rs:1)。⑤碰撞检测默认开:两 agent 30s 内改同一文件双方收通知(README:101)

hooks/claude.rs:916
src/hooks/claude.rs:916 rust
}

/// Parent PostToolUse: bootstrap, messages, vanilla binding.
fn handle_posttooluse(
    db: &HcomDb,
    ctx: &HcomContext,
    payload: &HookPayload,
    instance_name: &str,
    instance_data: &InstanceRow,
    updates: &serde_json::Map<String, Value>,
) -> (i32, String) {
    let tool_name = payload.tool_name.as_str();
    let mut outputs: Vec<Value> = Vec::new();
查看 hcom 完整笔记 →
Hermes Agent python delegate_task 工具派生隔离子 AIAgent:全新对话(无父史)、独立 task_id/终端、受限 toolset(强制剥离 delegate/clarify/memory/send_message/execute_code)、单/批并行(ThreadPoolExecutor),父进程阻塞至子完成、只回看 summary

delegate_task 工具派生隔离子 AIAgent:全新对话(无父史)、独立 task_id/终端、受限 toolset(强制剥离 delegate/clarify/memory/send_message/execute_code)、单/批并行(ThreadPoolExecutor),父进程阻塞至子完成、只回看 summary

tools/delegate_tool.py:1
tools/delegate_tool.py:1 python
#!/usr/bin/env python3
"""
Delegate Tool -- Subagent Architecture

Spawns child AIAgent instances with isolated context, restricted toolsets,
and their own terminal sessions. Supports single-task and batch (parallel)
modes. The parent blocks until all children complete.

Each child gets:
  - A fresh conversation (no parent history)
查看 Hermes Agent 完整笔记 →
Hive python 核心范式:Queen 生成 worker graph,多 worker 并行(fan-out/fan-in),colony=一组 worker 的部署;session 隔离 + 共享 buffer;run_parallel_workers 横向扩同类工作

核心范式:Queen 生成 worker graph,多 worker 并行(fan-out/fan-in),colony=一组 worker 的部署;session 隔离 + 共享 buffer;run_parallel_workers 横向扩同类工作

orchestrator/orchestrator.py:115orchestrator/edge.py:416
core/framework/orchestrator/orchestrator.py:115 python
    branch_timeout_seconds: float = 300.0


class Orchestrator:
    """
    Executes agent graphs.

    Example:
        executor = GraphExecutor(
            runtime=runtime,
            llm=llm,
            tools=tools,
            tool_executor=my_tool_executor,
查看 Hive 完整笔记 →
Lagent python 组合式:Sequential 容器按顺序串接 agent 并可 exit_at 提前退出;AgentList/AgentDict 把 agent 当容器元素;任意 agent 赋值为属性即成递归子 agent(_agents)。无中心调度器/角色协议

组合式:Sequential 容器按顺序串接 agent 并可 exit_at 提前退出;AgentList/AgentDict 把 agent 当容器元素;任意 agent 赋值为属性即成递归子 agent(_agents)。无中心调度器/角色协议

agents/agent.py:409agents/agent.py:550agents/agent.py:114
lagent/agents/agent.py:409 python
    pass


class Sequential(Agent):
    """Sequential is an agent container that forwards messages to each agent
    in the order they are added."""

    def __init__(self, *agents: Union[Agent, Iterable], **kwargs):
        super().__init__(**kwargs)
        self._agents = OrderedDict()
        if not agents:
            raise ValueError('At least one agent should be provided')
        if isinstance(agents[0], Iterable) and not isinstance(agents[0], Agent):
查看 Lagent 完整笔记 →
LangChain python create_agent(name=...) 返回的图可作为子图节点嵌入另一张 LangGraph(factory.py:803 文档);SubagentTransformer 把嵌套命名 agent 识别为 run.subagents 句柄并转发其事件流;更高层编排走 LangGraph / Deep Agents

create_agent(name=...) 返回的图可作为子图节点嵌入另一张 LangGraph(factory.py:803 文档);SubagentTransformer 把嵌套命名 agent 识别为 run.subagents 句柄并转发其事件流;更高层编排走 LangGraph / Deep Agents

agents/_subagent_transformer.py:1factory.py:1681factory.py:803
libs/langchain_v1/langchain/agents/_subagent_transformer.py:1 python
"""Surface nested named agents as typed `run.subagents` handles.

Detects subagents via the `lc_agent_name` transition that langgraph's base
`_TasksLifecycleBase` now computes. `create_agent(name=...)` binds
`lc_agent_name` into the run config; the base transformer records, per
namespace, the `lc_agent_name` seen on each task start (first-write-wins).

A subagent boundary is a nested run whose `lc_agent_name` is set *and* differs
from its parent namespace's `lc_agent_name`. Plain subgraphs inherit the
parent's name (so they compare equal and are excluded); unnamed agents have
查看 LangChain 完整笔记 →
LlamaIndex python AgentWorkflow 持有多个命名 agent + root_agent;自动注入 handoff 工具(return_direct)按 can_handoff_to 白名单切换 current_agent_name,共享同一 memory/state;from_tools_or_functions 据是否 function-calling 自动选 Function/ReAct agent

AgentWorkflow 持有多个命名 agent + root_agent;自动注入 handoff 工具(return_direct)按 can_handoff_to 白名单切换 current_agent_name,共享同一 memory/state;from_tools_or_functions 据是否 function-calling 自动选 Function/ReAct agent

agent/workflow/multi_agent_workflow.py:99
llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py:99 python
    """Metaclass for AgentWorkflow that inherits from WorkflowMeta."""


class AgentWorkflow(Workflow, PromptMixin, metaclass=AgentWorkflowMeta):
    """A workflow for managing multiple agents with handoffs."""

    def __init__(
        self,
        agents: List[BaseWorkflowAgent],
        initial_state: Optional[Dict] = None,
        root_agent: Optional[str] = None,
        handoff_prompt: Optional[Union[str, BasePromptTemplate]] = None,
        handoff_output_prompt: Optional[Union[str, BasePromptTemplate]] = None,
查看 LlamaIndex 完整笔记 →
LoongFlow python 进程内多 Worker 流水线:PESAgent 以 asyncio 并发跑多个 evolution cycle(concurrency 控并发数),每 cycle 内 Planner/Executor/Summary 三 Agent 协作;岛模型把种群分到 num_islands 并定期迁移,等价于并行进化群体;无跨网络分布式 agent 协议

进程内多 Worker 流水线:PESAgent 以 asyncio 并发跑多个 evolution cycle(concurrency 控并发数),每 cycle 内 Planner/Executor/Summary 三 Agent 协作;岛模型把种群分到 num_islands 并定期迁移,等价于并行进化群体;无跨网络分布式 agent 协议

framework/pes/pes_agent.py:377agentsdk/memory/evolution/in_memory.py:1057
src/loongflow/framework/pes/pes_agent.py:377 python
                exc_info=True,
            )

    async def _try_start_new_cycle(self) -> bool:
        """
        Atomically checks conditions and starts a new evolution cycle if possible.
        """
        async with self._iteration_lock:
            if (
                len(self._running_tasks) < self.max_workers
                and self._current_iteration < self.max_iterations
                and not self._stop_event.is_set()
            ):
查看 LoongFlow 完整笔记 →
Maestro python 核心范式:1 个 orchestrator (强模型) + N 次 sub-agent (弱模型) 的 supervisor 编排;模型分层由 3 常量配置;orchestrator 串行派发,子代理无并发、不互相通信

核心范式:1 个 orchestrator (强模型) + N 次 sub-agent (弱模型) 的 supervisor 编排;模型分层由 3 常量配置;orchestrator 串行派发,子代理无并发、不互相通信

maestro.py:19maestro.py:42
maestro.py:19 python
# Claude 3 Haiku    claude-3-haiku-20240307
# Claude 3.5 Sonnet claude-3-5-sonnet-20240620

ORCHESTRATOR_MODEL = "claude-3-5-sonnet-20240620"
SUB_AGENT_MODEL = "claude-3-5-sonnet-20240620"
REFINER_MODEL = "claude-3-5-sonnet-20240620"

def calculate_subagent_cost(model, input_tokens, output_tokens):
    # Pricing information per model
    pricing = {
        "claude-3-opus-20240229": {"input_cost_per_mtok": 15.00, "output_cost_per_mtok": 75.00},
        "claude-3-haiku-20240307": {"input_cost_per_mtok": 0.25, "output_cost_per_mtok": 1.25},
        "claude-3-sonnet-20240229": {"input_cost_per_mtok": 3.00, "output_cost_per_mtok": 15.00},
查看 Maestro 完整笔记 →
Mastra typescript 三条路:①agent.network(messages, opts) 用一个 routing agent 在 sub-agents/workflows/tools 间动态路由迭代直至完成;②sub-agent:在 agents: 注册的 SubAgent 被包成工具供主 agent 调用(getToolsForExecution);③workflow 内 createStep(agent) 把 agent 当步骤静态编排;另有 A2A 协议(@mastra/core/a2a)

三条路:①agent.network(messages, opts) 用一个 routing agent 在 sub-agents/workflows/tools 间动态路由迭代直至完成;②sub-agent:在 agents: 注册的 SubAgent 被包成工具供主 agent 调用(getToolsForExecution);③workflow 内 createStep(agent) 把 agent 当步骤静态编排;另有 A2A 协议(@mastra/core/a2a)

agent/agent.ts:6377agent/subagent.ts:43loop/network/index.ts:297workflows/workflow.ts:430
packages/core/src/agent/agent.ts:6377 typescript
   * }
   * ```
   */
  async network(
    messages: MessageListInput,
    options?: MultiPrimitiveExecutionOptions<undefined>,
  ): Promise<MastraAgentNetworkStream<undefined>>;
  async network<OUTPUT extends {}>(
    messages: MessageListInput,
    options?: MultiPrimitiveExecutionOptions<OUTPUT>,
  ): Promise<MastraAgentNetworkStream<OUTPUT>>;
  async network<OUTPUT = undefined>(messages: MessageListInput, options?: MultiPrimitiveExecutionOptions<OUTPUT>) {
    const requestContextToUse = options?.requestContext || new RequestContext();
查看 Mastra 完整笔记 →
MetaGPT python 框架核心:Team.hire(roles) 把角色放进 Environment;Environment.publish_message 按 member_addrs 地址路由到角色私有队列;Environment.run 用 asyncio.gather 并发跑所有非 idle 角色;订阅靠 cause_by ∈ rc.watch 解耦。新一代 MGXEnv + TeamLeader(Mike)做动态 @ 路由与 direct_chat

框架核心:Team.hire(roles) 把角色放进 Environment;Environment.publish_message 按 member_addrs 地址路由到角色私有队列;Environment.run 用 asyncio.gather 并发跑所有非 idle 角色;订阅靠 cause_by ∈ rc.watch 解耦。新一代 MGXEnv + TeamLeader(Mike)做动态 @ 路由与 direct_chat

metagpt/team.py:32metagpt/environment/base_env.py:175base_env.py:197role.py:399
metagpt/team.py:32 python
)


class Team(BaseModel):
    """
    Team: Possesses one or more roles (agents), SOP (Standard Operating Procedures), and a env for instant messaging,
    dedicated to env any multi-agent activity, such as collaboratively writing executable code.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    env: Optional[Environment] = None
    investment: float = Field(default=10.0)
查看 MetaGPT 完整笔记 →
Modus go actor 间消息传递:SendMessage(同步阻塞带 timeout)/SendMessageAsync(timeout=0 异步);agent 可在自身方法内 Start/SendMessage 其他 agent;底层 GoAkt 支持分布式(actor 可能在别的进程/机器)

actor 间消息传递:SendMessage(同步阻塞带 timeout)/SendMessageAsync(timeout=0 异步);agent 可在自身方法内 Start/SendMessage 其他 agent;底层 GoAkt 支持分布式(actor 可能在别的进程/机器)

sdk/go/pkg/agents/agents.go:337runtime/actors/agents.go:201
sdk/go/pkg/agents/agents.go:337 go
// The message is sent synchronously and the function will block until a response is received or the timeout is reached.
// The timeout can be set using the WithTimeout option. If there is no timeout set, it will wait indefinitely until a
// response is received, or until calling function is cancelled.
func SendMessage(agentId, msgName string, options ...messageOption) (*string, error) {
	m := &message{
		name:    msgName,
		timeout: math.MaxInt64, // default to no timeout (wait indefinitely)
	}

	for _, opt := range options {
		opt(m)
	}
查看 Modus 完整笔记 →
nanobot python 进程内 SubagentManager 派生隔离子 agent(独立 ToolRegistry/workspace scope,复用 AgentRunner),结果经 bus 作为 system 消息回灌父会话;spawn 工具触发,受 max_concurrent_subagents 限流。无跨网络 agent 协议

进程内 SubagentManager 派生隔离子 agent(独立 ToolRegistry/workspace scope,复用 AgentRunner),结果经 bus 作为 system 消息回灌父会话;spawn 工具触发,受 max_concurrent_subagents 限流。无跨网络 agent 协议

agent/subagent.py:74agent/loop.py:275
nanobot/agent/subagent.py:74 python
            self._status.error = str(context.error)


class SubagentManager:
    """Manages background subagent execution."""

    def __init__(
        self,
        provider: LLMProvider,
        workspace: Path,
        bus: MessageBus,
        max_tool_result_chars: int,
        model: str | None = None,
查看 nanobot 完整笔记 →
Open Multi-Agent typescript 核心范式:coordinator 拆 DAG → TaskQueue 拓扑解析(完成自动 unblock、失败级联) → Scheduler 自动分配(默认 dependency-first,另有 round-robin/least-busy/capability-match) → AgentPool 信号量并发执行;可选 delegate_to_agent 工具做同步子 agent 委派(带环检测/深度上限/池槽防死锁)

核心范式:coordinator 拆 DAG → TaskQueue 拓扑解析(完成自动 unblock、失败级联) → Scheduler 自动分配(默认 dependency-first,另有 round-robin/least-busy/capability-match) → AgentPool 信号量并发执行;可选 delegate_to_agent 工具做同步子 agent 委派(带环检测/深度上限/池槽防死锁)

src/orchestrator/orchestrator.ts:1070src/task/queue.ts:55src/orchestrator/scheduler.ts:96src/tool/built-in/delegate.ts:24
src/orchestrator/orchestrator.ts:1070 typescript
   * @param team - A team created via {@link createTeam} (or `new Team(...)`).
   * @param goal - High-level natural-language goal for the team.
   */
  async runTeam(
    team: Team,
    goal: string,
    options?: RunTeamOptions,
  ): Promise<TeamRunResult> {
    const agentConfigs = team.getAgents()
    const coordinatorOverrides = options?.coordinator

    // ------------------------------------------------------------------
    // Short-circuit: skip coordinator for simple, single-action goals.
查看 Open Multi-Agent 完整笔记 →
OpenClaw typescript 两条路:①多 agent 路由——按渠道/账号/peer 把入站消息路由到隔离 agent(独立 workspace + per-agent session);②子 agent 派生——sessions_spawn 工具派生 subagent / ACP 外部 CLI agent,受 maxSpawnDepth、maxChildren、requireAgentId 策略约束;sessions_list/sessions_history/sessions_send 做跨会话协作

两条路:①多 agent 路由——按渠道/账号/peer 把入站消息路由到隔离 agent(独立 workspace + per-agent session);②子 agent 派生——sessions_spawn 工具派生 subagent / ACP 外部 CLI agent,受 maxSpawnDepth、maxChildren、requireAgentId 策略约束;sessions_list/sessions_history/sessions_send 做跨会话协作

src/agents/acp-spawn.ts:1src/agents/agent-tools.policy.ts:70
src/agents/acp-spawn.ts:1 typescript
/** Implements ACP subagent/session spawning, binding, limits, and parent-stream setup. */
import crypto from "node:crypto";
import fs from "node:fs/promises";
import {
  resolveAcpSessionCwd,
  resolveAcpThreadSessionDetailLines,
} from "@openclaw/acp-core/runtime/session-identifiers";
import type { AcpRuntimeSessionMode } from "@openclaw/acp-core/runtime/types";
import {
  normalizeOptionalLowercaseString,
查看 OpenClaw 完整笔记 →
Pilot Protocol go 核心,但是"网络级编排":①寻址=48 位虚拟地址 N:NNNN.HHHH.LLLL + 16 位端口 + hostname 发现;②互信=双向签名握手(端口 444,经 registry relay),节点默认私有;③发现/NAT=经 rendezvous 注册解析、STUN、打洞、relay 兜底。data flows 点对点直连

核心,但是"网络级编排":①寻址=48 位虚拟地址 N:NNNN.HHHH.LLLL + 16 位端口 + hostname 发现;②互信=双向签名握手(端口 444,经 registry relay),节点默认私有;③发现/NAT=经 rendezvous 注册解析、STUN、打洞、relay 兜底。data flows 点对点直连

README.md:163cmd/daemon/main.go:233pkg/daemon/routing/routing.go:21
README.md:163 markdown
<tr>
<td width="50%" valign="top">

**Addressing**
- 48-bit virtual addresses (`N:NNNN.HHHH.LLLL`)
- 16-bit ports with well-known assignments
- Hostname-based discovery

**Transport**
- Reliable streams (TCP-equivalent)
- Sliding window, SACK, congestion control (AIMD)
- Flow control (advertised receive window)
- Nagle coalescing, auto segmentation, zero-window probing
查看 Pilot Protocol 完整笔记 →
Pipecat python 两路:①进程内 ParallelPipeline 并行多条管道;②多 worker 经 WorkerBus 协作——@job(name=, sequential=) 暴露 handler,调用方 async with self.job(name) / self.job_group(names) 发请求并等 JobStatus;WorkerRegistry 跟踪本地/远程 worker(pgmq/redis 可跨进程)

两路:①进程内 ParallelPipeline 并行多条管道;②多 worker 经 WorkerBus 协作——@job(name=, sequential=) 暴露 handler,调用方 async with self.job(name) / self.job_group(names) 发请求并等 JobStatus;WorkerRegistry 跟踪本地/远程 worker(pgmq/redis 可跨进程)

pipeline/parallel_pipeline.py:24
src/pipecat/pipeline/parallel_pipeline.py:24 python
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup


class ParallelPipeline(BasePipeline):
    """Pipeline that processes frames through multiple sub-pipelines concurrently.

    Creates multiple parallel processing branches from the provided processor lists,
    coordinating frame flow and ensuring proper synchronization of lifecycle events
    like EndFrames. Each branch runs independently while system frames are handled
    specially to maintain pipeline coordination.
    """

    def __init__(self, *args):
查看 Pipecat 完整笔记 →
PraisonAI python Agents 容器 + Process 三模式:sequential(按序、自动传上下文)、hierarchical(manager_llm 充当 orchestrator 动态派活)、workflow(按 next_tasks/condition 走图,支持 route/parallel/loop/repeat);另有 Handoff 做 agent→agent 转交(仿 OpenAI Agents SDK)、A2A 协议

Agents 容器 + Process 三模式:sequential(按序、自动传上下文)、hierarchical(manager_llm 充当 orchestrator 动态派活)、workflow(按 next_tasks/condition 走图,支持 route/parallel/loop/repeat);另有 Handoff 做 agent→agent 转交(仿 OpenAI Agents SDK)、A2A 协议

agents/agents.py:1439process/process.py:19
src/praisonai-agents/praisonaiagents/agents/agents.py:1439 python
            max_iter=self.max_iter
        )
        
        if self.process == "workflow":
            for task_id in process.workflow():
                self.run_task(task_id)
        elif self.process == "sequential":
            for task_id in process.sequential():
                self.run_task(task_id)
        elif self.process == "hierarchical":
            for task_id in process.hierarchical():
                if isinstance(task_id, Task):
                    task_id = self.add_task(task_id)
查看 PraisonAI 完整笔记 →
Semantic Kernel csharp 两代并存:① 旧 AgentGroupChat/AgentChat(轮转+终止策略);② 新 AgentOrchestration<TIn,TOut> 基类下的 Concurrent / Sequential / GroupChat / Handoff / Magentic 五种模式,跑在 actor Runtime 上;GroupChat 由 GroupChatManager(SelectNextAgent/ShouldTerminate/ShouldRequestUserInput) 编排;agent 亦可作为另一 agent 的 plugin

两代并存:① 旧 AgentGroupChat/AgentChat(轮转+终止策略);② 新 AgentOrchestration<TIn,TOut> 基类下的 Concurrent / Sequential / GroupChat / Handoff / Magentic 五种模式,跑在 actor Runtime 上;GroupChat 由 GroupChatManager(SelectNextAgent/ShouldTerminate/ShouldRequestUserInput) 编排;agent 亦可作为另一 agent 的 plugin

dotnet/src/Agents/Orchestration/AgentOrchestration.cs:39dotnet/src/Agents/Orchestration/GroupChat/GroupChatManager.cs:30dotnet/src/Agents/Orchestration/Handoff/HandoffOrchestration.cs:17
dotnet/src/Agents/Orchestration/AgentOrchestration.cs:39 csharp
/// </summary>
/// <typeparam name="TInput">The type of the input to the orchestration.</typeparam>
/// <typeparam name="TOutput">The type of the result output by the orchestration.</typeparam>
public abstract partial class AgentOrchestration<TInput, TOutput>
{
    /// <summary>
    /// Initializes a new instance of the <see cref="AgentOrchestration{TInput, TOutput}"/> class.
    /// </summary>
    /// <param name="members">Specifies the member agents or orchestrations participating in this orchestration.</param>
    protected AgentOrchestration(params Agent[] members)
    {
        // Capture orchestration root name without generic parameters for use in
        // agent type and topic formatting as well as logging.
查看 Semantic Kernel 完整笔记 →
smolagents python 层级式:把子 agent 放进 managed_agents,框架给它套上 name/description/inputs 使其"像工具一样可被调用";父 agent 经 agent(task=...)(__call__) 调用,子 agent 跑完整 run 返回报告。注意:远程沙箱执行器不支持 managed agents

层级式:把子 agent 放进 managed_agents,框架给它套上 name/description/inputs 使其"像工具一样可被调用";父 agent 经 agent(task=...)(__call__) 调用,子 agent 跑完整 run 返回报告。注意:远程沙箱执行器不支持 managed agents

agents.py:369agents.py:868agents.py:1608
查看 smolagents 完整笔记 →
Strands Agents python 三模式:①Graph 确定性依赖图(支持环/嵌套,GraphBuilder 声明边);②Swarm 自治协作团队(工具化 handoff + 共享上下文);③A2A 协议(server/executor);另 agent.as_tool() 把 agent 当工具

三模式:①Graph 确定性依赖图(支持环/嵌套,GraphBuilder 声明边);②Swarm 自治协作团队(工具化 handoff + 共享上下文);③A2A 协议(server/executor);另 agent.as_tool() 把 agent 当工具

multiagent/graph.py:429multiagent/swarm.py:237agent/agent.py:697
strands-py/src/strands/multiagent/graph.py:429 python
            logger.warning("Graph without execution limits may run indefinitely if cycles exist")


class Graph(MultiAgentBase):
    """Directed Graph multi-agent orchestration with configurable revisit behavior."""

    def __init__(
        self,
        nodes: dict[str, GraphNode],
        edges: set[GraphEdge],
        entry_points: set[GraphNode],
        max_node_executions: int | None = None,
        execution_timeout: float | None = None,
查看 Strands Agents 完整笔记 →
Swarm python 网状 handoff:工具返回 Agent/Result(agent=...) 即移交控制权

网状 handoff:工具返回 Agent/Result(agent=...) 即移交控制权

core.py:76
swarm/core.py:76 python
            case Result() as result:
                return result

            case Agent() as agent:
                return Result(
                    value=json.dumps({"assistant": agent.name}),
                    agent=agent,
                )
            case _:
                try:
                    return Result(value=str(result))
                except Exception as e:
                    error_message = f"Failed to cast response to string: {result}. Make sure agent functions return a string or Result object. Error: {str(e)}"
查看 Swarm 完整笔记 →
SwarmClaw typescript ①进程内 subagent:spawnSubagent 派生隔离子 session,带 delegationDepth 限制(DEFAULT_DELEGATION_MAX_DEPTH);②外部委派:delegate 工具 spawn claude/codex/opencode/gemini/copilot/droid/cursor/qwen CLI 子进程,回退链 + resume id;③跨 OpenClaw gateway 路由;org-chart/team 可视化编排

①进程内 subagent:spawnSubagent 派生隔离子 session,带 delegationDepth 限制(DEFAULT_DELEGATION_MAX_DEPTH);②外部委派:delegate 工具 spawn claude/codex/opencode/gemini/copilot/droid/cursor/qwen CLI 子进程,回退链 + resume id;③跨 OpenClaw gateway 路由;org-chart/team 可视化编排

agents/subagent-runtime.ts:217session-tools/delegate.ts:28agents/delegation-jobs.ts:50openclaw/gateway.ts:110
src/lib/server/agents/subagent-runtime.ts:217 typescript
// Core: Spawn a Native Subagent
// ---------------------------------------------------------------------------

export function spawnSubagent(
  input: SpawnSubagentInput,
  context: SubagentContext,
): Promise<SubagentHandle> {
  return spawnSubagentImpl(input, context)
}

async function spawnSubagentImpl(
  input: SpawnSubagentInput,
  context: SubagentContext,
查看 SwarmClaw 完整笔记 →
Swarms python 核心卖点:60+ 拓扑文件,经 SwarmRouter 按 17 种 SwarmType 统一分派;含 Sequential/Concurrent(ThreadPool)/AgentRearrange(flow DSL)/Graph(DAG 拓扑排序)/MixtureOfAgents/Hierarchical/GroupChat/MajorityVoting/Council/Debate/Heavy/RoundRobin 等;另有 handoffs 让 agent 间移交

核心卖点:60+ 拓扑文件,经 SwarmRouter 按 17 种 SwarmType 统一分派;含 Sequential/Concurrent(ThreadPool)/AgentRearrange(flow DSL)/Graph(DAG 拓扑排序)/MixtureOfAgents/Hierarchical/GroupChat/MajorityVoting/Council/Debate/Heavy/RoundRobin 等;另有 handoffs 让 agent 间移交

structs/swarm_router.py:118swarm_router.py:48structs/sequential_workflow.py:52structs/concurrent_workflow.py:23
swarms/structs/swarm_router.py:118 python
    pass


class SwarmRouter:
    """
    A class that dynamically routes tasks to different swarm types based on user selection or automatic matching.

    The SwarmRouter enables flexible task execution by either using a specified swarm type or automatically determining
    the most suitable swarm type for a given task. It handles task execution while managing logging, type validation,
    and metadata capture.

    Args:
        name (str, optional): Name identifier for the SwarmRouter instance. Defaults to "swarm-router".
查看 Swarms 完整笔记 →
Transformers Agents python 后期支持 managed agents(一个 agent 调用另一个),整体仍偏单 agent

后期支持 managed agents(一个 agent 调用另一个),整体仍偏单 agent

查看 Transformers Agents 完整笔记 →
Upsonic python Team 三种模式 sequential/coordinate(leader 协调)/route(router 分派),ask_other_team_members 自动互为工具;Graph 做带 State 的 DAG/链工作流(可 parallel_execution);agent 亦可作为工具被另一 agent 调用

Team 三种模式 sequential/coordinate(leader 协调)/route(router 分派),ask_other_team_members 自动互为工具;Graph 做带 State 的 DAG/链工作流(可 parallel_execution);agent 亦可作为工具被另一 agent 调用

src/upsonic/team/team.py:20graph/graph.py:392
src/upsonic/team/team.py:20 python
from .result_combiner import ResultCombiner


class Team:
    """A callable class for multi-agent operations using the Upsonic client.
    
    Supports both Agent and Team instances as entities, enabling
    hierarchical multi-agent workflows with nested teams.
    Streaming (stream/astream) is text-only; no events are yielded.
    """
    
    def __init__(self,
                 entities: Optional[List[Union[Agent, "Team"]]] = None,
查看 Upsonic 完整笔记 →
VoltAgent typescript Supervisor/Sub-agent:SubAgentManager 经 delegate_task 工具把任务 handoffTask 给子 agent,handoffToMultiple 并行委派多个;另有 A2A server 协议跨进程协作

Supervisor/Sub-agent:SubAgentManager 经 delegate_task 工具把任务 handoffTask 给子 agent,handoffToMultiple 并行委派多个;另有 A2A server 协议跨进程协作

agent/subagent/index.ts:55
packages/core/src/agent/subagent/index.ts:55 typescript
/**
 * SubAgentManager - Manages sub-agents and delegation functionality for an Agent
 */
export class SubAgentManager {
  /**
   * The name of the agent that owns this sub-agent manager
   */
  private agentName: string;

  /**
   * Sub-agent configurations that the parent agent can delegate tasks to
   */
  private subAgentConfigs: SubAgentConfig[] = [];
查看 VoltAgent 完整笔记 →