超聚变数字技术 AI Agent 开发一面
1、讲讲什么是 RAG
RAG 是 Retrieval-Augmented Generation,也就是检索增强生成。它的核心思想不是只靠大模型参数里记住的知识回答问题,而是在回答之前,先去外部知识库里检索和当前问题相关的内容,再把检索结果作为上下文交给模型生成答案。
完整流程一般分两部分。离线侧先做文档接入、清洗、切片、向量化和索引构建,把知识变成可检索的形式。在线侧收到用户问题后,先做 query 改写或纠错,再去知识库召回相关片段,必要时做 rerank 重排,最后把用户问题和证据一起送给模型生成。
RAG 的价值主要有三个:
- 让模型获得训练时没有的外部知识
- 让知识更新不依赖重新训练模型
- 让回答更容易有证据支撑,降低纯生成带来的幻觉
一个最简单的流程可以写成:
def rag_pipeline(query):
query = rewrite_query(query)
docs = retrieve(query)
top_docs = rerank(query, docs)[:5]
prompt = build_prompt(query, top_docs)
return llm_generate(prompt)
2、rerank 重排具体使用的是算法还是模型?
rerank 本质上既可以理解为排序算法的一环,也可以具体落到排序模型。实际项目里说 rerank,通常指的是“重排模型”,不是简单按向量分数再排一次序。
召回阶段一般追求高覆盖,所以会先用向量检索、BM25 或混合检索快速取回一批候选文档。这个阶段更关注 recall,不太关注顺序绝对准确。rerank 的作用是对候选集合做更精细的相关性判断,把真正最相关的文档放到前面。常见做法是使用 cross-encoder 或专门的 reranker 模型,对 query 和每个候选文档做联合打分。
所以工程上可以这样理解:
- 检索阶段:高效捞候选
- 重排阶段:高精度排序
如果是轻量场景,也有直接用一些启发式规则,比如标题命中、关键词权重、时间衰减、字段加权去做二次排序,但真正效果更稳的通常还是模型式 rerank。
3、讲讲消息队列,讲讲 Kafka
消息队列本质上是系统间异步通信的中间层。生产者把消息写进去,消费者异步处理。它的价值主要在解耦、削峰填谷、异步处理、流量缓冲和失败重试。
Kafka 是一个高吞吐的分布式消息系统,它的核心概念有几个:
- Producer:负责发送消息
- Consumer:负责消费消息
- Topic:消息逻辑分类
- Partition:Topic 的分区,决定并发和扩展能力
- Consumer Group:同组内一个分区同一时刻只会被一个消费者消费
- Offset:消费位点,用来记录消费进度
Kafka 比较适合高吞吐日志流、埋点流、异步任务流、事件驱动链路。它写入性能高,顺序性可以在分区内保证,也支持消息积压和回放。
在 Agent 系统里,Kafka 常见用法包括:
- 用户异步任务排队
- 长耗时生成任务解耦
- 日志和埋点采集
- 多服务之间的事件通知
- 失败任务重试和死信队列处理
4、如果 LLM 对话应用要生成非常大的数据集,过程很长,怎样使用消息队列优化用户体验?
这种场景不适合用户请求一直同步阻塞等待,否则前端超时、用户体验差、服务线程也会被长期占用。更合理的做法是把长任务异步化。
典型流程是:前端发起请求后,服务端先快速创建任务,把任务元信息写入数据库,再把任务 ID 和请求参数投递到消息队列。然后立即返回用户一个“任务已提交”的响应。后台消费者异步拉取任务,分片执行生成过程,把中间进度和最终结果持续写回存储。前端通过轮询、WebSocket 或 SSE 查看进度和最终下载地址。
这样的好处是:
- 用户不需要一直卡在一个长连接里
- 服务端请求线程能快速释放
- 后台可以做限流、重试、失败恢复
- 可以把大任务拆成多个子任务并行处理
一个简单示意代码是:
import uuid
from queue import Queue
from threading import Thread
import time
task_queue = Queue()
task_store = {}
def submit_task(user_input):
task_id = str(uuid.uuid4())
task_store[task_id] = {"status": "pending", "progress": 0, "result": None}
task_queue.put((task_id, user_input))
return task_id
def worker():
while True:
task_id, user_input = task_queue.get()
task_store[task_id]["status"] = "running"
data = []
for i in range(10):
time.sleep(1)
data.append(f"{user_input}_part_{i}")
task_store[task_id]["progress"] = (i + 1) * 10
task_store[task_id]["status"] = "finished"
task_store[task_id]["result"] = data
task_queue.task_done()
Thread(target=worker, daemon=True).start()
线上如果用 Kafka,一般会再配合:
- 任务状态表
- 幂等控制
- 超时取消
- 分片消费
- 结果对象存储
- WebSocket/SSE 进度推送
5、平时都在哪些地方使用过 Docker 技术?
Docker 在线上和开发环境里都很常用。开发阶段主要用于统一环境,避免本地 Python 版本、依赖库版本、系统库不一致导致“我这能跑,你那不能跑”。测试阶段可以快速拉起依赖服务,比如 Redis、MySQL、向量数据库、Kafka。上线阶段则用于模型服务、检索服务、API 服务和异步 worker 的容器化部署。
在 Agent 项目里,比较常见的 Docker 使用场景有:
- 打包 FastAPI / Flask 推理服务
- 打包 embedding 服务和 rerank 服务
- 打包异步 worker
- 用 docker-compose 拉起 Redis、MySQL、Kafka、Milvus/ES 等依赖
- 做 CI/CD 发布和版本回滚
- 保证开发、测试、预发、线上环境一致
一个简单的 Dockerfile 例如:
FROM python:3.10-slim WORKDIR /app COPY requirements.txt . RUN pip install -r requirements.txt COPY . . CMD ["python", "app.py"]
6、讲讲异步编程优化高并发
异步编程的核心不是让单个任务执行更快,而是让线程在等待 IO 的时候不被阻塞,从而提高整体吞吐。高并发场景里很多瓶颈其实是 IO,比如网络请求、数据库查询、缓存访问、调用模型服务、访问对象存储。这些场景非常适合异步。
在 Agent 系统里,异步常见用法包括:
- 并发调用多个检索源
- 并发查多个下游服务
- 并发获取用户画像、权限、记忆、知识库结果
- 长任务异步执行
- 结合消息队列做后台消费
Python 里通常会用 asyncio、aiohttp、异步数据库客户端等来减少线程阻塞。
import asyncio
async def fetch_redis():
await asyncio.sleep(1)
return "redis_data"
async def fetch_db():
await asyncio.sleep(1)
return "db_data"
async def main():
res1,
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
本专栏聚焦 AI-Agent 面试高频考点,内容来自真实面试与项目实践。系统覆盖大模型基础、Prompt工程、RAG、Agent架构、工具调用、多Agent协作、记忆机制、评测、安全与部署优化等核心模块。以“原理+场景+实战”为主线,提供高频题解析、标准答题思路与工程落地方法,帮助你高效查漏补缺.
