Data Agents-Part 2 :揭秘 AI 量产关键:多智能体编排
定制化AI系统开发全流程:多智能体编排才是量产关键
在定制化AI系统的完整生命周期里,咱们一般会按三步走来:
- 基于现成的预训练大模型(比如GPT-5、Claude这类狠角色)
- 针对特定领域任务做微调
- 结合真实业务场景的反馈,用强化学习打磨效果
但要把模型的“蛮力”转化为能落地的产品,还得补上第四步——多智能体协同编排。
这篇文章就手把手教你,如何用LangGraph搭建这套编排层,把孤零零的大模型,变成一个能规划、会调研、能画图表、还能输出精炼结论的协作团队。
一套极简的多智能体架构
我们采用的是分层架构,各司其职、分工明确:
- 规划师(Planner):把用户的原始需求,拆解成一步步能执行的原子任务
- 调度器(Executor):决定下一步该执行哪个任务、要不要重新规划、以及该派哪个智能体上场
- 专项智能体(Sub-Agents):个个身怀绝技,但术业有专攻
web_researcher(网页研究员):负责从公开网络上扒数据
chart_generator(图表生成器):跑Python代码画图
chart_summarizer(图表解读员):给画好的图表写说明
synthesizer(总结员):输出最终的文本结论
这种分工模式,既能让每个智能体保持简单、方便追溯问题,又能让规划师和调度器专心负责复杂的逻辑推理。
1. 状态与规划师节点:工作流的大脑
我们先定义一个极简的State类,用来存储整个工作流的核心信息:包括任务规划、当前执行步骤、对话记录、是否需要重规划等关键状态。
class State(MessagesState):
user_query: Optional[str] # 用户原始查询
enabled_agents: Optional[List[str]] # 启用的智能体列表
plan: Optional[Dict[str, Dict[str, Any]]] # 任务规划
current_step: int # 当前执行步骤
agent_query: Optional[str] # 发给专项智能体的子查询
last_reason: Optional[str] # 上一步的执行理由
replan_flag: Optional[bool] # 重规划标记
replan_attempts: Optional[Dict[int, int]] # 各步骤的重规划次数
规划师节点的核心工作,就是调用一个擅长逻辑推理的大模型(这里用的是o3),生成一份结构化的分步执行计划:
reasoning_llm = ChatOpenAI(
model="o3",
model_kwargs={"response_format": {"type": "json_object"}},
)
def planner_node(state: State) -> Command["executor"]:
llm_reply = reasoning_llm.invoke([plan_prompt(state)])
parsed_plan = json.loads(llm_reply.content)
return Command(
update={
"plan": parsed_plan,
"messages": [HumanMessage(content=llm_reply.content)],
"user_query": state["user_query"],
"current_step": 1,
"replan_flag": state.get("replan_flag", False),
},
goto="executor",
)
划重点:规划师只负责写计划(输出JSON格式),从不自己动手干活。
2. 调度器节点:整个系统的交通警察
调度器的核心职责就三件事:
- 判断当前任务是否需要重新规划
- 决定下一步该派哪个专项智能体出马
- 生成这个智能体要执行的具体子查询
def executor_node(state: State) -> Command:
plan = state.get("plan", {})
step = state.get("current_step", 1)
llm_reply = reasoning_llm.invoke([executor_prompt(state)])
parsed = json.loads(llm_reply.content)
replan = parsed["replan"] # 是否重规划
goto = parsed["goto"] # 下一步要调用的节点
query = parsed["query"] # 发给专项智能体的子查询
它还会管理重规划的次数,防止无限循环:
if replan:
if step_replans < MAX_REPLANS:
updates["replan_flag"] = True
return Command(update=updates, goto="planner")
如果不需要重规划,就推进到下一步,调用对应的专项智能体:
updates["current_step"] = step + 1
return Command(update=updates, goto=goto)
说白了,这个节点就是整个系统的交通警察,指挥所有智能体有序干活。
3. 网页研究员:互联网数据搬运工
这是一个基于React框架的简单智能体,用Tavily工具实现网页搜索:
tavily_tool = TavilySearch(max_results=5)
web_search_agent = create_react_agent(
ChatOpenAI(model="gpt-4o"),
tools=[tavily_tool],
prompt=agent_system_prompt("你是一名专业的网页研究员,负责精准获取公开网络上的信息…")
)
def web_research_node(state: State) -> Command["executor"]:
result = web_search_agent.invoke({"messages": state["agent_query"]})
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="web_researcher"
)
return Command(update={"messages": result["messages"]}, goto="executor")
搜完数据后,它会把结果反馈给调度器,等待下一步指令。
4. 图表生成器与解读员:数据可视化双子星
这对搭档一个负责画图,一个负责给图表写说明,咱们用Python REPL工具实现:
图表生成器
chart_agent = create_react_agent(
llm,
[python_repl_tool],
prompt=agent_system_prompt("""
你只会干一件事:根据输入的数据,生成对应的可视化图表…
"""),
)
def chart_node(state: State) -> Command["chart_summarizer"]:
result = chart_agent.invoke(state)
result["messages"][-1] = HumanMessage(
content=result["messages"][-1].content, name="chart_generator"
)
return Command(update={"messages": result["messages"]}, goto="chart_summarizer")
图表解读员
chart_summary_agent = create_react_agent(
llm,
tools=[],
prompt=agent_system_prompt("你是一名专业的图表解读员,只会给输入的图表生成清晰易懂的文字说明…"),
)
def chart_summary_node(state: State) -> Command[END]:
result = chart_summary_agent.invoke(state)
return Command(
update={
"messages": result["messages"],
"final_answer": result["messages"][-1].content,
},
goto=END,
)
画完图、写完说明,这个任务流就直接结束了。
5. 总结员:无图表需求时的收尾大师
如果用户的需求不需要生成图表,就轮到总结员出场了。它会汇总所有专项智能体的输出结果,生成一份流畅易读的最终结论:
def synthesizer_node(state: State) -> Command[END]:
relevant_msgs = [
m.content for m in state.get("messages", [])
if getattr(m, "name", None) in ("web_researcher", "chart_generator", "chart_summarizer")
]
llm_reply = llm.invoke([
HumanMessage(content=f"用户问题:{state['user_query']}\n\n参考资料:\n" + "\n---\n".join(relevant_msgs))
])
return Command(
update={"final_answer": llm_reply.content},
goto=END,
)
6. 组装工作流:把所有零件拼起来
最后一步,就是用StateGraph把所有节点串联起来,形成一个完整的工作流:
workflow = StateGraph(State)
# 注册所有节点
workflow.add_node("planner", planner_node)
workflow.add_node("executor", executor_node)
workflow.add_node("web_researcher", web_research_node)
workflow.add_node("chart_generator", chart_node)
workflow.add_node("chart_summarizer", chart_summary_node)
workflow.add_node("synthesizer", synthesizer_node)
# 定义起始节点
workflow.add_edge(START, "planner")
# 编译生成可执行的图
graph = workflow.compile()
现在,这个一站式的智能系统就可以同时处理两种需求了:
需求1:生成图表(比如“画出美国前五名银行的市值对比图”)
state = {
"messages": [HumanMessage(content="Chart the market cap of top 5 US banks")],
"user_query": "Chart the market cap of top 5 US banks",
"enabled_agents": ["web_researcher", "chart_generator", "chart_summarizer", "synthesizer"],
}
graph.invoke(state)
需求2:纯文本回答
state = {
"messages": [HumanMessage(content="Identify current US financial regulations")],
"user_query": "Identify current US financial regulations",
"enabled_agents": ["web_researcher", "synthesizer"],
}
graph.invoke(state)
编排层才是量产的关键
这套多智能体编排层,就像给“预训练→微调→强化学习”这个铁三角,装上了一个灵活的操作系统。它能把实验室里的模型,变成真正能解决业务问题的生产级分析工具。
而且整个架构的模块化程度极高,想加新功能(比如对接SQL数据库查内部数据、用向量检索做文档问答、甚至接入Slack机器人),直接加个新的专项智能体就行,成本低到离谱。
下一篇(第三部分),我们会拓展这套系统,教大家如何对接企业内部数据——比如通过SQL查业务库、用文档检索挖知识库、再结合向量数据库做精准问答。
- 点赞
- 收藏
- 关注作者
评论(0)