RAG 架构进阶:从朴素检索到生产级智能检索系统

37次阅读
没有评论

RAG 架构进阶:从朴素检索到生产级智能检索系统

检索增强生成(Retrieval-Augmented Generation,RAG)已经成为大模型落地应用的核心架构范式。然而,大多数开发者的 RAG 实现停留在”嵌入+向量检索”的朴素阶段,面对生产环境中的复杂查询、多模态数据和实时性要求时往往捉襟见肘。本文将深入探讨 RAG 架构的进阶技术,从查询重写、混合检索到 reranker 精排,再到多路径融合策略,帮助你构建真正生产级的智能检索系统。

一、朴素 RAG 的瓶颈在哪里?

一个典型的朴素 RAG 流程是:用户提问 → 问题嵌入 → 向量相似度检索 → 拼接上下文 → 大模型生成答案。这个流程看起来简洁优雅,但在实际应用中存在明显的性能天花板:

  • 语义鸿沟:用户的提问方式千变万化,直接嵌入往往无法精准匹配文档语义。例如用户问”这家公司的盈利情况怎样?”,文档中可能写的是”Q3 营收同比增长 23%,净利润率达 15%”。
  • 关键词失效:向量检索擅长语义匹配,但在处理专有名词、型号编号、精确术语时,传统 BM25 关键词检索反而更可靠。
  • 上下文窗口浪费:Top-K 检索返回的结果中,可能只有 2-3 条真正相关,其余都是噪音,直接浪费宝贵的上下文窗口。
  • 多跳推理缺失:复杂问题需要多步检索和推理,单次检索无法覆盖所有必要信息。

二、查询重写:让问题找到答案

查询重写(Query Rewriting)是提升 RAG 召回率的第一道关卡。核心思路是:在检索之前,先用大模型将用户原始问题转化为更适合检索的形式。

from openai import OpenAI
import json

client = OpenAI()

def rewrite_query(original_query: str, context: str = "") -> dict:
    """
    将用户原始查询改写为多种检索形式:
    1. 语义扩展版:补充同义词和相关概念
    2. 关键词提取版:提取核心术语用于 BM25
    3. 假设答案版:生成假设性答案用于 HyDE 检索
    """
    system_prompt = """你是一个查询优化专家。给定用户问题,生成三种检索变体:
1. semantic: 语义扩展版本,补充同义词和相关概念
2. keywords: 提取核心关键词列表,用空格连接
3. hyde: 生成一段假设性答案(Hypothetical Document Embeddings)

返回 JSON 格式。"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"原始问题: {original_query}\n对话历史: {context}"}
        ],
        response_format={"type": "json_object"},
        temperature=0.3
    )
    
    return json.loads(response.choices[0].message.content)

# 示例
result = rewrite_query("Transformer 模型怎么优化推理速度?")
print(json.dumps(result, ensure_ascii=False, indent=2))
# 输出示例:
# {
#   "semantic": "Transformer 模型推理加速 优化技术 减少延迟 提高吞吐量",
#   "keywords": "Transformer inference optimization KV-cache quantization",
#   "hyde": "Transformer 推理优化常用方法包括:KV Cache 缓存、模型量化(INT8/FP16)、
#            投机解码(Speculative Decoding)、算子融合、Flash Attention 等..."
# }

HyDE(Hypothetical Document Embeddings)是一种特别巧妙的技术:它让大模型先生成一个假设性答案,然后用这个假设答案去检索,而不是用原始问题。因为假设答案在语义空间中更接近真实文档,召回质量往往显著提升。

三、混合检索:向量 + BM25 强强联合

生产级 RAG 系统几乎都采用混合检索策略,将向量语义检索和 BM25 关键词检索的结果进行融合。这不仅能同时捕获语义相似性和精确术语匹配,还能在对方失效时互为兜底。


import numpy as np
from rank_bm25 import BM25Okapi
from typing import List, Dict, Tuple

class HybridRetriever:
    def __init__(self, embedding_model, documents: List[str]):
        self.documents = documents
        self.embedding_model = embedding_model
        
        # BM25 索引
        tokenized = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized)
        
        # 向量索引(预计算)
        self.doc_embeddings = embedding_model.encode(documents)
    
    def vector_search(self, query: str, top_k: int) -> List[Tuple[int, float]]:
        query_emb = self.embedding_model.encode([query])[0]
        scores = np.dot(self.doc_embeddings, query_emb)
        top_indices = np.argsort(scores)[::-1][:top_k]
        return [(int(i), float(scores[i])) for i in top_indices]
    
    def bm25_search(self, query: str, top_k: int) -> List[Tuple[int, float]]:
        tokenized_query = query.lower().split()
        scores = self.bm25.get_scores(tokenized_query)
        top_indices = np.argsort(scores)[::-1][:top_k]
        return [(int(i), float(scores[i])) for i in top_indices]
    
    def hybrid_search(
        self, 
        query: str, 
        top_k: int = 10,
        vector_weight: float = 0.6,
        bm25_weight: float = 0.4
    ) -> List[Dict]:
        """
        使用倒数排序融合(Reciprocal Rank Fusion)合并两种检索结果
        """
        k = 60  # RRF 常数
        
        # 获取两种检索的排序结果
        vec_results = self.vector_search(query, top_k * 2)
        bm25_results = self.bm25_search(query, top_k * 2)
        
        # 计算 RRF 分数
        rrf_scores = {}
        
        for rank, (doc_idx, _) in enumerate(vec_results):
            rrf_scores[doc_idx] = rrf_scores.get(doc_idx, 0) + \
                                  vector_weight * (1.0 / (k + rank + 1))
        
        for rank, (doc_idx, _) in enumerate(bm25_results):
            rrf_scores[doc_idx] = rrf_scores.get(doc_idx, 0) + \
                                  bm25_weight * (1.0 / (k + rank + 1))
        
        # 按 RRF 分数排序
        sorted_docs = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
        
        return [
            {
                "doc_index": idx,
                "rrf_score": score,
                "content": self.documents[idx][:200] + "..."
            }
            for idx, score in sorted_docs[:top_k]
        ]

# 使用示例
# retriever = HybridRetriever(embedding_model, document_chunks)
# results = retriever.hybrid_search("Transformer 推理优化方法", top_k=8)

倒数排序融合(RRF)是混合检索中的核心算法。它的关键优势在于不需要对原始分数做归一化——因为向量和 BM25 的分数尺度完全不同,直接加权会引入偏差。RRF 只依赖排序位置,天然解决了分数不可比的问题。

四、Reranker 精排:用交叉注意力提升精度

混合检索解决了召回率的问题,但检索阶段使用的是双编码器(Bi-Encoder),每个文档独立编码,无法建模查询和文档之间的细粒度交互。Reranker 阶段引入交叉编码器(Cross-Encoder),对检索结果进行二次精排,是提升最终效果的关键一步。


from sentence_transformers import CrossEncoder
from typing import List

class Reranker:
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        """
        使用轻量级交叉编码器进行精排
        中文场景推荐: "BAAI/bge-reranker-v2-m3"
        """
        self.model = CrossEncoder(model_name)
    
    def rerank(
        self, 
        query: str, 
        documents: List[str], 
        top_k: int = 5
    ) -> List[dict]:
        """
        对候选文档进行重排序
        
        CrossEncoder 将 query 和 document 拼接后一起输入,
        输出相关性分数,比 Bi-Encoder 的点积相似度更精准
        """
        pairs = [(query, doc) for doc in documents]
        scores = self.model.predict(pairs)
        
        # 按分数降序排列
        ranked = sorted(
            zip(documents, scores), 
            key=lambda x: x[1], 
            reverse=True
        )
        
        return [
            {
                "rank": i + 1,
                "score": float(score),
                "content": doc[:300] + "..." if len(doc) > 300 else doc
            }
            for i, (doc, score) in enumerate(ranked[:top_k])
        ]

# 完整 RAG 检索流水线
class AdvancedRAGPipeline:
    def __init__(self, embedding_model, reranker_model, documents):
        self.retriever = HybridRetriever(embedding_model, documents)
        self.reranker = Reranker(reranker_model)
    
    def search(self, query: str, retrieve_k: int = 20, rerank_k: int = 5):
        # Step 1: 查询重写
        rewritten = rewrite_query(query)
        
        # Step 2: 混合检索(多路召回)
        hybrid_results = self.retriever.hybrid_search(
            f"{rewritten['semantic']} {rewritten['keywords']}", 
            top_k=retrieve_k
        )
        
        # Step 3: Reranker 精排
        candidate_docs = [r["content"] for r in hybrid_results]
        final_results = self.reranker.rerank(
            query,  # 用原始查询做精排,避免改写引入偏差
            candidate_docs, 
            top_k=rerank_k
        )
        
        return final_results

一个常见的工程误区是用改写后的查询做 reranker 精排。实际上,reranker 阶段应该使用用户的原始查询,因为改写可能引入语义偏移,而交叉编码器对原始语义的判断更准确。

五、自适应检索:让系统自己决定检索策略

不同难度的问题需要不同的检索策略。简单的事实查询可能只需要一次检索,而复杂的分析问题需要多轮迭代。自适应检索(Adaptive RAG)让系统根据查询复杂度动态选择检索路径。


from enum import Enum

class QueryComplexity(Enum):
    SIMPLE = "simple"       # 单跳事实查询
    MODERATE = "moderate"   # 需要多文档综合
    COMPLEX = "complex"     # 需要多步推理

class AdaptiveRAG:
    def __init__(self, pipeline: AdvancedRAGPipeline):
        self.pipeline = pipeline
    
    def classify_query(self, query: str) -> QueryComplexity:
        """使用大模型判断查询复杂度"""
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": f"""判断以下问题的复杂度,返回 SIMPLE/MODERATE/COMPLEX:
- SIMPLE: 单一事实查询,一次检索即可回答
- MODERATE: 需要综合多个来源的信息
- COMPLEX: 需要多步推理或比较分析

问题: {query}"""
            }],
            temperature=0
        )
        result = response.choices[0].message.content.strip()
        return QueryComplexity(result.lower())
    
    def execute(self, query: str) -> str:
        complexity = self.classify_query(query)
        
        if complexity == QueryComplexity.SIMPLE:
            # 简单查询:单次检索,top-3
            results = self.pipeline.search(query, retrieve_k=10, rerank_k=3)
            
        elif complexity == QueryComplexity.MODERATE:
            # 中等查询:多子问题检索 + 合并
            sub_queries = self._decompose_query(query)
            all_results = []
            for sq in sub_queries:
                results = self.pipeline.search(sq, retrieve_k=15, rerank_k=5)
                all_results.extend(results)
            # 去重合并
            results = self._deduplicate(all_results)[:6]
            
        else:  # COMPLEX
            # 复杂查询:迭代检索,每轮根据已有信息生成新查询
            results = self._iterative_retrieve(query, max_rounds=3)
        
        # 构建 prompt 并生成答案
        context = "\n\n".join([r["content"] for r in results])
        answer = self._generate_answer(query, context)
        return answer
    
    def _decompose_query(self, query: str) -> List[str]:
        """将复杂问题分解为子问题"""
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": f"将以下问题分解为 2-4 个独立的子问题,每行一个:\n{query}"
            }],
            temperature=0.3
        )
        return [q.strip() for q in response.choices[0].message.content.strip().split("\n") if q.strip()]
    
    def _iterative_retrieve(self, query: str, max_rounds: int) -> List[dict]:
        """迭代检索:每轮根据已有信息决定是否需要继续检索"""
        collected = []
        current_query = query
        
        for round_num in range(max_rounds):
            results = self.pipeline.search(current_query, retrieve_k=15, rerank_k=5)
            collected.extend(results)
            
            # 检查信息是否充分
            context = "\n".join([r["content"] for r in collected])
            sufficiency = self._check_sufficiency(query, context)
            
            if sufficiency == "sufficient":
                break
            else:
                # 生成补充查询
                current_query = self._generate_followup(query, context)
        
        return self._deduplicate(collected)[:8]

六、生产部署的关键考量

将进阶 RAG 系统部署到生产环境,还需要关注以下几个工程要点:

  1. 向量数据库选型:Milvus 适合十亿级大规模场景,Qdrant 在性能和易用性之间平衡良好,pgvector 适合已有 PostgreSQL 基础设施的团队。对于中小规模(百万级以下),Chroma 的本地部署足够使用。
  2. 嵌入模型选择:中文场景推荐 BGE(BAAI/bge-large-zh-v1.5)或 Jina-embeddings-v3。多语言场景可考虑 multilingual-e5-large。嵌入模型的选择直接影响检索天花板,值得花时间做 benchmark。
  3. 缓存策略:对高频查询做语义缓存(Semantic Cache),使用 Redis 存储查询嵌入和对应结果,设置相似度阈值(如 0.95)来判断缓存命中。这能显著降低 API 成本和延迟。
  4. 评估体系:建立离线评估流水线,使用 Ragas 框架计算 Faithfulness、Answer Relevancy、Context Precision 等指标,确保每次迭代都有量化依据。

# 语义缓存示例
import redis
import hashlib

class SemanticCache:
    def __init__(self, redis_client: redis.Redis, threshold: float = 0.95):
        self.redis = redis_client
        self.threshold = threshold
    
    def get(self, query_embedding: np.ndarray) -> str | None:
        """查找语义相似的缓存"""
        # 使用 Redis Vector Similarity Search
        results = self.redis.execute_command(
            "FT.SEARCH", "cache_idx",
            f"*=>[KNN 1 @vec $vec_score AS score]",
            "PARAMS", "2", "vec_score", query_embedding.tobytes(),
            "DIALECT", 2
        )
        if results and float(results[0]) >= self.threshold:
            return results[1]  # 返回缓存的答案
        return None
    
    def set(self, query_embedding: np.ndarray, answer: str, ttl: int = 3600):
        """缓存查询结果"""
        key = f"cache:{hashlib.md5(query_embedding.tobytes()).hexdigest()}"
        self.redis.setex(key, ttl, answer)

七、总结

构建生产级 RAG 系统是一个分层优化的过程:

  • 召回层:查询重写 + 混合检索(向量 + BM25),确保不遗漏相关文档
  • 精排层:Cross-Encoder Reranker,从候选集中筛选最相关的片段
  • 策略层:自适应检索,根据查询复杂度动态调整检索路径
  • 工程层:向量数据库选型、语义缓存、评估体系,保障系统稳定高效运行

朴素 RAG 到生产级 RAG 的演进,本质上是从”能检索”到”检索得好”再到”检索得聪明”的过程。每一步优化都能带来可量化的效果提升。建议团队从混合检索和 Reranker 入手——这两项改进的投入产出比最高,通常能将答案准确率提升 15-25 个百分点。

未来,随着多模态嵌入、图检索(Graph RAG)和长上下文模型的发展,RAG 架构还将持续演进。但无论技术如何变化,”精准召回 → 精细排序 → 智能生成”的核心方法论不会改变。掌握这些进阶技术,你就能在 AI 应用的浪潮中保持领先。

正文完
 0
评论(没有评论)