超聚变数字技术 AI Agent 开发一面

1、讲讲什么是 RAG

RAG 是 Retrieval-Augmented Generation,也就是检索增强生成。它的核心思想不是只靠大模型参数里记住的知识回答问题,而是在回答之前,先去外部知识库里检索和当前问题相关的内容,再把检索结果作为上下文交给模型生成答案。

完整流程一般分两部分。离线侧先做文档接入、清洗、切片、向量化和索引构建,把知识变成可检索的形式。在线侧收到用户问题后,先做 query 改写或纠错,再去知识库召回相关片段,必要时做 rerank 重排,最后把用户问题和证据一起送给模型生成。

RAG 的价值主要有三个:

  • 让模型获得训练时没有的外部知识
  • 让知识更新不依赖重新训练模型
  • 让回答更容易有证据支撑,降低纯生成带来的幻觉

一个最简单的流程可以写成:

def rag_pipeline(query):
    query = rewrite_query(query)
    docs = retrieve(query)
    top_docs = rerank(query, docs)[:5]
    prompt = build_prompt(query, top_docs)
    return llm_generate(prompt)

2、rerank 重排具体使用的是算法还是模型?

rerank 本质上既可以理解为排序算法的一环,也可以具体落到排序模型。实际项目里说 rerank,通常指的是“重排模型”,不是简单按向量分数再排一次序。

召回阶段一般追求高覆盖,所以会先用向量检索、BM25 或混合检索快速取回一批候选文档。这个阶段更关注 recall,不太关注顺序绝对准确。rerank 的作用是对候选集合做更精细的相关性判断,把真正最相关的文档放到前面。常见做法是使用 cross-encoder 或专门的 reranker 模型,对 query 和每个候选文档做联合打分。

所以工程上可以这样理解:

  • 检索阶段:高效捞候选
  • 重排阶段:高精度排序

如果是轻量场景,也有直接用一些启发式规则,比如标题命中、关键词权重、时间衰减、字段加权去做二次排序,但真正效果更稳的通常还是模型式 rerank。

3、讲讲消息队列,讲讲 Kafka

消息队列本质上是系统间异步通信的中间层。生产者把消息写进去,消费者异步处理。它的价值主要在解耦、削峰填谷、异步处理、流量缓冲和失败重试。

Kafka 是一个高吞吐的分布式消息系统,它的核心概念有几个:

  • Producer:负责发送消息
  • Consumer:负责消费消息
  • Topic:消息逻辑分类
  • Partition:Topic 的分区,决定并发和扩展能力
  • Consumer Group:同组内一个分区同一时刻只会被一个消费者消费
  • Offset:消费位点,用来记录消费进度

Kafka 比较适合高吞吐日志流、埋点流、异步任务流、事件驱动链路。它写入性能高,顺序性可以在分区内保证,也支持消息积压和回放。

在 Agent 系统里,Kafka 常见用法包括:

  • 用户异步任务排队
  • 长耗时生成任务解耦
  • 日志和埋点采集
  • 多服务之间的事件通知
  • 失败任务重试和死信队列处理

4、如果 LLM 对话应用要生成非常大的数据集,过程很长,怎样使用消息队列优化用户体验?

这种场景不适合用户请求一直同步阻塞等待,否则前端超时、用户体验差、服务线程也会被长期占用。更合理的做法是把长任务异步化。

典型流程是:前端发起请求后,服务端先快速创建任务,把任务元信息写入数据库,再把任务 ID 和请求参数投递到消息队列。然后立即返回用户一个“任务已提交”的响应。后台消费者异步拉取任务,分片执行生成过程,把中间进度和最终结果持续写回存储。前端通过轮询、WebSocket 或 SSE 查看进度和最终下载地址。

这样的好处是:

  • 用户不需要一直卡在一个长连接里
  • 服务端请求线程能快速释放
  • 后台可以做限流、重试、失败恢复
  • 可以把大任务拆成多个子任务并行处理

一个简单示意代码是:

import uuid
from queue import Queue
from threading import Thread
import time

task_queue = Queue()
task_store = {}

def submit_task(user_input):
    task_id = str(uuid.uuid4())
    task_store[task_id] = {"status": "pending", "progress": 0, "result": None}
    task_queue.put((task_id, user_input))
    return task_id

def worker():
    while True:
        task_id, user_input = task_queue.get()
        task_store[task_id]["status"] = "running"
        data = []
        for i in range(10):
            time.sleep(1)
            data.append(f"{user_input}_part_{i}")
            task_store[task_id]["progress"] = (i + 1) * 10
        task_store[task_id]["status"] = "finished"
        task_store[task_id]["result"] = data
        task_queue.task_done()

Thread(target=worker, daemon=True).start()

线上如果用 Kafka,一般会再配合:

  • 任务状态表
  • 幂等控制
  • 超时取消
  • 分片消费
  • 结果对象存储
  • WebSocket/SSE 进度推送

5、平时都在哪些地方使用过 Docker 技术?

Docker 在线上和开发环境里都很常用。开发阶段主要用于统一环境,避免本地 Python 版本、依赖库版本、系统库不一致导致“我这能跑,你那不能跑”。测试阶段可以快速拉起依赖服务,比如 Redis、MySQL、向量数据库、Kafka。上线阶段则用于模型服务、检索服务、API 服务和异步 worker 的容器化部署。

在 Agent 项目里,比较常见的 Docker 使用场景有:

  • 打包 FastAPI / Flask 推理服务
  • 打包 embedding 服务和 rerank 服务
  • 打包异步 worker
  • 用 docker-compose 拉起 Redis、MySQL、Kafka、Milvus/ES 等依赖
  • 做 CI/CD 发布和版本回滚
  • 保证开发、测试、预发、线上环境一致

一个简单的 Dockerfile 例如:

FROM python:3.10-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .
CMD ["python", "app.py"]

6、讲讲异步编程优化高并发

异步编程的核心不是让单个任务执行更快,而是让线程在等待 IO 的时候不被阻塞,从而提高整体吞吐。高并发场景里很多瓶颈其实是 IO,比如网络请求、数据库查询、缓存访问、调用模型服务、访问对象存储。这些场景非常适合异步。

在 Agent 系统里,异步常见用法包括:

  • 并发调用多个检索源
  • 并发查多个下游服务
  • 并发获取用户画像、权限、记忆、知识库结果
  • 长任务异步执行
  • 结合消息队列做后台消费

Python 里通常会用 asyncioaiohttp、异步数据库客户端等来减少线程阻塞。

import asyncio

async def fetch_redis():
    await asyncio.sleep(1)
    return "redis_data"

async def fetch_db():
    await asyncio.sleep(1)
    return "db_data"

async def main():
    res1, 

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

AI-Agent面试实战专栏 文章被收录于专栏

本专栏聚焦 AI-Agent 面试高频考点,内容来自真实面试与项目实践。系统覆盖大模型基础、Prompt工程、RAG、Agent架构、工具调用、多Agent协作、记忆机制、评测、安全与部署优化等核心模块。以“原理+场景+实战”为主线,提供高频题解析、标准答题思路与工程落地方法,帮助你高效查漏补缺.

全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务