Skip to content

性能优化

AI 应用的性能问题和传统 Web 应用不太一样。大多数时候你优化的不是"代码跑得慢",而是"模型响应太慢"——一个你几乎没办法从根本上改变的外部系统。

这页接着 流式输出与 SSE可观测性与日志 来讲,关注 AI 应用特有的性能瓶颈和对应的工程手段。

先定位瓶颈在哪一层

AI 应用的延迟来源大致分四层,对策完全不同:

层次典型表现对策方向
网络往返TTFT(首 token 时间)长地域就近、CDN、连接复用
模型推理输出慢,token 速率低换更快的模型、减少输出 token
上下文过长随对话轮数增加越来越慢prompt 压缩、上下文裁剪
你自己的代码检索慢、数据处理慢异步、缓存、批量化

可观测性工具 记录每次调用的 ttft_mslatency_ms,能帮你快速判断慢在哪一层。

流式输出:感知延迟 vs 实际延迟

先实现流式输出,这是成本最低、效果最显著的优化。

用户感知的"快"主要由 TTFT 决定,不是总耗时。流式输出把 TTFT 从"生成完所有内容"缩短到"生成出第一个 token",通常能让用户感知延迟下降 70-80%。

具体实现参考 流式输出与 SSE,这里不重复。要注意的一点是:如果你在后端拿到完整响应再推给前端,流式就没意义了。确认你的数据流是真正的透传,不是"收完再发"。

缓存策略

语义缓存

对于频繁出现的相似问题,可以缓存响应结果。但 AI 应用的请求不是精确匹配的,同一个意思可能有十种说法,所以传统的键值缓存命中率很低。

语义缓存的思路是:把历史请求转成 embedding,新请求来了先做相似度检索,如果找到语义相似的历史请求,直接返回缓存响应。

python
import numpy as np
from openai import OpenAI

client = OpenAI()

# 简化版语义缓存
class SemanticCache:
    def __init__(self, threshold=0.92):
        self.cache = []  # 存 (embedding, response) 对
        self.threshold = threshold

    def _embed(self, text: str) -> list[float]:
        res = client.embeddings.create(
            input=text,
            model="text-embedding-3-small"
        )
        return res.data[0].embedding

    def _cosine_sim(self, a, b) -> float:
        a, b = np.array(a), np.array(b)
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

    def get(self, query: str):
        query_emb = self._embed(query)
        for emb, response in self.cache:
            if self._cosine_sim(query_emb, emb) > self.threshold:
                return response
        return None

    def set(self, query: str, response: str):
        emb = self._embed(query)
        self.cache.append((emb, response))

语义缓存适合问答类场景(FAQ、客服)。对于创意生成、需要实时数据的场景不适合缓存。

Prompt 结果缓存

OpenAI 的 API 对 prefix 相同的请求有自动 prompt caching,符合条件的请求 token 费用打折,延迟也会降低。如果你的 system prompt 比较长且固定,可以利用这个特性,把固定内容尽量放在消息列表的最前面。

上下文长度控制

随着对话轮数增加,消息历史越来越长,每次请求的 token 数越来越多,费用和延迟都线性增加。

几种常见的上下文控制策略:

固定窗口:只保留最近 N 轮对话。实现简单,但当第 N+1 轮引用了第 1 轮的内容时,AI 会"失忆",体验断层明显。

摘要压缩:当对话超过阈值时,把早期对话压缩成摘要,替换掉原始对话内容。能保留主要信息,但摘要本身有 token 成本,压缩质量也有损耗。

python
def compress_history(messages: list, keep_recent: int = 6) -> list:
    if len(messages) <= keep_recent:
        return messages

    # 把较早的消息压缩成摘要
    to_compress = messages[:-keep_recent]
    recent = messages[-keep_recent:]

    summary_prompt = [
        {"role": "system", "content": "请用简短的中文摘要以下对话,保留关键信息:"},
        *to_compress
    ]
    summary = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=summary_prompt
    ).choices[0].message.content

    return [
        {"role": "system", "content": f"对话历史摘要:{summary}"},
        *recent
    ]

选择性保留:根据当前问题的语义,从历史对话里检索最相关的几轮,而不是全部保留。需要额外的检索步骤,但上下文更精准。

并发与批量化

如果你的 Agent 需要依次调用多个工具或做多次 LLM 请求,先判断这些调用之间有没有依赖关系。有依赖的必须串行,没有依赖的可以并发:

python
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def parallel_analysis(user_query: str):
    # 这两个分析互相独立,可以并发
    sentiment_task = async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"情感分析:{user_query}"}]
    )
    intent_task = async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"意图识别:{user_query}"}]
    )

    sentiment, intent = await asyncio.gather(sentiment_task, intent_task)
    return sentiment.choices[0].message.content, intent.choices[0].message.content

串行两次各 1 秒的请求,并发后变成 1 秒,不是 2 秒。

模型选择的性能考量

不同模型的速度差异很大:

  • gpt-4o-miniclaude-haiku 等小模型速度是大模型的 3-5 倍
  • 对于分类、提取、简单问答,小模型通常够用
  • 复杂推理任务才需要大模型

一个常见的做法是任务路由:先用小模型判断任务复杂度,简单任务直接在小模型上完成,复杂任务才升级到大模型。

速度、质量、成本的三角权衡

这三者不能同时最优:

  • 最快的通常不是最准的(小模型 vs 大模型)
  • 最准的通常不是最便宜的(需要多轮调用)
  • 最便宜的通常不是最快的(批量 API 有延迟)

没有"最优方案",只有"符合当前场景约束的方案"。在开始优化之前,先确认你的优先级:对用户来说,延迟、准确率、成本,哪个最重要。


可观测性是性能优化的前提:没有数据,你不知道哪里慢。如果还没有建立 LLM 调用的观测记录,先看 可观测性与日志

面向开发者的 AI 实战路线——Vibe Coding 与 AI 应用开发