(附代码)LLM获取实时数据!通用数据源获取方案!MCP让Java再次伟大!
1. 项目背景
1.1 LLM的局限性
随着 LLM(如 GPT、Claude、Gemini)日益普及,然而,LLM受到数据隔离的限制,无法主动获取数据,而这就是“数据孤岛”问题,这种方式就极大限制了LLM的应用。
- ChatGPT 截止知识常在过去半年甚至更久
- 对于天气、库存、个性化内容、企业私有数据……一问三不知
- API 调用能力虽然存在,但难以统一管理、限流、权限、聚合 于是我们提出了一个问题: 👉 如何让 LLM 像“人”一样,从外部世界不断“获取信息”,而不是一问一答、闭门造车?
1.2 MCP 架构:连接 LLM 与实时数据的桥梁
MCP(模型上下文协议,Model Context Protocol)是一种面向 LLM 的服务代理架构,它让 AI 模型能够以一致的方式连接到各种不同的应用程序和数据源。
其架构如下:
执行流程如下:
(图片均来源于——一文详解MCP技术——MCP是什么,为什么重要 ?)
2. 解决方案:通用数据源获取方案
2.1 方案描述
为兼容所有类型的数据源,设计了两大核心工具设计了两大核心模块:数据源导航器(Data Source Navigator)、数据聚合器(Data Aggregator)。
功能模块 | 描述信息 |
---|---|
数据源导航器 | 帮助 AI 识别系统中可用的数据源、唯一编号及其数据表结构 |
数据聚合器 | 根据数据源的唯一编号集合,获取对应的数据源数据集 |
在整个流程中,AI 根据具体的分析需求,首先通过 数据源导航器 获取所需的数据源唯一编号集合,然后利用 数据聚合器 获取对应的数据源数据,从而实现对多数据源的兼容与灵活调用。
2.2 项目代码
项目启动时,会先获取data-source.java
配置文件中的数据,导入数据源,当用户提问后,LLM会先通过数据源导航器获取所需的数据源信息,然后通过数据源聚合器获取具体的数据源。
2.2.1 通用配置文件
[
{ "id": 1,
"description": "商品库存数据",
"url": "http://localhost:8080/mcp/data/inventory",
"options": "GET",
"params": {}
}, { "id": 2,
"description": "入库商品数据",
"url": "http://localhost:8080/mcp/data/inbound",
"options": "GET",
"params": {
"dateRange": {
"type": "Integer",
"description": "日期范围选项:1=最近7天,2=最近1月,3=最近3月,4=最近6月,5=最近1年"
}
} }, { "id": 3,
"description": "出库商品数据",
"url": "http://localhost:8080/mcp/data/outbound",
"options": "GET",
"params": {
"dateRange": {
"type": "Integer",
"description": "日期范围选项:1=最近7天,2=最近1月,3=最近3月,4=最近6月,5=最近1年"
}
} }, { "id": 4,
"description": "移库商品数据",
"url": "http://localhost:8080/mcp/data/transfer",
"options": "GET",
"params": {
"dateRange": {
"type": "Integer",
"description": "日期范围选项:1=最近7天,2=最近1月,3=最近3月,4=最近6月,5=最近1年"
}
} }]
2.2.2 数据源工具类
(1)数据源导航器
@Tool(description = "列出所有可用数据源及其介绍")
public String getAvailableDataSources() {
return dataSources.values().stream()
.map(info -> {
StringBuilder sb = new StringBuilder();
sb.append("- ID: ").append(info.getId()).append("\n");
sb.append(" 描述: ").append(info.getDescription()).append("\n");
sb.append(" URL: ").append(info.getUrl()).append("\n");
if (info.getParams() != null && !info.getParams().isEmpty()) {
sb.append(" 参数:\n");
for (Map.Entry<String, DateRangeInfo> entry : info.getParams().entrySet()) {
sb.append(" - ").append(entry.getKey())
.append("(").append(entry.getValue().getType())
.append("):").append(entry.getValue().getDescription()).append("\n");
} } return sb.toString();
}) .collect(Collectors.joining("\n", "当前可用数据源:\n", ""));
}
(2)数据源聚合器
@Tool(description = "通过数据源唯一ID集合获取相应数据。\n" +
"参数说明:\n" +
"- sourceIDs:需要请求的数据源ID列表;\n" +
"- inputParams:每个数据源对应的参数集合,结构为 Map<数据源ID, 参数键值对>," +
"用于为不同数据源分别设置请求参数。")
public String getDataBySourceIDs(List<Integer> sourceIDs, HashMap<Integer, HashMap<String, Object>> inputParams) {
log.info("获取数据源:{}", sourceIDs);
// 存储任务结果
List<Future<String>> futures = new ArrayList<>();
for (Integer id : sourceIDs) {
Future<String> future = threadPoolExecutor.submit(() -> {
DataSourceInfo info = dataSources.get(id);
StringBuilder singleResult = new StringBuilder();
if (info == null) {
singleResult.append("数据源ID ").append(id).append(" 未找到\n");
return singleResult.toString();
}
try {
// 构建 URL UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(info.getUrl());
// 获取当前数据源的参数
HashMap<String, Object> paramsForThisSource = inputParams.getOrDefault(id, new HashMap<>());
// 添加匹配参数
if (info.getParams() != null) {
for (String key : info.getParams().keySet()) {
if (paramsForThisSource.containsKey(key)) {
builder.queryParam(key, paramsForThisSource.get(key));
} } }
String finalUrl = builder.toUriString();
log.info("请求数据源 [{}]: {}", id, finalUrl);
String response = restTemplate.getForObject(finalUrl, String.class);
singleResult.append("数据源 ID: ").append(id).append(" 请求成功\n");
singleResult.append("返回内容:\n").append(response).append("\n\n");
log.info("请求数据源 [{}] 成功,返回内容:{}", id, response);
} catch (Exception e) {
singleResult.append("数据源 ID: ").append(id)
.append(" 请求失败,错误信息: ").append(e.getMessage()).append("\n\n");
log.error("请求数据源 [{}] 异常:{}", id, e.getMessage(), e);
}
return singleResult.toString();
});
futures.add(future);
}
// 等待所有任务完成并合并结果
StringBuilder result = new StringBuilder();
for (Future<String> future : futures) {
try {
result.append(future.get());
} catch (Exception e) {
log.error("获取任务结果失败:{}", e.getMessage(), e);
result.append("获取任务结果失败:").append(e.getMessage()).append("\n\n");
} }
log.info("获取数据源结果:\n{}", result.toString());
return result.toString();
}
(4)总体代码
package flow.datamcpservice.service;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import flow.datamcpservice.entity.DataSourceInfo;
import flow.datamcpservice.entity.DateRangeInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import pool.AdaptiveBufferedThreadPoolExecutor;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@Service
@Slf4j
public class DataService {
private final Map<Integer, DataSourceInfo> dataSources = new LinkedHashMap<>();
@Autowired
private RestTemplate restTemplate;
@Autowired
private AdaptiveBufferedThreadPoolExecutor threadPoolExecutor;
public DataService() {
loadDataSourcesFromJson();
}
private void loadDataSourcesFromJson() {
try {
ObjectMapper mapper = new ObjectMapper();
InputStream inputStream = getClass().getClassLoader().getResourceAsStream("config/data-sources.json");
if (inputStream == null) {
log.error("未找到 data-sources.json,跳过数据源初始化");
return;
} List<DataSourceInfo> list = mapper.readValue(inputStream, new TypeReference<>() {});
for (DataSourceInfo info : list) {
dataSources.put(info.getId(), info);
} } catch (Exception e) {
log.error("加载数据源配置失败:{}", e.getMessage(), e);
} }
@Tool(description = "列出所有可用数据源及其介绍")
public String getAvailableDataSources() {
return dataSources.values().stream()
.map(info -> {
StringBuilder sb = new StringBuilder();
sb.append("- ID: ").append(info.getId()).append("\n");
sb.append(" 描述: ").append(info.getDescription()).append("\n");
sb.append(" URL: ").append(info.getUrl()).append("\n");
if (info.getParams() != null && !info.getParams().isEmpty()) {
sb.append(" 参数:\n");
for (Map.Entry<String, DateRangeInfo> entry : info.getParams().entrySet()) {
sb.append(" - ").append(entry.getKey())
.append("(").append(entry.getValue().getType())
.append("):").append(entry.getValue().getDescription()).append("\n");
} } return sb.toString();
}) .collect(Collectors.joining("\n", "当前可用数据源:\n", ""));
}
@Tool(description = "通过数据源唯一ID集合获取相应数据。\n" +
"参数说明:\n" +
"- sourceIDs:需要请求的数据源ID列表;\n" +
"- inputParams:每个数据源对应的参数集合,结构为 Map<数据源ID, 参数键值对>," +
"用于为不同数据源分别设置请求参数。")
public String getDataBySourceIDs(List<Integer> sourceIDs, HashMap<Integer, HashMap<String, Object>> inputParams) {
log.info("获取数据源:{}", sourceIDs);
// 存储任务结果
List<Future<String>> futures = new ArrayList<>();
for (Integer id : sourceIDs) {
Future<String> future = threadPoolExecutor.submit(() -> {
DataSourceInfo info = dataSources.get(id);
StringBuilder singleResult = new StringBuilder();
if (info == null) {
singleResult.append("数据源ID ").append(id).append(" 未找到\n");
return singleResult.toString();
}
try {
// 构建 URL UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(info.getUrl());
// 获取当前数据源的参数
HashMap<String, Object> paramsForThisSource = inputParams.getOrDefault(id, new HashMap<>());
// 添加匹配参数
if (info.getParams() != null) {
for (String key : info.getParams().keySet()) {
if (paramsForThisSource.containsKey(key)) {
builder.queryParam(key, paramsForThisSource.get(key));
} } }
String finalUrl = builder.toUriString();
log.info("请求数据源 [{}]: {}", id, finalUrl);
String response = restTemplate.getForObject(finalUrl, String.class);
singleResult.append("数据源 ID: ").append(id).append(" 请求成功\n");
singleResult.append("返回内容:\n").append(response).append("\n\n");
log.info("请求数据源 [{}] 成功,返回内容:{}", id, response);
} catch (Exception e) {
singleResult.append("数据源 ID: ").append(id)
.append(" 请求失败,错误信息: ").append(e.getMessage()).append("\n\n");
log.error("请求数据源 [{}] 异常:{}", id, e.getMessage(), e);
}
return singleResult.toString();
});
futures.add(future);
}
// 等待所有任务完成并合并结果
StringBuilder result = new StringBuilder();
for (Future<String> future : futures) {
try {
result.append(future.get());
} catch (Exception e) {
log.error("获取任务结果失败:{}", e.getMessage(), e);
result.append("获取任务结果失败:").append(e.getMessage()).append("\n\n");
} }
log.info("获取数据源结果:\n{}", result.toString());
return result.toString();
}
}
#聊聊我眼中的AI##后端##互联网##后端校招##java#