企业级项目二:基于Doris+LangChain构建数据智能运营AI助手
前言
近一年,大数据面试的风向正在发生明显变化。
过去,面试官更关注:
- 数仓分层怎么设计?
- 指标体系如何构建?
- Spark 调优怎么做的?
而现在,越来越多的面试开始追问:
- 你有没有做过 AI 相关项目?
- LLM 在数据场景中如何落地?
- RAG 是怎么设计的?
AI 不再是「加分项」,而是逐渐成为「基础能力」。
但问题是 很多人所谓的 AI 项目,只停留在:
- 调个 API
- 写个简单问答 Demo
这在企业级场景里,是远远不够的。真正有含金量的 AI 项目,必须要解决真实业务问题,并具备:
- 检索增强(RAG)落地能力
- LLM 语义理解与 Prompt 设计能力
- 工程化架构能力(可扩展、可复用、可维护)
而在我们当前的数据仓库体系中:
- 表数量已达数千级
- 字段与指标数以万计
- 数据答疑占据约 20% 的人力成本
于是,我们决定:用 LLM 重构「数据找数与问数」的方式。
本项目基于 Doris + LangChain + Prompt 工程 + Python + Flask + MySQL,构建了一套企业级数据智能运营助手,系统性解决找数难、问数繁、答数重的问题。
接下来,我会完整拆解它的架构设计、RAG 实现方式、Prompt 优化策略,以及如何把这个项目讲到面试官点头为止。
一、项目背景
本项目涉及到的核心技术栈:Doris + LangChain + Prompt工程 + Python + Flask + MySQL
在当前数据仓库体系中,表数量已达数千级别,字段与指标更是数以万计。业务方在日常工作中难以高效定位所需的数据资产,往往需要依赖数据开发人员协助查找表结构、字段含义或指标口径。据统计,数据团队同学约20%的工作时间用于重复性答疑,不仅严重影响开发效率,也制约了数据服务的响应速度与用户体验。
为系统性解决找数难、问数繁、答数重的问题,亟需构建一个智能化、自助化的数据运营支持工具。本项目旨在通过引入LLM,结合元数据管理与语义理解,打造一个能够精准理解用户意图、自动检索并解释数据资产的智能问答助手,从而显著提升数据发现效率,降低数据开发同学的答疑成本,推动数据服务向自助化、智能化演进。
二、预备知识
环境准备
Doris环境安装
# 安装docker brew install docker # 安装Doris bash start-doris.sh -v 4.0.3 # docker常用命令 Stop cluster: docker-compose -f docker-compose-doris.yaml down Start cluster: docker-compose -f docker-compose-doris.yaml up -d View logs: docker-compose -f docker-compose-doris.yaml logs -f # 启动Doris客户端 mysql -uroot -P9030 -h127.0.0.1
项目依赖包安装
提醒:Python版本>=3.11
pip install langchain_community pip install langchain_chroma pip install langchain_core pip install dashscope pip install flask pip install mysql-connector-python
大模型API-Key获取

技术储备
RAG理论知识
RAG(Retrieval-Augmented Generation)= 检索(Retrieve) + 增强(Augment) + 生成(Generate)
核心思想:先从知识库里找相关资料,再把资料交给大模型生成答案,而不是让模型“凭记忆胡说”。

LangChain基础使用
概述
- LangChain 是一个用于构建 LLM 应用 的框架,核心解决:Prompt 管理、模型调用封装、文档加载、向量检索、RAG 构建、Agent 调度
- 一句话总结:LangChain 是一个把“大模型调用”工程化的工具框架
核心组件
模块 | 作用 |
LLM / ChatModel | 调用大模型 |
PromptTemplate | 管理提示词 |
OutputParser | 解析输出 |
DocumentLoader | 加载文档 |
TextSplitter | 文档切分 |
Embeddings | 向量化 |
VectorStore | 向量存储 |
Retriever | 检索器 |
Chain | 串联流程 |
入门案例
import os
from langchain_community.chat_models.tongyi import ChatTongyi
model = ChatTongyi(
model_name="qwen3-max",
dashscope_api_key=os.getenv("DASHSCOPE_API_KEY") or os.getenv("OPENAI_API_KEY")
)
response = model.invoke("你是谁?")
print(response.content)
Doris ANN索引使用
概述
Apache Doris 中的近似最近邻 (ANN) 索引支持对高维数据进行高效的向量相似性搜索。从 Doris 4.x 开始,通用索引操作语法也支持了 ANN 索引
如何创建
如何使用
三、项目流程
离线流程:xxx
在线流程:xxx

四、项目源码
以下所有代码均本人亲自编写,有任何问题可直接私聊我
SQL代码
创建Doris向量表
CREATE DATABASE doris_document_db;
USE doris_document_db;
CREATE TABLE `doris_vector_table` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`document_name` text NULL,
`content` text NULL,
`embedding` array<float> NOT NULL,
INDEX idx_embedding (`embedding`) USING ANN PROPERTIES(
"dim" = "512",
"index_type" = "hnsw",
"metric_type" = "l2_distance"
)
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
创建用户表及用户对话历史表
创建元数据表
创建文件记录表
前端代码
管理员-admin.html
聊天对话-chat.html
登录-login.html
注册-register.html
后端代码
注意:其中OPENAI_API_KEY、DASHSCOPE_API_KEY配置在环境变量中
项目入口-app.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from flask import Flask, render_template, request, redirect, url_for, session, flash, Response, jsonify
from werkzeug.security import generate_password_hash, check_password_hash
import mysql.connector
from intent_recognition import execute as intent_execute
import json
from functools import wraps
app = Flask(__name__)
app.secret_key = 'your_secret_key' # 替换为实际的密钥
from config import DB_CONFIG_BUSINESS
def get_db_connection():
"""获取数据库连接"""
conn = mysql.connector.connect(**DB_CONFIG_BUSINESS)
return conn
# 登录装饰器
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('请先登录')
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
@app.route('/')
def index():
if 'user_id' in session:
return redirect(url_for('chat'))
return redirect(url_for('login'))
@app.route('/register', methods=['GET', 'POST'])
def register():
...
@app.route('/login', methods=['GET', 'POST'])
def login():
...
@app.route('/logout')
def logout():
session.clear()
flash('您已登出')
return redirect(url_for('login'))
@app.route('/chat')
@login_required
def chat():
# 获取用户对话历史数量,用于判断是否是首次访问
is_first_visit = True
try:
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT COUNT(*) as count FROM user_conversations WHERE user_id = %s", (session['user_id'],))
result = cursor.fetchone()
is_first_visit = result['count'] == 0 if result else True
except Exception as e:
print(f"获取用户对话历史失败: {str(e)}")
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals():
conn.close()
# 获取用户名首字母
user_initials = session['username'][0].upper() if session['username'] else 'U'
return render_template('chat.html',
username=session['username'],
is_first_visit=is_first_visit,
user_initials=user_initials)
@app.route('/api/chat', methods=['POST', 'GET'])
@login_required
def api_chat():
...
@app.route('/api/rate', methods=['POST'])
@login_required
def api_rate():
"""用户评价对话质量"""
try:
data = request.get_json()
conversation_id = data.get('conversation_id')
rating = data.get('rating') # 1-5分
liked = data.get('liked') # true/false
if not conversation_id:
return jsonify({'error': '缺少对话ID'}), 400
conn = get_db_connection()
cursor = conn.cursor()
update_fields = ['rated_at = CURRENT_TIMESTAMP']
params = []
if rating is not None:
update_fields.append('user_rating = %s')
params.append(rating)
if liked is not None:
update_fields.append('user_liked = %s')
params.append(1 if liked else 0)
params.append(conversation_id)
query = f"UPDATE user_conversations SET {', '.join(update_fields)} WHERE id = %s"
cursor.execute(query, params)
conn.commit()
cursor.close()
conn.close()
return jsonify({'success': True, 'message': '评价成功'})
except Exception as e:
print(f"评价保存失败: {str(e)}")
return jsonify({'error': '评价失败'}), 500
@app.route('/admin')
@login_required
def admin():
"""后台管理页面"""
# 检查是否是管理员
if session['username'] != 'admin':
flash('仅管理员可访问后台')
return redirect(url_for('chat'))
return render_template('admin.html')
@app.route('/api/upload', methods=['POST'])
@login_required
def upload_file():
"""文件上传API"""
...
@app.route('/api/files')
@login_required
def list_files():
"""获取已上传文件列表"""
...
@app.route('/api/download/<filename>')
@login_required
def download_file(filename):
"""文件下载API"""
...
@app.route('/api/delete/<filename>', methods=['DELETE'])
@login_required
def delete_file(filename):
...
if __name__ == '__main__':
import sys
port = 5000
if len(sys.argv) > 1 and sys.argv[1] == '--port':
if len(sys.argv) > 2:
port = int(sys.argv[2])
app.run(host='0.0.0.0', port=port)
工具类-utils.py
配置文件-config.py
数据源连接工具-datasource.py
意图识别-intent_recognition.py
向量构建-vector_build.py
检索增强生成-rag.py
五、项目展示





六、项目收益
降本增效
显著降低人力成本,释放研发产能;系统整体问答准确率稳定在 85% 以上,可高效承接业务方关于表结构、字段含义、指标口径的高频咨询需求。经测算,单名数据开发人员每月可节省答疑耗时约 10 小时(按单次答疑平均 30 分钟计算),人均年产能释放超 120 小时,助力研发团队聚焦于数据模型优化、复杂分析需求等高价值工作。
业务赋能
提升业务响应效率,缩短决策周期;xxx
生态扩展
构建可复用智能数据服务底座,支撑未来生态拓展;xxx
#大数据开发##数据人的面试交流地##聊聊我眼中的AI##开工第一帖#
查看6道真题和解析