Agent 意图识别:连续执行多个子任务时大模型返回很慢的优化方案

TwoAdmin 2025-10-4 78 10/4

针对大模型返回慢的问题,这里有几个优化方案:

方案1: 缓存优化(推荐)

python
import logging
import time
from functools import lru_cache
from typing import Any, Dict, List, Optional

class IntentCache:
    """TTL-bounded cache for intent-recognition results, keyed by normalized input."""

    def __init__(self, max_size=1000, ttl=300):
        """
        Args:
            max_size: maximum number of entries before the oldest is evicted.
            ttl: entry lifetime in seconds (default 300 = 5 minutes).
        """
        self.cache = {}          # key -> (result, insert_timestamp)
        self.max_size = max_size
        self.ttl = ttl

    def get_key(self, user_input: str) -> str:
        """Build a cache key: lowercase, strip/remove spaces, keep first 100 chars."""
        normalized = user_input.lower().strip().replace(' ', '')[:100]
        return f"intent:{normalized}"

    def get(self, user_input: str) -> Optional[Dict[str, Any]]:
        """Return the cached result, or None on miss; expired entries are purged lazily."""
        key = self.get_key(user_input)
        entry = self.cache.get(key)
        if entry is not None:
            data, timestamp = entry
            if time.time() - timestamp < self.ttl:
                return data
            # Entry outlived its TTL: drop it and report a miss.
            del self.cache[key]
        return None

    def set(self, user_input: str, result: Dict[str, Any]):
        """Store a result; evict the oldest entry only when inserting a NEW key
        into a full cache (overwriting an existing key must not evict — this was
        a bug in the original, which evicted even on overwrite)."""
        key = self.get_key(user_input)
        if key not in self.cache and len(self.cache) >= self.max_size:
            # Simple oldest-first eviction (O(n) scan over timestamps).
            oldest_key = min(self.cache, key=lambda k: self.cache[k][1])
            del self.cache[oldest_key]
        self.cache[key] = (result, time.time())

# Module-level cache instance shared by every recognition call
intent_cache = IntentCache()

def cached_intent_recognition(user_input: str) -> Dict[str, Any]:
    """Intent recognition with a read-through cache in front of the LLM call."""
    hit = intent_cache.get(user_input)
    if hit:
        logging.info(f"缓存命中: {user_input[:50]}...")
        return hit

    # Cache miss: ask the large model.
    answer = call_llm_for_intent(user_input)

    # Only cache non-empty answers so failures are retried next time.
    if answer:
        intent_cache.set(user_input, answer)
    return answer

方案2: 批量处理

python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class BatchIntentProcessor:
    """Coalesces concurrent intent-recognition requests into single LLM calls."""

    def __init__(self, batch_size=5, max_workers=2):
        """
        Args:
            batch_size: flush the queue as soon as this many requests accumulate.
            max_workers: thread-pool size for blocking work.
        """
        self.batch_size = batch_size
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.pending_requests = []  # list of (user_input, future) awaiting a batch
        self.batch_timer = None     # pending flush-on-timeout task, if any

    async def process_intent_batch(self, user_inputs: List[str]) -> List[Dict[str, Any]]:
        """Recognize intents for several inputs with one LLM round-trip."""
        if len(user_inputs) == 1:
            # Single request: skip the batch-prompt overhead.
            return [await self._single_intent_recognition(user_inputs[0])]

        prompt = self._build_batch_prompt(user_inputs)
        batch_result = await self._call_llm_batch(prompt)
        return self._parse_batch_result(batch_result, len(user_inputs))

    def _build_batch_prompt(self, user_inputs: List[str]) -> str:
        """Build one prompt asking the LLM to classify every input, in order."""
        batch_prompt = """请同时分析以下多个用户的意图,按顺序返回JSON数组:

用户输入列表:
"""
        for i, user_input in enumerate(user_inputs, 1):
            batch_prompt += f"{i}. {user_input}\n"

        batch_prompt += """
请返回格式:
[
  {"intent": "类型1", "confidence": 0.9, "entities": [...]},
  {"intent": "类型2", "confidence": 0.8, "entities": [...]},
  ...
]"""
        return batch_prompt

    async def queue_intent_request(self, user_input: str) -> asyncio.Future:
        """Enqueue one request; the returned future resolves when its batch runs."""
        future = asyncio.get_event_loop().create_future()
        self.pending_requests.append((user_input, future))

        if len(self.pending_requests) >= self.batch_size:
            # Size threshold reached: flush now and drop the pending timer so it
            # doesn't fire again for a batch that no longer exists.
            if self.batch_timer:
                self.batch_timer.cancel()
                self.batch_timer = None
            await self._process_pending_batch()
        elif not self.batch_timer:
            # First request of a new batch: schedule a flush-on-timeout.
            self.batch_timer = asyncio.create_task(self._process_after_timeout())

        return future

    async def _process_after_timeout(self, timeout=0.5):
        """Flush whatever has accumulated once *timeout* seconds elapse."""
        try:
            await asyncio.sleep(timeout)
            await self._process_pending_batch()
        finally:
            # Reset even if the flush raised, so a new timer can be scheduled.
            self.batch_timer = None

    async def _process_pending_batch(self):
        """Run the current batch and resolve each request's future."""
        if not self.pending_requests:
            return

        # Snapshot and clear BEFORE awaiting: requests queued while the LLM call
        # is in flight must start a fresh batch instead of being silently
        # dropped by a post-await clear() (bug in the original).
        batch = self.pending_requests
        self.pending_requests = []
        inputs = [req[0] for req in batch]
        futures = [req[1] for req in batch]

        try:
            results = await self.process_intent_batch(inputs)
            for future, result in zip(futures, results):
                if not future.done():  # caller may have cancelled meanwhile
                    future.set_result(result)
        except Exception as e:
            for future in futures:
                if not future.done():
                    future.set_exception(e)

方案3: 简化模型和提示词

python
def optimize_intent_prompt(user_input: str) -> str:
    """Return a compact intent-classification prompt for the given user input."""
    # Assemble line by line; the literal braces no longer need f-string escaping.
    lines = [
        '请快速判断用户意图,只返回JSON:',
        '',
        f'用户输入: "{user_input}"',
        '',
        '可选意图类型: ["舆情查询", "报告生成", "数据统计", "其他"]',
        '',
        '返回格式:',
        '{"intent": "舆情查询", "confidence": 0.95, "keywords": ["关键词1", "关键词2"]}',
        '',
        '请快速响应,不要解释。',
    ]
    return "\n".join(lines)

def lightweight_intent_recognition(user_input: str) -> Dict[str, Any]:
    """Three-tier intent recognition: rules first, then cache, then a fast LLM."""
    # Tier 1: rule matching (fast path); trust it only at high confidence.
    by_rules = rule_based_intent_detection(user_input)
    if by_rules and by_rules.get('confidence', 0) > 0.9:
        return by_rules

    # Tier 2: a previously computed result.
    by_cache = intent_cache.get(user_input)
    if by_cache:
        return by_cache

    # Tier 3: trimmed-down prompt against the fast model; cache what it says.
    answer = call_fast_llm(optimize_intent_prompt(user_input))
    if answer:
        intent_cache.set(user_input, answer)
    return answer

def rule_based_intent_detection(user_input: str) -> Optional[Dict[str, Any]]:
    """Keyword-rule intent detection.

    Returns an intent dict (intent / confidence / keywords) when sentiment- or
    report-related keywords are present, otherwise None so callers fall back to
    the LLM. Confidence grows with the number of keyword hits, capped below 1.
    (Fix: the original annotated the return as Dict even though the miss path
    returns None.)
    """
    user_input_lower = user_input.lower()

    # Keyword lists for the two rule-detectable intents.
    sentiment_keywords = ['舆情', '新闻', '报道', '媒体', '热搜', '话题', '舆论']
    report_keywords = ['报告', '统计', '分析', '汇总', '总结']

    sentiment_count = sum(1 for kw in sentiment_keywords if kw in user_input_lower)
    report_count = sum(1 for kw in report_keywords if kw in user_input_lower)

    # Sentiment is checked first, so inputs matching both count as sentiment.
    if sentiment_count > 0:
        return {
            "intent": "舆情查询",
            "confidence": min(0.7 + sentiment_count * 0.1, 0.95),
            "keywords": extract_keywords(user_input)
        }
    if report_count > 0:
        return {
            "intent": "报告生成",
            "confidence": min(0.6 + report_count * 0.1, 0.9),
            "keywords": extract_keywords(user_input)
        }

    return None

方案4: 异步处理和超时控制

python
import asyncio
from async_timeout import timeout

async def async_intent_recognition(user_input: str, max_wait: float = 3.0) -> Dict[str, Any]:
    """Asynchronous intent recognition bounded by *max_wait* seconds.

    Attempts, in order: rule matching, cache lookup, async LLM call. On
    timeout, a default sentiment-query result flagged with timeout=True is
    returned so the request can still proceed.
    """
    try:
        async with timeout(max_wait):
            # Cheap rule pass first; accept it above the confidence bar.
            quick = rule_based_intent_detection(user_input)
            if quick and quick.get('confidence', 0) > 0.8:
                return quick

            # Cached answer from an earlier (normalized-)identical query.
            remembered = intent_cache.get(user_input)
            if remembered:
                return remembered

            # Slow path: ask the model asynchronously; cache non-empty answers.
            answer = await call_llm_async(user_input)
            if answer:
                intent_cache.set(user_input, answer)
            return answer or {"intent": "其他", "confidence": 0.5, "keywords": []}

    except asyncio.TimeoutError:
        logging.warning(f"意图识别超时: {user_input[:50]}...")
        # Fallback: assume the most common intent so callers get a usable result.
        return {
            "intent": "舆情查询",  # default to a sentiment query
            "confidence": 0.6,
            "keywords": extract_keywords(user_input),
            "timeout": True
        }

方案5: 分级处理策略

python
class HierarchicalIntentProcessor:
    """Routes intent recognition through tiers ordered from fastest to slowest."""

    def __init__(self):
        # Endpoint names for the two model tiers.
        self.fast_llm_endpoint = "fast-llm"  # quick but weaker model
        self.slow_llm_endpoint = "slow-llm"  # slow but stronger model

    async def process_intent(self, user_input: str) -> Dict[str, Any]:
        """Resolve an intent, escalating tier by tier until one answers."""
        # Tier 1: rule matching — cheapest, accepted only at high confidence.
        from_rules = rule_based_intent_detection(user_input)
        if from_rules and from_rules['confidence'] > 0.9:
            return from_rules

        # Tier 2: cache of earlier answers.
        from_cache = intent_cache.get(user_input)
        if from_cache:
            return from_cache

        # Tier 3: fast model under a hard 2-second budget.
        try:
            async with timeout(2.0):
                quick_answer = await self._call_fast_llm(user_input)
                if quick_answer and quick_answer.get('confidence', 0) > 0.7:
                    intent_cache.set(user_input, quick_answer)
                    return quick_answer
        except asyncio.TimeoutError:
            pass  # fall through to the full model

        # Tier 4: full model — slowest, but most accurate.
        final_answer = await self._call_slow_llm(user_input)
        if final_answer:
            intent_cache.set(user_input, final_answer)

        return final_answer or {
            "intent": "其他",
            "confidence": 0.5,
            "keywords": extract_keywords(user_input)
        }

方案6: 预加载和预热

python
class IntentPreloader:
    """Warms the intent cache with answers for frequently seen queries."""

    def __init__(self):
        # Queries expected to recur often; preloaded once at startup.
        self.common_queries = [
            "查看舆情", "搜索新闻", "生成报告", "统计数据",
            "昨天舆情", "上周报告", "热点话题", "负面新闻"
        ]
        self.preloaded = False

    async def preload_common_intents(self):
        """Recognize all common queries in parallel and cache them (idempotent)."""
        if self.preloaded:
            return

        logging.info("开始预加载常见意图...")
        jobs = [
            asyncio.create_task(self._preload_single_intent(query))
            for query in self.common_queries
        ]
        # return_exceptions=True: one failed preload must not abort the rest.
        await asyncio.gather(*jobs, return_exceptions=True)
        self.preloaded = True
        logging.info("常见意图预加载完成")

    async def _preload_single_intent(self, query: str):
        """Resolve one query's intent and store it in the shared cache."""
        try:
            intent_cache.set(query, await async_intent_recognition(query))
        except Exception as e:
            logging.warning(f"预加载失败 {query}: {e}")

使用示例

python
# 在应用启动时预加载
@app.on_event("startup")
async def startup_event():
    preloader = IntentPreloader()
    await preloader.preload_common_intents()

# 在路由中使用优化后的意图识别
@app.post("/chat")
async def chat_endpoint(request: ChatRequest):
    start_time = time.time()
    
    # 使用优化后的意图识别
    intent_result = await async_intent_recognition(request.user_input)
    
    processing_time = time.time() - start_time
    logging.info(f"意图识别耗时: {processing_time:.2f}s")
    
    # 根据意图路由到不同处理逻辑
    if intent_result["intent"] == "舆情查询":
        return await handle_sentiment_query(request, intent_result)
    elif intent_result["intent"] == "报告生成":
        return await handle_report_generation(request, intent_result)
    else:
        return await handle_other_query(request, intent_result)

推荐组合方案

生产环境推荐使用:

  1. 缓存 + 规则匹配 + 异步处理 + 超时控制

  2. 预加载常见查询

  3. 分级处理策略

这样可以确保:

  • 常见请求秒级响应(缓存+规则)

  • 新请求在可接受时间内完成(3秒内)

  • 系统在高压下仍能稳定运行(超时兜底)

- THE END -

TwoAdmin

11月21日19:56

最后修改:2025年11月21日
0

非特殊说明,本博所有文章均为博主原创。