构建生产级 ReAct Agent:从推理循环到工具编排的实战指南
在大模型应用开发中,ReAct(Reasoning + Acting)已经成为构建智能Agent的事实标准范式。不同于简单的聊天补全,ReAct让模型能够在推理和行动之间迭代循环——先思考该做什么,再执行工具,观察结果,然后继续思考,直到完成任务。本文将深入剖析ReAct的核心机制,并带你从零构建一个可用于生产环境的ReAct Agent。
1. 为什么需要ReAct?
早期的LLM应用主要依赖思维链(Chain-of-Thought)提示来引导模型推理。但纯推理有个致命问题:模型只能输出文本,无法与外部世界交互。当你需要查询数据库、调用API、执行代码或搜索网络时,纯推理就无能为力了。
ReAct的核心洞察是将推理和行动交织在一起:
- Reasoning:模型分析当前状态,规划下一步行动
- Acting:模型调用工具获取外部信息或执行操作
- Observing:模型接收工具返回结果,纳入上下文
- 循环:重复上述过程,直到得出最终答案
这种范式使得LLM从“问答机器”进化为“问题解决者”——它不仅能回答已知知识,还能通过工具调用获取未知信息、验证假设、执行多步操作。
2. ReAct 核心循环的数学本质
从控制论角度看,ReAct循环本质上是一个带反馈的马尔可夫决策过程。每一步的状态由历史轨迹决定:
状态 S_t = (Q, A_0, O_0, A_1, O_1, ..., A_{t-1}, O_{t-1})
动作 A_t = π(S_t) → 选择工具调用或输出答案
观察 O_t = E(A_t) → 执行工具获得结果
其中Q是用户初始问题,π是策略(即LLM本身),E是环境(工具执行器)。循环终止条件是模型输出最终答案而非工具调用。
3. 构建基础ReAct Agent
让我们从一个最小可用的ReAct实现开始,然后逐步增强到生产级:
import json
import re
from typing import Callable
class ReActAgent:
    """Minimal ReAct (reason + act) loop around a text-in/text-out LLM callable.

    Each step sends the system prompt, the question and the accumulated
    Thought/Action/Observation history to ``llm_call``, parses the reply and
    either returns the final answer or runs the requested tool and records
    what it returned.

    Args:
        llm_call: Callable that takes the prompt string and returns the
            model's reply text.
        tools: Mapping of tool name to a dict with ``'description'`` and
            ``'function'`` keys, plus an optional ``'params'`` list of dicts
            with ``'name'`` and ``'description'`` keys.
        max_steps: Maximum number of LLM calls before giving up.
    """

    def __init__(self, llm_call: Callable, tools: dict, max_steps: int = 10):
        self.llm_call = llm_call
        self.tools = tools
        self.max_steps = max_steps
        # Template text; ``{tool_descriptions}`` is filled in by ``run``.
        self.system_prompt = (
            "你是一个智能助手,可以使用工具来解决问题。\n"
            "可用工具:\n"
            "{tool_descriptions}\n"
            "请按照以下格式思考:\n"
            "Thought: 分析当前情况,决定下一步行动\n"
            "Action: 工具名称\n"
            "Action Input: 工具输入参数(JSON格式)\n"
            "或者当你有最终答案时:\n"
            "Thought: 我已经得到了足够的信息\n"
            "Final Answer: 你的回答\n"
            "开始吧!"
        )

    def _build_tool_descriptions(self) -> str:
        """Render one line per tool, followed by one line per declared param."""
        descs = []
        for name, tool in self.tools.items():
            descs.append(f"- {name}: {tool['description']}")
            for param in tool.get('params', []):
                descs.append(f" • {param['name']}: {param['description']}")
        return '\n'.join(descs)

    @staticmethod
    def _extract_action_input(text: str) -> tuple:
        """Decode the JSON object that follows ``Action Input:``.

        Returns:
            ``(inputs, error)``. ``inputs`` is the decoded dict, or ``{}``
            when no object follows the marker or decoding fails; ``error``
            is an empty string unless decoding failed.
        """
        marker = re.search(r'Action Input:\s*(?=\{)', text)
        if marker is None:
            return {}, ''
        try:
            # raw_decode consumes exactly one JSON value, so nested objects
            # are handled and trailing text after the object is ignored.
            value, _ = json.JSONDecoder().raw_decode(text[marker.end():])
        except json.JSONDecodeError as exc:
            return {}, f"Action Input 不是合法的JSON: {exc}"
        return value, ''

    def _parse_action(self, text: str) -> dict:
        """Parse one model reply in the Thought/Action/Final Answer format.

        Returns:
            A dict that always has ``'thought'`` and ``'type'``:

            * ``'final'``: also has ``'answer'``.
            * ``'action'``: also has ``'action'`` and ``'action_input'``,
              plus ``'input_error'`` when the Action Input JSON could not
              be decoded.
            * ``'invalid'``: neither a final answer nor an action was found.
        """
        thought_match = re.search(
            r'Thought:\s*(.+?)(?=Action:|Final Answer:|$)', text, re.DOTALL)
        action_match = re.search(r'Action:\s*(\w+)', text)
        final_match = re.search(r'Final Answer:\s*(.+)', text, re.DOTALL)

        result = {'thought': thought_match.group(1).strip() if thought_match else ''}
        if final_match:
            result['type'] = 'final'
            result['answer'] = final_match.group(1).strip()
        elif action_match:
            result['type'] = 'action'
            result['action'] = action_match.group(1).strip()
            inputs, error = self._extract_action_input(text)
            result['action_input'] = inputs
            if error:
                result['input_error'] = error
        else:
            # Always set 'type' so callers can branch without a KeyError.
            result['type'] = 'invalid'
        return result

    def run(self, question: str) -> str:
        """Run the loop until a final answer is produced or steps run out.

        Malformed replies and undecodable tool inputs are fed back to the
        model as observations instead of raising, so one bad reply does not
        abort the run.

        Returns:
            The final answer text, or a message saying the step limit was
            reached.
        """
        # The system prompt and question do not change between steps.
        base_prompt = self.system_prompt.format(
            tool_descriptions=self._build_tool_descriptions()
        )
        base_prompt += f"\n\nQuestion: {question}\n\n"

        history = []
        for step in range(self.max_steps):
            prompt = base_prompt
            if history:
                prompt += "之前的思考过程:\n"
                prompt += ''.join(f"{h}\n" for h in history)
                prompt += "\n"

            response = self.llm_call(prompt)
            parsed = self._parse_action(response)
            print(f"Step {step + 1} - Thought: {parsed['thought']}")

            if parsed['type'] == 'final':
                return parsed['answer']

            if parsed['type'] == 'invalid':
                # Tell the model what went wrong rather than crashing.
                history.append(
                    f"Thought: {parsed['thought']}\n"
                    "Observation: 错误:输出不符合要求的格式,"
                    "请使用 Thought/Action/Action Input 或 Final Answer"
                )
                continue

            tool_name = parsed['action']
            action_input = parsed.get('action_input', {})
            if 'input_error' in parsed:
                # Do not call the tool with guessed arguments.
                obs = f"错误:{parsed['input_error']}"
            elif tool_name not in self.tools:
                obs = f"错误:工具 '{tool_name}' 不存在"
            else:
                try:
                    obs = self.tools[tool_name]['function'](**action_input)
                except Exception as e:
                    obs = f"工具执行错误:{str(e)}"
            print(f"Step {step + 1} - Action: {tool_name} → Observation: {obs}")

            # ensure_ascii=False keeps non-ASCII arguments readable in the
            # prompt instead of expanding them to \uXXXX escapes.
            history.append(
                f"Thought: {parsed['thought']}\n"
                f"Action: {tool_name}\n"
                f"Action Input: {json.dumps(action_input, ensure_ascii=False)}\n"
                f"Observation: {obs}"
            )
        return f"达到最大步数限制({self.max_steps}步),未能完成回答"
4. 生产级增强:错误处理与重试机制
基础版本在实际使用中存在多个脆弱点:JSON解析失败、工具调用超时、模型输出格式偏离等。生产级Agent需要完善的容错机制:
import time
import traceback
from dataclasses import dataclass, field
from enum import Enum
class ToolStatus(Enum):
    """Outcome category attached to a tool invocation result."""

    SUCCESS = 'success'
    ERROR = 'error'
    TIMEOUT = 'timeout'
    VALIDATION_ERROR = 'validation_error'
@dataclass
class ToolResult:
    """Record describing the outcome of one tool execution."""

    # Outcome category of the call.
    status: ToolStatus
    # Text handed back to the agent loop (tool output or an error message).
    output: str
    # Elapsed time in milliseconds.
    duration_ms: float = 0
    # Retry counter reported by the executor.
    retries: int = 0
    # Extra diagnostic text for failures; empty when there is none.
    error_detail: str = ''
@dataclass
class AgentTrace:
    """Full execution trace of one agent run, kept for debugging and audit."""

    # The question the run was asked to solve.
    question: str = ''
    # Per-step records; each instance gets its own fresh list.
    steps: list = field(default_factory=list)
    # Elapsed time of the run in milliseconds.
    total_duration_ms: float = 0
    # Counter of tool invocations.
    tool_calls: int = 0
    # Counter of errors.
    errors: int = 0
    # Success flag; defaults to False.
    success: bool = False
class ProductionReActAgent:
    """Fault-tolerance helpers for a ReAct agent.

    Provides input validation, tool execution with retry and a time limit,
    and best-effort repair of malformed model output.

    Args:
        llm_call: Callable used to query the model (stored, not called by
            the helpers in this class).
        tools: Mapping of tool name to a dict with a ``'function'`` key and
            an optional ``'required_params'`` list.
        max_steps: Step budget for the agent loop (stored only).
        max_retries: Number of retries after the first failed tool call.
        tool_timeout: Seconds to wait for one tool call; ``None`` or a
            non-positive value disables the limit.
    """

    def __init__(self, llm_call, tools: dict, max_steps: int = 15,
                 max_retries: int = 2, tool_timeout: float = 30.0):
        self.llm_call = llm_call
        self.tools = tools
        self.max_steps = max_steps
        self.max_retries = max_retries
        self.tool_timeout = tool_timeout

    def _validate_action_input(self, tool_name: str, inputs: dict) -> tuple:
        """Check that the tool exists and all required params are present.

        Returns:
            ``(True, "")`` when valid, otherwise ``(False, message)``.
        """
        if tool_name not in self.tools:
            return False, f"未知工具: {tool_name}"
        if not isinstance(inputs, dict):
            # ``func(**inputs)`` needs a mapping; reject early instead of
            # failing (and retrying) inside the execution loop.
            return False, "工具参数必须是JSON对象"
        required = self.tools[tool_name].get('required_params', [])
        for param in required:
            if param not in inputs:
                return False, f"缺少必需参数: {param}"
        return True, ""

    def _call_with_timeout(self, func, inputs: dict) -> tuple:
        """Call ``func(**inputs)``, waiting at most ``tool_timeout`` seconds.

        Returns:
            ``(finished, value)``; ``finished`` is False when the time limit
            expired, in which case ``value`` is None.

        Raises:
            Whatever ``func`` raises, re-raised in the calling thread.
        """
        if not self.tool_timeout or self.tool_timeout <= 0:
            return True, func(**inputs)

        import threading

        outcome = {}

        def _worker():
            try:
                outcome['value'] = func(**inputs)
            except BaseException as exc:  # handed back to the caller below
                outcome['error'] = exc

        # A Python thread cannot be killed: on timeout the worker is simply
        # abandoned. It is a daemon so it never blocks interpreter exit.
        worker = threading.Thread(target=_worker, daemon=True)
        worker.start()
        worker.join(self.tool_timeout)
        if worker.is_alive():
            return False, None
        if 'error' in outcome:
            raise outcome['error']
        return True, outcome['value']

    def _execute_tool_with_retry(self, tool_name: str, inputs: dict) -> "ToolResult":
        """Run a tool with validation, a time limit and exponential backoff.

        Validation failures and timeouts are returned immediately without
        retrying; other exceptions are retried up to ``max_retries`` times
        with waits of 1s, 2s, 4s, ...

        Returns:
            A ``ToolResult`` whose status is SUCCESS, VALIDATION_ERROR,
            TIMEOUT or ERROR.
        """
        start = time.perf_counter()

        def elapsed_ms() -> float:
            return (time.perf_counter() - start) * 1000

        # Validation does not depend on the attempt, so do it once.
        valid, error_msg = self._validate_action_input(tool_name, inputs)
        if not valid:
            return ToolResult(
                status=ToolStatus.VALIDATION_ERROR,
                output=error_msg,
                error_detail=error_msg
            )

        last_error = ""
        last_traceback = ""
        for attempt in range(self.max_retries + 1):
            try:
                func = self.tools[tool_name]['function']
                finished, result = self._call_with_timeout(func, inputs)
            except Exception as e:
                last_error = f"{type(e).__name__}: {str(e)}"
                # format_exc() only sees the exception while it is being
                # handled, so capture it here rather than after the loop.
                last_traceback = traceback.format_exc()
                if attempt < self.max_retries:
                    time.sleep(2 ** attempt)  # exponential backoff
                continue

            if not finished:
                # The abandoned call may still be running; retrying would
                # stack another copy on top of it.
                message = f"工具执行超时(超过{self.tool_timeout}秒)"
                return ToolResult(
                    status=ToolStatus.TIMEOUT,
                    output=message,
                    duration_ms=elapsed_ms(),
                    retries=attempt,
                    error_detail=message
                )
            return ToolResult(
                status=ToolStatus.SUCCESS,
                output=str(result),
                duration_ms=elapsed_ms(),
                retries=attempt
            )

        return ToolResult(
            status=ToolStatus.ERROR,
            output=f"工具执行失败(已重试{self.max_retries}次): {last_error}",
            duration_ms=elapsed_ms(),
            retries=self.max_retries,
            error_detail=last_traceback
        )

    def _repair_malformed_output(self, raw_output: str):
        """Try to recover an action or final answer from off-format output.

        Returns:
            A dict shaped like a parsed action (``'type'`` is ``'final'`` or
            ``'action'``), or ``None`` when nothing usable is found.
        """
        if not raw_output:
            return None

        # Any flat JSON object in the text is a candidate for the tool input.
        json_match = re.search(r'\{[^{}]*"[^"]+"\s*:\s*[^}]+\}', raw_output)

        # Explicit answer cues; accept both ASCII and full-width colons.
        answer_patterns = [
            r'(?:因此|所以|综上|最终|答案是)[::]\s*(.+)',
            r'Final\s*Answer[::]\s*(.+)',
            r'因此答案[::]\s*(.+)'
        ]
        for pattern in answer_patterns:
            match = re.search(pattern, raw_output, re.IGNORECASE | re.DOTALL)
            if match:
                return {'type': 'final', 'thought': raw_output[:200],
                        'answer': match.group(1).strip()}

        # Look for a mentioned tool name. Longest names go first so that
        # "search_web" is not mistaken for a shorter tool named "search".
        lowered = raw_output.lower()
        for tool_name in sorted(self.tools, key=len, reverse=True):
            if tool_name.lower() in lowered:
                inputs = {}
                if json_match:
                    try:
                        inputs = json.loads(json_match.group())
                    except json.JSONDecodeError:
                        # Best effort: fall back to calling with no inputs.
                        inputs = {}
                return {'type': 'action', 'thought': raw_output[:200],
                        'action': tool_name, 'action_input': inputs}
        return None  # nothing recoverable
5. 高级特性:动态工具注册与上下文压缩
在真实场景中,Agent可能需要使用数十个工具,将所有工具描述塞入每次请求会快速耗尽上下文窗口。我们需要动态工具管理和上下文压缩:
class SmartToolRegistry:
    """Tool registry that selects tools per question and tracks usage counts."""

    def __init__(self):
        self._tools = {}
        self._usage_count = {}
        self._last_used = {}

    def register(self, name: str, description: str, function: Callable,
                 required_params: list = None, category: str = "general"):
        """Add (or replace) a tool and reset its usage counter to zero."""
        entry = dict(
            description=description,
            function=function,
            required_params=required_params or [],
            category=category,
        )
        self._tools[name] = entry
        self._usage_count[name] = 0

    def get_relevant_tools(self, question: str, max_tools: int = 5) -> dict:
        """Return up to ``max_tools`` tools ranked by a simple relevance score.

        The score is 2 points for every whitespace-separated word of the
        tool description that occurs as a substring of the lower-cased
        question, plus 0.5 points per recorded use of the tool. Ties keep
        registration order.
        """
        query = question.lower()

        def relevance(item):
            tool_name, tool = item
            words = tool['description'].lower().split()
            hits = sum(1 for word in words if word in query)
            return 2 * hits + self._usage_count.get(tool_name, 0) * 0.5

        # sorted() is stable, also with reverse=True, so equal scores keep
        # their insertion order.
        ranked = sorted(self._tools.items(), key=relevance, reverse=True)
        return {tool_name: tool for tool_name, tool in ranked[:max_tools]}

    def record_usage(self, name: str):
        """Increment the usage counter of ``name`` and stamp the current time."""
        previous = self._usage_count.get(name, 0)
        self._usage_count[name] = previous + 1
        self._last_used[name] = time.time()
class ContextCompressor:
    """Helpers that shrink agent history before it goes back into a prompt."""

    @staticmethod
    def compress_observations(history: list, max_obs_length: int = 200) -> list:
        """Truncate over-long observations in each history entry.

        Entries are split at the first ``'Observation:'`` marker. The text
        before it is kept unchanged; the text after it is stripped and, when
        longer than ``max_obs_length`` characters, cut to that length with a
        truncation note appended. Entries without the marker are returned
        unchanged.

        Args:
            history: List of history entry strings.
            max_obs_length: Maximum number of observation characters kept.

        Returns:
            A new list with the same number of entries.
        """
        compressed = []
        for entry in history:
            # partition() splits only at the first marker, so an observation
            # that itself contains "Observation:" is not silently cut off.
            head, marker, obs = entry.partition('Observation:')
            if not marker:
                compressed.append(entry)
                continue
            obs = obs.strip()
            if len(obs) > max_obs_length:
                obs = obs[:max_obs_length] + '... [已截断]'
            compressed.append(head + 'Observation: ' + obs)
        return compressed

    @staticmethod
    def summarize_early_steps(history: list, threshold: int = 5) -> str:
        """Join history into one string, collapsing all but the latest steps.

        When ``history`` has at most ``threshold`` entries they are joined
        with newlines. Otherwise the older entries are replaced by a
        one-line count and only the last ``threshold`` entries are kept
        verbatim.
        """
        if len(history) <= threshold:
            return '\n'.join(history)
        cut = len(history) - threshold
        early, recent = history[:cut], history[cut:]
        summary = f"[前{len(early)}步已压缩] 已执行{len(early)}次工具调用。\n"
        summary += "最近的思考过程:\n"
        summary += '\n'.join(recent)
        return summary
6. 完整的生产级Agent编排
将以上组件组装为一个完整的、可直接部署的ReAct Agent:
class ProductionAgent:
    """ReAct agent that ties together the registry, compressor and executor.

    Args:
        llm_client: Object exposing ``complete(prompt) -> str``.
        tool_registry: Registry used to pick the tools offered for a
            question and to record tool usage.
        max_steps: Maximum number of loop iterations per ``solve`` call.
    """

    def __init__(self, llm_client, tool_registry: SmartToolRegistry,
                 max_steps: int = 15):
        self.llm = llm_client
        self.registry = tool_registry
        self.compressor = ContextCompressor()
        self.max_steps = max_steps
        # Tools selected for the question currently being solved; read by
        # the repair/execute helpers.
        self._active_tools = {}

    def solve(self, question: str, verbose: bool = True) -> AgentTrace:
        """Run the ReAct loop for ``question`` and return its trace.

        Args:
            question: The user question.
            verbose: When true, print progress for every step.

        Returns:
            An ``AgentTrace``; ``success`` is True only if the model
            produced a final answer within ``max_steps`` iterations.
        """
        trace = AgentTrace(question=question)
        start_time = time.time()

        # Offer only the tools the registry ranks as relevant.
        relevant_tools = self.registry.get_relevant_tools(question)
        self._active_tools = relevant_tools

        history = []
        for step in range(self.max_steps):
            prompt = self._build_prompt(question, relevant_tools, history)

            try:
                response = self.llm.complete(prompt)
            except Exception as e:
                trace.errors += 1
                if verbose:
                    print(f"[Step {step+1}] LLM调用失败: {e}")
                continue

            action = self._parse_action(response)
            if action is None:
                # Strict parsing failed; fall back to heuristic repair.
                action = self._repair_malformed_output(response)
            if action is None:
                if verbose:
                    print(f"[Step {step+1}] 无法解析输出,跳过")
                trace.errors += 1
                continue

            if verbose:
                print(f"[Step {step+1}] 💭 {action.get('thought', '')[:100]}")

            if action['type'] == 'final':
                trace.success = True
                trace.total_duration_ms = (time.time() - start_time) * 1000
                trace.steps.append({
                    'step': step + 1,
                    'type': 'final',
                    'answer': action['answer']
                })
                if verbose:
                    print(f"\n✅ 最终答案: {action['answer'][:200]}")
                return trace

            tool_name = action['action']
            action_input = action.get('action_input', {})
            result = self._execute_tool(tool_name, action_input)
            trace.tool_calls += 1
            if result.status != ToolStatus.SUCCESS:
                trace.errors += 1
            if tool_name in relevant_tools:
                # Skip names the model invented so they do not pollute the
                # registry's usage statistics.
                self.registry.record_usage(tool_name)

            if verbose:
                status_emoji = "✅" if result.status == ToolStatus.SUCCESS else "❌"
                print(f"[Step {step+1}] {status_emoji} {tool_name}: {result.output[:100]}")

            # Keep every step; compression happens when the prompt is
            # rendered, so no step is lost from the stored history.
            history.append(
                f"Thought: {action.get('thought', '')}\n"
                f"Action: {tool_name}\n"
                f"Action Input: {json.dumps(action_input, ensure_ascii=False)}\n"
                f"Observation: {result.output}"
            )
            trace.steps.append({
                'step': step + 1,
                'type': 'action',
                'tool': tool_name,
                'result': result.status.value
            })

        trace.total_duration_ms = (time.time() - start_time) * 1000
        return trace

    def _build_prompt(self, question, tools, history):
        """Assemble the prompt from instructions, question and history.

        All history entries are included; when there are more than the
        compressor's threshold, the older ones are collapsed to a one-line
        summary by ``summarize_early_steps``.
        """
        tool_desc = "\n".join(
            f"- {name}: {t['description']}"
            for name, t in tools.items()
        )
        system = (
            "你是一个能使用工具解决问题的智能Agent。\n"
            "可用工具:\n"
            f"{tool_desc}\n"
            "格式要求:\n"
            "Thought: [你的推理过程]\n"
            "Action: [工具名]\n"
            "Action Input: [JSON参数]\n"
            "或完成时:\n"
            "Thought: [总结推理]\n"
            "Final Answer: [最终答案]"
        )
        prompt = f"{system}\n\nQuestion: {question}"
        if history:
            rendered = self.compressor.summarize_early_steps(history)
            prompt += f"\n\n执行历史:\n{rendered}"
        return prompt

    def _parse_action(self, text):
        """Strictly parse a reply in the Thought/Action/Final Answer format.

        Returns:
            A dict with ``'type'`` equal to ``'final'`` (plus ``'answer'``)
            or ``'action'`` (plus ``'action'`` and ``'action_input'``), or
            ``None`` when the reply does not follow the format or its
            Action Input is not valid JSON.
        """
        if not text:
            return None
        thought_match = re.search(
            r'Thought:\s*(.+?)(?=Action:|Final Answer:|$)', text, re.DOTALL)
        thought = thought_match.group(1).strip() if thought_match else ''

        final_match = re.search(r'Final Answer:\s*(.+)', text, re.DOTALL)
        if final_match:
            return {'type': 'final', 'thought': thought,
                    'answer': final_match.group(1).strip()}

        action_match = re.search(r'Action:\s*(\w+)', text)
        if action_match is None:
            return None

        inputs = {}
        marker = re.search(r'Action Input:\s*(?=\{)', text)
        if marker is not None:
            try:
                # raw_decode reads exactly one JSON value, so nested objects
                # work and trailing text is ignored.
                inputs, _ = json.JSONDecoder().raw_decode(text[marker.end():])
            except json.JSONDecodeError:
                return None  # let the repair path try
        return {'type': 'action', 'thought': thought,
                'action': action_match.group(1).strip(),
                'action_input': inputs}

    def _fault_tolerant_helper(self):
        """Build a ProductionReActAgent over the currently selected tools."""
        return ProductionReActAgent(self.llm.complete, self._active_tools)

    def _repair_malformed_output(self, text):
        """Heuristically recover an action from off-format output.

        Delegates to ``ProductionReActAgent._repair_malformed_output``;
        returns its dict, or ``None`` when nothing can be recovered.
        """
        return self._fault_tolerant_helper()._repair_malformed_output(text)

    def _execute_tool(self, name, inputs):
        """Run a tool via ``ProductionReActAgent._execute_tool_with_retry``.

        Returns:
            The ``ToolResult`` produced by that method.
        """
        return self._fault_tolerant_helper()._execute_tool_with_retry(name, inputs)
7. 性能优化与最佳实践
在生产部署中,以下几个关键点决定了ReAct Agent的可靠性:
7.1 提示工程要点
- 少样本示例:在system prompt中包含2-3个完整的Thought/Action/Observation示例,能显著提升格式遵循率
- 工具描述质量:工具描述应包含使用场景、参数含义、返回值格式,避免模糊描述
- 错误恢复指令:明确告知模型"如果工具返回错误,请分析原因并尝试其他方法"
7.2 成本控制策略
- 模型分级:简单步骤使用小模型(如GPT-4o-mini),复杂推理使用大模型
- 工具调用预算:为单个任务设置工具调用次数和token消耗的上限,超限后强制输出当前最佳答案
- 缓存:对相同工具调用结果进行缓存,避免重复执行
7.3 安全考量
- 工具权限隔离:危险操作(文件删除、数据库写入)需要二次确认
- 输入消毒:对模型生成的工具参数进行验证和转义,防止注入攻击
- 执行沙箱:代码执行类工具应在隔离环境中运行
8. 2026年趋势展望
ReAct范式正在快速演进。以下几个方向值得关注:
- 多步预规划(Plan-and-Execute):在执行前先制定完整计划,减少无效工具调用
- 自适应推理深度:模型根据问题复杂度自动调整推理步骤,简单问题快速回答,复杂问题深入分析
- 工具自动生成:Agent不仅能使用工具,还能根据需求自动生成新工具(如AutoGPT的"创建插件"能力)
- 多模态ReAct:将视觉、语音等模态融入推理循环,Agent可以通过截图分析、语音识别等方式获取信息
- 协作式ReAct:多个Agent分工协作,每个Agent专注特定领域,通过消息传递协调任务
总结
ReAct范式之所以成为Agent开发的事实标准,是因为它在简洁性和强大性之间取得了完美平衡。核心循环——思考、行动、观察——模拟了人类解决问题的自然方式。
从生产角度看,一个可靠的ReAct Agent需要关注三个层面:
- 鲁棒性:完善的错误处理、重试机制、输出格式修复
- 效率:动态工具选择、上下文压缩、模型分级
- 安全性:权限隔离、输入验证、执行沙箱
随着大模型能力的持续提升和工具生态的日益丰富,ReAct Agent正在从"能用"走向"好用"。掌握这一范式,是构建下一代AI应用的基础能力。