构建生产级 ReAct Agent:从推理循环到工具编排的实战指南

10次阅读
没有评论

构建生产级 ReAct Agent:从推理循环到工具编排的实战指南

在大模型应用开发中,ReAct(Reasoning + Acting)已经成为构建智能Agent的事实标准范式。不同于简单的聊天补全,ReAct让模型能够在推理和行动之间迭代循环——先思考该做什么,再执行工具,观察结果,然后继续思考,直到完成任务。本文将深入剖析ReAct的核心机制,并带你从零构建一个可用于生产环境的ReAct Agent。

1. 为什么需要ReAct?

早期的LLM应用主要依赖链式提示(Chain-of-Thought)来引导模型推理。但纯推理有个致命问题:模型只能输出文本,无法与外部世界交互。当你需要查询数据库、调用API、执行代码或搜索网络时,纯推理就无能为力了。

ReAct的核心洞察是将推理和行动交织在一起:

  • Reasoning:模型分析当前状态,规划下一步行动
  • Acting:模型调用工具获取外部信息或执行操作
  • Observing:模型接收工具返回结果,纳入上下文
  • 循环:重复上述过程,直到得出最终答案

这种范式使得LLM从”问答机器”进化为”问题解决者”——它不仅能回答已知知识,还能通过工具调用获取未知信息、验证假设、执行多步操作。

2. ReAct 核心循环的数学本质

从控制论角度看,ReAct循环本质上是一个带反馈的马尔可夫决策过程。每一步的状态由历史轨迹决定:

状态 S_t = (Q, A_0, O_0, A_1, O_1, ..., A_{t-1}, O_{t-1})
动作 A_t = π(S_t)  →  选择工具调用或输出答案
观察 O_t = E(A_t)   →  执行工具获得结果

其中Q是用户初始问题,π是策略(即LLM本身),E是环境(工具执行器)。循环终止条件是模型输出最终答案而非工具调用。

3. 构建基础ReAct Agent

让我们从一个最小可用的ReAct实现开始,然后逐步增强到生产级:

import json
import re
from typing import Callable

class ReActAgent:
    def __init__(self, llm_call: Callable, tools: dict, max_steps: int = 10):
        self.llm_call = llm_call
        self.tools = tools
        self.max_steps = max_steps
        self.system_prompt = """你是一个智能助手,可以使用工具来解决问题。

可用工具:
{tool_descriptions}

请按照以下格式思考:
Thought: 分析当前情况,决定下一步行动
Action: 工具名称
Action Input: 工具输入参数(JSON格式)

或者当你有最终答案时:
Thought: 我已经得到了足够的信息
Final Answer: 你的回答

开始吧!"""
    
    def _build_tool_descriptions(self) -> str:
        descs = []
        for name, tool in self.tools.items():
            descs.append(f"- {name}: {tool['description']}")
            if 'params' in tool:
                for param in tool['params']:
                    descs.append(f"  • {param['name']}: {param['description']}")
        return '\n'.join(descs)
    
    def _parse_action(self, text: str) -> dict:
        """解析模型的Thought/Action/Observation输出"""
        thought_match = re.search(r'Thought:\s*(.+?)(?=Action:|Final Answer:|$)', 
                                   text, re.DOTALL)
        action_match = re.search(r'Action:\s*(\w+)', text)
        action_input_match = re.search(r'Action Input:\s*({.*?})', text, re.DOTALL)
        final_match = re.search(r'Final Answer:\s*(.+)', text, re.DOTALL)
        
        result = {'thought': thought_match.group(1).strip() if thought_match else ''}
        
        if final_match:
            result['type'] = 'final'
            result['answer'] = final_match.group(1).strip()
        elif action_match:
            result['type'] = 'action'
            result['action'] = action_match.group(1).strip()
            if action_input_match:
                result['action_input'] = json.loads(action_input_match.group(1))
            else:
                result['action_input'] = {}
        
        return result
    
    def run(self, question: str) -> str:
        history = []
        
        for step in range(self.max_steps):
            # 构建当前prompt
            prompt = self.system_prompt.format(
                tool_descriptions=self._build_tool_descriptions()
            )
            prompt += f"\n\nQuestion: {question}\n\n"
            
            if history:
                prompt += "之前的思考过程:\n"
                for h in history:
                    prompt += f"{h}\n"
                prompt += "\n"
            
            # 调用LLM
            response = self.llm_call(prompt)
            parsed = self._parse_action(response)
            
            print(f"Step {step + 1} - Thought: {parsed['thought']}")
            
            if parsed['type'] == 'final':
                return parsed['answer']
            
            # 执行工具
            tool_name = parsed['action']
            if tool_name not in self.tools:
                obs = f"错误:工具 '{tool_name}' 不存在"
            else:
                try:
                    obs = self.tools[tool_name]['function'](**parsed.get('action_input', {}))
                except Exception as e:
                    obs = f"工具执行错误:{str(e)}"
            
            print(f"Step {step + 1} - Action: {tool_name} → Observation: {obs}")
            
            history.append(
                f"Thought: {parsed['thought']}\n"
                f"Action: {tool_name}\n"
                f"Action Input: {json.dumps(parsed.get('action_input', {}))}\n"
                f"Observation: {obs}"
            )
        
        return f"达到最大步数限制({self.max_steps}步),未能完成回答"

4. 生产级增强:错误处理与重试机制

基础版本在实际使用中存在多个脆弱点:JSON解析失败、工具调用超时、模型输出格式偏离等。生产级Agent需要完善的容错机制:

import time
import traceback
from dataclasses import dataclass, field
from enum import Enum

class ToolStatus(Enum):
    SUCCESS = "success"
    ERROR = "error"
    TIMEOUT = "timeout"
    VALIDATION_ERROR = "validation_error"

@dataclass
class ToolResult:
    status: ToolStatus
    output: str
    duration_ms: float = 0
    retries: int = 0
    error_detail: str = ""

@dataclass  
class AgentTrace:
    """完整的执行轨迹,用于调试和审计"""
    question: str = ""
    steps: list = field(default_factory=list)
    total_duration_ms: float = 0
    tool_calls: int = 0
    errors: int = 0
    success: bool = False

class ProductionReActAgent:
    def __init__(self, llm_call, tools: dict, max_steps: int = 15,
                 max_retries: int = 2, tool_timeout: float = 30.0):
        self.llm_call = llm_call
        self.tools = tools
        self.max_steps = max_steps
        self.max_retries = max_retries
        self.tool_timeout = tool_timeout
    
    def _validate_action_input(self, tool_name: str, inputs: dict) -> tuple:
        """验证工具输入参数"""
        if tool_name not in self.tools:
            return False, f"未知工具: {tool_name}"
        
        tool = self.tools[tool_name]
        required = tool.get('required_params', [])
        for param in required:
            if param not in inputs:
                return False, f"缺少必需参数: {param}"
        
        return True, ""
    
    def _execute_tool_with_retry(self, tool_name: str, inputs: dict) -> ToolResult:
        """带重试和超时保护的工具执行"""
        start = time.time()
        last_error = ""
        
        for attempt in range(self.max_retries + 1):
            try:
                # 参数验证
                valid, error_msg = self._validate_action_input(tool_name, inputs)
                if not valid:
                    return ToolResult(
                        status=ToolStatus.VALIDATION_ERROR,
                        output=error_msg,
                        error_detail=error_msg
                    )
                
                # 执行工具(带超时)
                func = self.tools[tool_name]['function']
                result = func(**inputs)
                
                duration = (time.time() - start) * 1000
                return ToolResult(
                    status=ToolStatus.SUCCESS,
                    output=str(result),
                    duration_ms=duration,
                    retries=attempt
                )
                
            except Exception as e:
                last_error = f"{type(e).__name__}: {str(e)}"
                if attempt < self.max_retries:
                    wait = 2 ** attempt  # 指数退避
                    time.sleep(wait)
        
        return ToolResult(
            status=ToolStatus.ERROR,
            output=f"工具执行失败(已重试{self.max_retries}次): {last_error}",
            duration_ms=(time.time() - start) * 1000,
            retries=self.max_retries,
            error_detail=traceback.format_exc()
        )
    
    def _repair_malformed_output(self, raw_output: str) -> dict:
        """尝试修复模型输出的格式偏差"""
        # 尝试提取任何可能的JSON action input
        json_match = re.search(r'\{[^{}]*"[^"]+"\s*:\s*[^}]+\}', raw_output)
        
        # 检查是否有明确的答案指示
        answer_patterns = [
            r'(?:因此|所以|综上|最终|答案是)[::]\s*(.+)',
            r'Final\s*Answer[::]\s*(.+)',
            r'因此答案[::]\s*(.+)'
        ]
        for pattern in answer_patterns:
            match = re.search(pattern, raw_output, re.IGNORECASE | re.DOTALL)
            if match:
                return {'type': 'final', 'thought': raw_output[:200],
                        'answer': match.group(1).strip()}
        
        # 尝试识别工具名称
        for tool_name in self.tools:
            if tool_name.lower() in raw_output.lower():
                inputs = {}
                if json_match:
                    try:
                        inputs = json.loads(json_match.group())
                    except json.JSONDecodeError:
                        pass
                return {'type': 'action', 'thought': raw_output[:200],
                        'action': tool_name, 'action_input': inputs}
        
        return None  # 无法修复

5. 高级特性:动态工具注册与上下文压缩

在真实场景中,Agent可能需要使用数十个工具,将所有工具描述塞入每次请求会快速耗尽上下文窗口。我们需要动态工具管理和上下文压缩:

class SmartToolRegistry:
    """智能工具注册表:按需加载 + 使用频率统计"""
    
    def __init__(self):
        self._tools = {}
        self._usage_count = {}
        self._last_used = {}
    
    def register(self, name: str, description: str, function: Callable,
                 required_params: list = None, category: str = "general"):
        self._tools[name] = {
            'description': description,
            'function': function,
            'required_params': required_params or [],
            'category': category
        }
        self._usage_count[name] = 0
    
    def get_relevant_tools(self, question: str, max_tools: int = 5) -> dict:
        """根据问题语义选择最相关的工具"""
        # 简单实现:基于关键词匹配 + 使用频率加权
        scored = []
        question_lower = question.lower()
        
        for name, tool in self._tools.items():
            score = 0
            # 关键词匹配
            keywords = tool['description'].lower().split()
            for kw in keywords:
                if kw in question_lower:
                    score += 2
            # 使用频率加权
            score += self._usage_count.get(name, 0) * 0.5
            scored.append((name, score))
        
        scored.sort(key=lambda x: x[1], reverse=True)
        selected = scored[:max_tools]
        
        return {name: self._tools[name] for name, _ in selected}
    
    def record_usage(self, name: str):
        self._usage_count[name] = self._usage_count.get(name, 0) + 1
        self._last_used[name] = time.time()


class ContextCompressor:
    """智能上下文压缩器:保留关键信息,压缩冗余观察"""
    
    @staticmethod
    def compress_observations(history: list, max_obs_length: int = 200) -> list:
        """压缩历史观察结果"""
        compressed = []
        for entry in history:
            if 'Observation:' in entry:
                parts = entry.split('Observation:')
                obs = parts[1].strip()
                if len(obs) > max_obs_length:
                    obs = obs[:max_obs_length] + '... [已截断]'
                compressed.append(parts[0] + 'Observation: ' + obs)
            else:
                compressed.append(entry)
        return compressed
    
    @staticmethod
    def summarize_early_steps(history: list, threshold: int = 5) -> str:
        """当历史过长时,将早期步骤压缩为摘要"""
        if len(history) <= threshold:
            return '\n'.join(history)
        
        early = history[:len(history) - threshold]
        recent = history[len(history) - threshold:]
        
        summary = f"[前{len(early)}步已压缩] 已执行{len(early)}次工具调用。\n"
        summary += "最近的思考过程:\n"
        summary += '\n'.join(recent)
        
        return summary

6. 完整的生产级Agent编排

将以上组件组装为一个完整的、可直接部署的ReAct Agent:

class ProductionAgent:
    """生产级ReAct Agent"""
    
    def __init__(self, llm_client, tool_registry: SmartToolRegistry):
        self.llm = llm_client
        self.registry = tool_registry
        self.compressor = ContextCompressor()
    
    def solve(self, question: str, verbose: bool = True) -> AgentTrace:
        trace = AgentTrace(question=question)
        start_time = time.time()
        
        # 动态选择相关工具
        relevant_tools = self.registry.get_relevant_tools(question)
        history = []
        
        for step in range(15):
            # 构建prompt
            prompt = self._build_prompt(question, relevant_tools, history)
            
            # LLM推理
            try:
                response = self.llm.complete(prompt)
            except Exception as e:
                trace.errors += 1
                if verbose:
                    print(f"[Step {step+1}] LLM调用失败: {e}")
                continue
            
            # 解析动作
            action = self._parse_action(response)
            
            if action is None:
                # 尝试修复
                action = self._repair_malformed_output(response)
            
            if action is None:
                if verbose:
                    print(f"[Step {step+1}] 无法解析输出,跳过")
                trace.errors += 1
                continue
            
            if verbose:
                print(f"[Step {step+1}] 💭 {action.get('thought', '')[:100]}")
            
            if action['type'] == 'final':
                trace.success = True
                trace.total_duration_ms = (time.time() - start_time) * 1000
                trace.steps.append({
                    'step': step + 1,
                    'type': 'final',
                    'answer': action['answer']
                })
                if verbose:
                    print(f"\n✅ 最终答案: {action['answer'][:200]}")
                return trace
            
            # 执行工具
            tool_name = action['action']
            result = self._execute_tool(tool_name, action.get('action_input', {}))
            trace.tool_calls += 1
            
            if result.status != ToolStatus.SUCCESS:
                trace.errors += 1
            
            self.registry.record_usage(tool_name)
            
            if verbose:
                status_emoji = "✅" if result.status == ToolStatus.SUCCESS else "❌"
                print(f"[Step {step+1}] {status_emoji} {tool_name}: {result.output[:100]}")
            
            # 记录步骤
            history.append(
                f"Thought: {action['thought']}\n"
                f"Action: {tool_name}\n"
                f"Action Input: {json.dumps(action.get('action_input', {}))}\n"
                f"Observation: {result.output}"
            )
            trace.steps.append({
                'step': step + 1,
                'type': 'action',
                'tool': tool_name,
                'result': result.status.value
            })
            
            # 上下文压缩
            if len(history) > 5:
                history = [self.compressor.summarize_early_steps(history)]
        
        trace.total_duration_ms = (time.time() - start_time) * 1000
        return trace
    
    def _build_prompt(self, question, tools, history):
        tool_desc = "\n".join(
            f"- {name}: {t['description']}"
            for name, t in tools.items()
        )
        
        SYSTEM = f"""你是一个能使用工具解决问题的智能Agent。

可用工具:
{tool_desc}

格式要求:
Thought: [你的推理过程]
Action: [工具名]
Action Input: [JSON参数]

或完成时:
Thought: [总结推理]
Final Answer: [最终答案]"""
        
        prompt = f"{SYSTEM}\n\nQuestion: {question}"
        if history:
            prompt += f"\n\n执行历史:\n{history[-1]}"
        
        return prompt
    
    def _parse_action(self, text):
        # 解析逻辑(同前,略)
        pass
    
    def _repair_malformed_output(self, text):
        # 修复逻辑(同前,略)
        pass
    
    def _execute_tool(self, name, inputs):
        # 执行逻辑(同前,略)
        pass

7. 性能优化与最佳实践

在生产部署中,以下几个关键点决定了ReAct Agent的可靠性:

7.1 提示工程要点

  • 少样本示例:在system prompt中包含2-3个完整的Thought/Action/Observation示例,能显著提升格式遵循率
  • 工具描述质量:工具描述应包含使用场景、参数含义、返回值格式,避免模糊描述
  • 错误恢复指令:明确告知模型"如果工具返回错误,请分析原因并尝试其他方法"

7.2 成本控制策略

  • 模型分级:简单步骤使用小模型(如GPT-4o-mini),复杂推理使用大模型
  • 工具调用预算:设置单任务最大token消耗,超限后强制输出当前最佳答案
  • 缓存:对相同工具调用结果进行缓存,避免重复执行

7.3 安全考量

  • 工具权限隔离:危险操作(文件删除、数据库写入)需要二次确认
  • 输入消毒:对模型生成的工具参数进行验证和转义,防止注入攻击
  • 执行沙箱:代码执行类工具应在隔离环境中运行

8. 2026年趋势展望

ReAct范式正在快速演进。以下几个方向值得关注:

  • 多步预规划(Plan-and-Execute):在执行前先制定完整计划,减少无效工具调用
  • 自适应推理深度:模型根据问题复杂度自动调整推理步骤,简单问题快速回答,复杂问题深入分析
  • 工具自动生成:Agent不仅能使用工具,还能根据需求自动生成新工具(如AutoGPT的"创建插件"能力)
  • 多模态ReAct:将视觉、语音等模态融入推理循环,Agent可以通过截图分析、语音识别等方式获取信息
  • 协作式ReAct:多个Agent分工协作,每个Agent专注特定领域,通过消息传递协调任务

总结

ReAct范式之所以成为Agent开发的事实标准,是因为它在简洁性和强大性之间取得了完美平衡。核心循环——思考、行动、观察——模拟了人类解决问题的自然方式。

从生产角度看,一个可靠的ReAct Agent需要关注三个层面:

  1. 鲁棒性:完善的错误处理、重试机制、输出格式修复
  2. 效率:动态工具选择、上下文压缩、模型分级
  3. 安全性:权限隔离、输入验证、执行沙箱

随着大模型能力的持续提升和工具生态的日益丰富,ReAct Agent正在从"能用"走向"好用"。掌握这一范式,是构建下一代AI应用的基础能力。

正文完
 0
评论(没有评论)