企业级项目二:基于Doris+LangChain构建数据智能运营AI助手

前言

近一年,大数据面试的风向正在发生明显变化。

过去,面试官更关注:

  • 数仓分层怎么设计?
  • 指标体系如何构建?
  • Spark 调优怎么做的?

而现在,越来越多的面试开始追问:

  • 你有没有做过 AI 相关项目?
  • LLM 在数据场景中如何落地?
  • RAG 是怎么设计的?

AI 不再是「加分项」,而是逐渐成为「基础能力」。

但问题是 很多人所谓的 AI 项目,只停留在:

  • 调个 API
  • 写个简单问答 Demo

这在企业级场景里,是远远不够的。真正有含金量的 AI 项目,必须要解决真实业务问题,并具备:

  • 检索增强(RAG)落地能力
  • LLM 语义理解与 Prompt 设计能力
  • 工程化架构能力(可扩展、可复用、可维护)

而在我们当前的数据仓库体系中:

  • 表数量已达数千级
  • 字段与指标数以万计
  • 数据答疑占据约 20% 的人力成本

于是,我们决定:用 LLM 重构「数据找数与问数」的方式。

本项目基于 Doris + LangChain + Prompt 工程 + Python + Flask + MySQL,构建了一套企业级数据智能运营助手,系统性解决找数难、问数繁、答数重的问题。

接下来,我会完整拆解它的架构设计、RAG 实现方式、Prompt 优化策略,以及如何把这个项目讲到面试官点头为止。

一、项目背景

本项目涉及到的核心技术栈:Doris + LangChain + Prompt工程 + Python + Flask + MySQL

在当前数据仓库体系中,表数量已达数千级别,字段与指标更是数以万计。业务方在日常工作中难以高效定位所需的数据资产,往往需要依赖数据开发人员协助查找表结构、字段含义或指标口径。据统计,数据团队同学约20%的工作时间用于重复性答疑,不仅严重影响开发效率,也制约了数据服务的响应速度与用户体验。

为系统性解决找数难、问数繁、答数重的问题,亟需构建一个智能化、自助化的数据运营支持工具。本项目旨在通过引入LLM,结合元数据管理与语义理解,打造一个能够精准理解用户意图、自动检索并解释数据资产的智能问答助手,从而显著提升数据发现效率,降低数据开发同学的答疑成本,推动数据服务向自助化、智能化演进。

二、预备知识

环境准备

Doris环境安装

# 安装docker
brew install docker

# 安装Doris
bash start-doris.sh -v 4.0.3

# docker常用命令
Stop cluster: docker-compose -f docker-compose-doris.yaml down
Start cluster: docker-compose -f docker-compose-doris.yaml up -d
View logs: docker-compose -f docker-compose-doris.yaml logs -f

# 启动Doris客户端
mysql -uroot -P9030 -h127.0.0.1

项目依赖包安装

提醒:Python版本>=3.11

pip install langchain_community
pip install langchain_chroma
pip install langchain_core
pip install dashscope
pip install flask
pip install mysql-connector-python

大模型API-Key获取

技术储备

RAG理论知识

RAG(Retrieval-Augmented Generation)= 检索(Retrieve) + 增强(Augment) + 生成(Generate)

核心思想:先从知识库里找相关资料,再把资料交给大模型生成答案,而不是让模型“凭记忆胡说”。

LangChain基础使用

概述

  • LangChain 是一个用于构建 LLM 应用 的框架,核心解决:Prompt 管理、模型调用封装、文档加载、向量检索、RAG 构建、Agent 调度
  • 一句话总结:LangChain 是一个把“大模型调用”工程化的工具框架

核心组件

模块

作用

LLM / ChatModel

调用大模型

PromptTemplate

管理提示词

OutputParser

解析输出

DocumentLoader

加载文档

TextSplitter

文档切分

Embeddings

向量化

VectorStore

向量存储

Retriever

检索器

Chain

串联流程

入门案例

import os
from langchain_community.chat_models.tongyi import ChatTongyi
model = ChatTongyi(
    model_name="qwen3-max",
    dashscope_api_key=os.getenv("DASHSCOPE_API_KEY") or os.getenv("OPENAI_API_KEY")
)
response = model.invoke("你是谁?")
print(response.content)

Doris ANN索引使用

概述

Apache Doris 中的近似最近邻 (ANN) 索引支持对高维数据进行高效的向量相似性搜索。从 Doris 4.x 开始,通用索引操作语法也支持了 ANN 索引

如何创建

如何使用

三、项目流程

离线流程:xxx

在线流程:xxx

四、项目源码

以下所有代码均本人亲自编写,有任何问题可直接私聊我

SQL代码

创建Doris向量表


CREATE DATABASE doris_document_db;

USE doris_document_db;

CREATE TABLE `doris_vector_table` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `document_name` text NULL,
  `content` text NULL,
  `embedding` array<float> NOT NULL,
  INDEX idx_embedding (`embedding`) USING ANN PROPERTIES(
    "dim" = "512",
    "index_type" = "hnsw",
    "metric_type" = "l2_distance"
  )
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

创建用户表及用户对话历史表

创建元数据表

创建文件记录表

前端代码

管理员-admin.html

聊天对话-chat.html

登录-login.html

注册-register.html

后端代码

注意:其中OPENAI_API_KEY、DASHSCOPE_API_KEY配置在环境变量中

项目入口-app.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from flask import Flask, render_template, request, redirect, url_for, session, flash, Response, jsonify
from werkzeug.security import generate_password_hash, check_password_hash
import mysql.connector
from intent_recognition import execute as intent_execute
import json
from functools import wraps

app = Flask(__name__)
app.secret_key = 'your_secret_key'  # 替换为实际的密钥

from config import DB_CONFIG_BUSINESS

def get_db_connection():
    """获取数据库连接"""
    conn = mysql.connector.connect(**DB_CONFIG_BUSINESS)
    return conn

# 登录装饰器
def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            flash('请先登录')
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function

@app.route('/')
def index():
    if 'user_id' in session:
        return redirect(url_for('chat'))
    return redirect(url_for('login'))

@app.route('/register', methods=['GET', 'POST'])
def register():
    ...

@app.route('/login', methods=['GET', 'POST'])
def login():
    ...

@app.route('/logout')
def logout():
    session.clear()
    flash('您已登出')
    return redirect(url_for('login'))

@app.route('/chat')
@login_required
def chat():
    # 获取用户对话历史数量,用于判断是否是首次访问
    is_first_visit = True
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT COUNT(*) as count FROM user_conversations WHERE user_id = %s", (session['user_id'],))
        result = cursor.fetchone()
        is_first_visit = result['count'] == 0 if result else True
    except Exception as e:
        print(f"获取用户对话历史失败: {str(e)}")
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals():
            conn.close()

    # 获取用户名首字母
    user_initials = session['username'][0].upper() if session['username'] else 'U'

    return render_template('chat.html',
                         username=session['username'],
                         is_first_visit=is_first_visit,
                         user_initials=user_initials)

@app.route('/api/chat', methods=['POST', 'GET'])
@login_required
def api_chat():
    ...

@app.route('/api/rate', methods=['POST'])
@login_required
def api_rate():
    """用户评价对话质量"""
    try:
        data = request.get_json()
        conversation_id = data.get('conversation_id')
        rating = data.get('rating')  # 1-5分
        liked = data.get('liked')    # true/false

        if not conversation_id:
            return jsonify({'error': '缺少对话ID'}), 400

        conn = get_db_connection()
        cursor = conn.cursor()

        update_fields = ['rated_at = CURRENT_TIMESTAMP']
        params = []

        if rating is not None:
            update_fields.append('user_rating = %s')
            params.append(rating)

        if liked is not None:
            update_fields.append('user_liked = %s')
            params.append(1 if liked else 0)

        params.append(conversation_id)

        query = f"UPDATE user_conversations SET {', '.join(update_fields)} WHERE id = %s"
        cursor.execute(query, params)
        conn.commit()

        cursor.close()
        conn.close()

        return jsonify({'success': True, 'message': '评价成功'})

    except Exception as e:
        print(f"评价保存失败: {str(e)}")
        return jsonify({'error': '评价失败'}), 500

@app.route('/admin')
@login_required
def admin():
    """后台管理页面"""
    # 检查是否是管理员
    if session['username'] != 'admin':
        flash('仅管理员可访问后台')
        return redirect(url_for('chat'))

    return render_template('admin.html')

@app.route('/api/upload', methods=['POST'])
@login_required
def upload_file():
    """文件上传API"""
    ...

@app.route('/api/files')
@login_required
def list_files():
    """获取已上传文件列表"""
    ...

@app.route('/api/download/<filename>')
@login_required
def download_file(filename):
    """文件下载API"""
    ...

@app.route('/api/delete/<filename>', methods=['DELETE'])
@login_required
def delete_file(filename):
    ...

if __name__ == '__main__':
    import sys
    port = 5000
    if len(sys.argv) > 1 and sys.argv[1] == '--port':
        if len(sys.argv) > 2:
            port = int(sys.argv[2])
    app.run(host='0.0.0.0', port=port)

工具类-utils.py

配置文件-config.py

数据源连接工具-datasource.py

意图识别-intent_recognition.py

向量构建-vector_build.py

检索增强生成-rag.py

五、项目展示

六、项目收益

降本增效

显著降低人力成本,释放研发产能;系统整体问答准确率稳定在 85% 以上,可高效承接业务方关于表结构、字段含义、指标口径的高频咨询需求。经测算,单名数据开发人员每月可节省答疑耗时约 10 小时(按单次答疑平均 30 分钟计算),人均年产能释放超 120 小时,助力研发团队聚焦于数据模型优化、复杂分析需求等高价值工作。

业务赋能

提升业务响应效率,缩短决策周期;xxx

生态扩展

构建可复用智能数据服务底座,支撑未来生态拓展;xxx

#大数据开发##数据人的面试交流地##聊聊我眼中的AI##开工第一帖#
全部评论
感谢分享,吸欧气!
点赞 回复 分享
发布于 昨天 19:15 四川

相关推荐

02-26 18:55
Java
查看6道真题和解析
点赞 评论 收藏
分享
评论
1
3
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务