企业级项目二:基于Doris+LangChain构建数据智能运营AI助手

前言

近一年,大数据面试的风向正在发生明显变化。

过去,面试官更关注:

  • 数仓分层怎么设计?
  • 指标体系如何构建?
  • Spark 调优怎么做的?

而现在,越来越多的面试开始追问:

  • 你有没有做过 AI 相关项目?
  • LLM 在数据场景中如何落地?
  • RAG 是怎么设计的?

AI 不再是「加分项」,而是逐渐成为「基础能力」。

但问题是 很多人所谓的 AI 项目,只停留在:

  • 调个 API
  • 写个简单问答 Demo

这在企业级场景里,是远远不够的。真正有含金量的 AI 项目,必须要解决真实业务问题,并具备:

  • 检索增强(RAG)落地能力
  • LLM 语义理解与 Prompt 设计能力
  • 工程化架构能力(可扩展、可复用、可维护)

而在我们当前的数据仓库体系中:

  • 表数量已达数千级
  • 字段与指标数以万计
  • 数据答疑占据约 20% 的人力成本

于是,我们决定:用 LLM 重构「数据找数与问数」的方式。

本项目基于 Doris + LangChain + Prompt 工程 + Python + Flask + MySQL,构建了一套企业级数据智能运营助手,系统性解决找数难、问数繁、答数重的问题。

接下来,我会完整拆解它的架构设计、RAG 实现方式、Prompt 优化策略,以及如何把这个项目讲到面试官点头为止。

一、项目背景

本项目涉及到的核心技术栈:Doris + LangChain + Prompt工程 + Python + Flask + MySQL

在当前数据仓库体系中,表数量已达数千级别,字段与指标更是数以万计。业务方在日常工作中难以高效定位所需的数据资产,往往需要依赖数据开发人员协助查找表结构、字段含义或指标口径。据统计,数据团队同学约20%的工作时间用于重复性答疑,不仅严重影响开发效率,也制约了数据服务的响应速度与用户体验。

为系统性解决找数难、问数繁、答数重的问题,亟需构建一个智能化、自助化的数据运营支持工具。本项目旨在通过引入LLM,结合元数据管理与语义理解,打造一个能够精准理解用户意图、自动检索并解释数据资产的智能问答助手,从而显著提升数据发现效率,降低数据开发同学的答疑成本,推动数据服务向自助化、智能化演进。

二、预备知识

环境准备

Doris环境安装

# 安装docker
brew install docker

# 安装Doris
bash start-doris.sh -v 4.0.3

# docker常用命令
Stop cluster: docker-compose -f docker-compose-doris.yaml down
Start cluster: docker-compose -f docker-compose-doris.yaml up -d
View logs: docker-compose -f docker-compose-doris.yaml logs -f

# 启动Doris客户端
mysql -uroot -P9030 -h127.0.0.1

项目依赖包安装

提醒:Python版本>=3.11

pip install langchain_community
pip install langchain_chroma
pip install langchain_core
pip install dashscope
pip install flask
pip install mysql-connector-python

大模型API-Key获取

技术储备

RAG理论知识

RAG(Retrieval-Augmented Generation)= 检索(Retrieve) + 增强(Augment) + 生成(Generate)

核心思想:先从知识库里找相关资料,再把资料交给大模型生成答案,而不是让模型“凭记忆胡说”。

LangChain基础使用

概述

  • LangChain 是一个用于构建 LLM 应用 的框架,核心解决:Prompt 管理、模型调用封装、文档加载、向量检索、RAG 构建、Agent 调度
  • 一句话总结:LangChain 是一个把“大模型调用”工程化的工具框架

核心组件

模块

作用

LLM / ChatModel

调用大模型

PromptTemplate

管理提示词

OutputParser

解析输出

DocumentLoader

加载文档

TextSplitter

文档切分

Embeddings

向量化

VectorStore

向量存储

Retriever

检索器

Chain

串联流程

入门案例

import os
from langchain_community.chat_models.tongyi import ChatTongyi
model = ChatTongyi(
    model_name="qwen3-max",
    dashscope_api_key=os.getenv("DASHSCOPE_API_KEY") or os.getenv("OPENAI_API_KEY")
)
response = model.invoke("你是谁?")
print(response.content)

Doris ANN索引使用

概述

Apache Doris 中的近似最近邻 (ANN) 索引支持对高维数据进行高效的向量相似性搜索。从 Doris 4.x 开始,通用索引操作语法也支持了 ANN 索引

如何创建

如何使用

三、项目流程

离线流程:xxx

在线流程:xxx

四、项目源码

以下所有代码均本人亲自编写,有任何问题可直接私聊我

SQL代码

创建Doris向量表


CREATE DATABASE doris_document_db;

USE doris_document_db;

CREATE TABLE `doris_vector_table` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `document_name` text NULL,
  `content` text NULL,
  `embedding` array<float> NOT NULL,
  INDEX idx_embedding (`embedding`) USING ANN PROPERTIES(
    "dim" = "512",
    "index_type" = "hnsw",
    "metric_type" = "l2_distance"
  )
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

创建用户表及用户对话历史表

创建元数据表

创建文件记录表

前端代码

管理员-admin.html

聊天对话-chat.html

登录-login.html

注册-register.html

后端代码

注意:其中OPENAI_API_KEY、DASHSCOPE_API_KEY配置在环境变量中

项目入口-app.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from flask import Flask, render_template, request, redirect, url_for, session, flash, Response, jsonify
from werkzeug.security import generate_password_hash, check_password_hash
import mysql.connector
from intent_recognition import execute as intent_execute
import json
from functools import wraps

app = Flask(__name__)
app.secret_key = 'your_secret_key'  # 替换为实际的密钥

from config import DB_CONFIG_BUSINESS

def get_db_connection():
    """获取数据库连接"""
    conn = mysql.connector.connect(**DB_CONFIG_BUSINESS)
    return conn

# 登录装饰器
def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            flash('请先登录')
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function

@app.route('/')
def index():
    if 'user_id' in session:
        return redirect(url_for('chat'))
    return redirect(url_for('login'))

@app.route('/register', methods=['GET', 'POST'])
def register():
    ...

@app.route('/login', methods=['GET', 'POST'])
def login():
    ...

@app.route('/logout')
def logout():
    session.clear()
    flash('您已登出')
    return redirect(url_for('login'))

@app.route('/chat')
@login_required
def chat():
    # 获取用户对话历史数量,用于判断是否是首次访问
    is_first_visit = True
    try:
        conn = get_db_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT COUNT(*) as count FROM user_conversations WHERE user_id = %s", (session['user_id'],))
        result = cursor.fetchone()
        is_first_visit = result['count'] == 0 if result else True
    except Exception as e:
        print(f"获取用户对话历史失败: {str(e)}")
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals():
            conn.close()

    # 获取用户名首字母
    user_initials = session['username'][0].upper() if session['username'] else 'U'

    return render_template('chat.html',
                         username=session['username'],
                         is_first_visit=is_first_visit,
                         user_initials=user_initials)

@app.route('/api/chat', methods=['POST', 'GET'])
@login_required
def api_chat():
    ...

@app.route('/api/rate', methods=['POST'])
@login_required
def api_rate():
    """用户评价对话质量"""
    try:
        data = request.get_json()
        conversation_id = data.get('conversation_id')
        rating = data.get('rating')  # 1-5分
        liked = data.get('liked')    # true/false

        if not conversation_id:
            return jsonify({'error': '缺少对话ID'}), 400

        conn = get_db_connection()
        cursor = conn.cursor()

        update_fields = ['rated_at = CURRENT_TIMESTAMP']
        params = []

        if rating is not None:
            update_fields.append('user_rating = %s')
            params.append(rating)

        if liked is not None:
            update_fields.append('user_liked = %s')
            params.append(1 if liked else 0)

        params.append(conversation_id)

        query = f"UPDATE user_conversations SET {', '.join(update_fields)} WHERE id = %s"
        cursor.execute(query, params)
        conn.commit()

        cursor.close()
        conn.close()

        return jsonify({'success': True, 'message': '评价成功'})

    except Exception as e:
        print(f"评价保存失败: {str(e)}")
        return jsonify({'error': '评价失败'}), 500

@app.route('/admin')
@login_required
def admin():
    """后台管理页面"""
    # 检查是否是管理员
    if session['username'] != 'admin':
        flash('仅管理员可访问后台')
        return redirect(url_for('chat'))

    return render_template('admin.html')

@app.route('/api/upload', methods=['POST'])
@login_required
def upload_file():
    """文件上传API"""
    ...

@app.route('/api/files')
@login_required
def list_files():
    """获取已上传文件列表"""
    ...

@app.route('/api/download/<filename>')
@login_required
def download_file(filename):
    """文件下载API"""
    ...

@app.route('/api/delete/<filename>', methods=['DELETE'])
@login_required
def delete_file(filename):
    ...

if __name__ == '__main__':
    import sys
    port = 5000
    if len(sys.argv) > 1 and sys.argv[1] == '--port':
        if len(sys.argv) > 2:
            port = int(sys.argv[2])
    app.run(host='0.0.0.0', port=port)

工具类-utils.py

配置文件-config.py

数据源连接工具-datasource.py

意图识别-intent_recognition.py

向量构建-vector_build.py

检索增强生成-rag.py

五、项目展示

六、项目收益

降本增效

显著降低人力成本,释放研发产能;系统整体问答准确率稳定在 85% 以上,可高效承接业务方关于表结构、字段含义、指标口径的高频咨询需求。经测算,单名数据开发人员每月可节省答疑耗时约 10 小时(按单次答疑平均 30 分钟计算),人均年产能释放超 120 小时,助力研发团队聚焦于数据模型优化、复杂分析需求等高价值工作。

业务赋能

提升业务响应效率,缩短决策周期;xxx

生态扩展

构建可复用智能数据服务底座,支撑未来生态拓展;xxx

#大数据开发##数据人的面试交流地##聊聊我眼中的AI##开工第一帖#
全部评论
我想知道这个项目和数仓工程师有什么联系,看起来和数仓的关系不大,感觉就是两个职位
点赞 回复 分享
发布于 02-28 16:32 河南
感谢分享,吸欧气!
点赞 回复 分享
发布于 02-27 19:15 四川

相关推荐

面试部门:快手企业事务部门面试时间:3月27日面试岗位:java开发自我介绍项目(面试官对于常规的Java并发项目没怎么问)主要和我探讨了下AI和Agent1.先讲项目非Agent部分模块组成,问的很多,侧重于讲述不同模块做什么2.再讲项目Agent模块组成,同样很健谈,面试官会真的细问,确定项目是你自己做的3.项目的数据库选型,如何用的4.项目为什么用Redis,怎么用的5.项目数据库制表的思路,你自己建表吗?6.Redis的功能和职责,你觉得Redis的瓶颈在哪里,或者说Redis业务边缘7.RAG怎么做的,包括选型和流程8.项目Agent对于幻觉的优化措施,如何处理幻觉/LLM乱说话9.项目里Skill是做什么的,和企业级Skill是一个东西吗,谈谈对Skill的理解10.是否想过把项目包装为知识库供外部使用(拓展聊了很多,主要是面试官给我讲了些知识)受益匪浅,不得不说是一个非常专业的面试官基础问题1.LangChain4j,为甚么选这个2.SpringCloud了解吗,组成部分(我说个人项目没用过,跳过了)RabbitMQ了解吗,谈谈MQ是什么,不同的几个MQ3.RabbitMQ了解吗,谈谈MQ是什么,不同的几个MQ,MQ组成部分4.消费者具体是干什么的?谈谈业务里做了什么无Java八股,无手撕,很愉快的面试面试官非常健谈,聊了很多行业内部对于Agent开发的建议和业务给我了很多思考,包括如何让提示词变得很有用,如何确保一个LLM,不同人能有近似的使用效果知识库的概念等等,受益匪浅反问简单问了下业务和部门结束了面试官非常肯定我项目的全栈开发方向,也建议我继续走这条路线这里也劝前端的UU们,也要多了解后端的,可能目前业内对于纯前端不是很刚需了。。整体来说很不错,出结果了更新!欢迎朋友们投快手!怎么说都是很不错的体验!
求好运眷顾🙏🏻:感觉传统八股越来越少了,Agent拷打变多了
查看15道真题和解析
点赞 评论 收藏
分享
本周面试了快手社招技术终面,是个40多岁的女人,居然还是2个部门的最高领导,感觉是技术里面的高层,问的技术难题不多,主要是场景题,大概面了40分钟1&nbsp;面试官先自我介绍一下部门干的活,以及后续对于新人来了之后主要是要干啥,以及她对新人的一些要求2&nbsp;自我进行介绍,说一下你这边做的一些事情以及技术亮点3&nbsp;你说一下对于技术开发需要遵循哪些规范,你感觉作为技术人对于数据如何可以体现你的价值,以及你感觉技术如何给业务进行业务提升4&nbsp;对于一个湖仓架构,技术选型你优先会选择哪个,为啥会选择这个技术,这个对于别的优势体现在哪5&nbsp;如果要你来做一个实时数仓,你感觉对于实时数据精确一致性需要如何设计,对于技术如何体现难度6&nbsp;你做过财务数据和流量数据,你感觉这2方面的数据是有啥区别,对于技术来说如何选型,对于数据的指标哪些区别7&nbsp;你认为对于行为数据埋点可以怎么设计,对于埋点会出现啥问题,你平时对于埋点实时接入会有啥问题,对于数据来说如果埋点设计的不好有啥风险8&nbsp;你这边有啥需要找我理解的我感觉这个领导似乎对于技术不怎么面了,感觉是在问一些对于数据开发的思考和总体技术和风险评估,感觉像是在招一个职级比较高的人,面了大概40分钟,不知道能不能过,但我感觉有戏,过了再来更新
查看7道真题和解析
点赞 评论 收藏
分享
评论
4
18
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务