(附代码)LLM获取实时数据!通用数据源获取方案!MCP让Java再次伟大!

1. 项目背景

1.1 LLM的局限性

随着 LLM(如 GPT、Claude、Gemini)日益普及,然而,LLM受到数据隔离的限制,无法主动获取数据,而这就是“数据孤岛”问题,这种方式就极大限制了LLM的应用。

  • ChatGPT 截止知识常在过去半年甚至更久
  • 对于天气、库存、个性化内容、企业私有数据……一问三不知
  • API 调用能力虽然存在,但难以统一管理、限流、权限、聚合 于是我们提出了一个问题: 👉 如何让 LLM 像“人”一样,从外部世界不断“获取信息”,而不是一问一答、闭门造车?

1.2 MCP 架构:连接 LLM 与实时数据的桥梁

MCP(模型上下文协议,Model Context Protocol)是一种面向 LLM 的服务代理架构,它让 AI 模型能够以一致的方式连接到各种不同的应用程序和数据源。 其架构如下: alt

执行流程如下: alt (图片均来源于——一文详解MCP技术——MCP是什么,为什么重要 ?

2. 解决方案:通用数据源获取方案

2.1 方案描述

为兼容所有类型的数据源,设计了两大核心工具设计了两大核心模块:数据源导航器(Data Source Navigator)、数据聚合器(Data Aggregator)

功能模块 描述信息
数据源导航器 帮助 AI 识别系统中可用的数据源、唯一编号及其数据表结构
数据聚合器 根据数据源的唯一编号集合,获取对应的数据源数据集

在整个流程中,AI 根据具体的分析需求,首先通过 数据源导航器 获取所需的数据源唯一编号集合,然后利用 数据聚合器 获取对应的数据源数据,从而实现对多数据源的兼容与灵活调用。

2.2 项目代码

项目启动时,会先获取data-source.java配置文件中的数据,导入数据源,当用户提问后,LLM会先通过数据源导航器获取所需的数据源信息,然后通过数据源聚合器获取具体的数据源。

2.2.1 通用配置文件

[  
  {    "id": 1,  
    "description": "商品库存数据",  
    "url": "http://localhost:8080/mcp/data/inventory",  
    "options": "GET",  
    "params": {}  
  },  {    "id": 2,  
    "description": "入库商品数据",  
    "url": "http://localhost:8080/mcp/data/inbound",  
    "options": "GET",  
    "params": {  
      "dateRange": {  
        "type": "Integer",  
        "description": "日期范围选项:1=最近7天,2=最近1月,3=最近3月,4=最近6月,5=最近1年"  
      }  
    }  },  {    "id": 3,  
    "description": "出库商品数据",  
    "url": "http://localhost:8080/mcp/data/outbound",  
    "options": "GET",  
    "params": {  
      "dateRange": {  
        "type": "Integer",  
        "description": "日期范围选项:1=最近7天,2=最近1月,3=最近3月,4=最近6月,5=最近1年"  
      }  
    }  },  {    "id": 4,  
    "description": "移库商品数据",  
    "url": "http://localhost:8080/mcp/data/transfer",  
    "options": "GET",  
    "params": {  
      "dateRange": {  
        "type": "Integer",  
        "description": "日期范围选项:1=最近7天,2=最近1月,3=最近3月,4=最近6月,5=最近1年"  
      }  
    }  }]

2.2.2 数据源工具类

(1)数据源导航器

@Tool(description = "列出所有可用数据源及其介绍")  
public String getAvailableDataSources() {  
    return dataSources.values().stream()  
            .map(info -> {  
                StringBuilder sb = new StringBuilder();  
                sb.append("- ID: ").append(info.getId()).append("\n");  
                sb.append("  描述: ").append(info.getDescription()).append("\n");  
                sb.append("  URL: ").append(info.getUrl()).append("\n");  
                if (info.getParams() != null && !info.getParams().isEmpty()) {  
                    sb.append("  参数:\n");  
                    for (Map.Entry<String, DateRangeInfo> entry : info.getParams().entrySet()) {  
                        sb.append("    - ").append(entry.getKey())  
                                .append("(").append(entry.getValue().getType())  
                                .append("):").append(entry.getValue().getDescription()).append("\n");  
                    }                }                return sb.toString();  
            })            .collect(Collectors.joining("\n", "当前可用数据源:\n", ""));  
}

(2)数据源聚合器

@Tool(description = "通过数据源唯一ID集合获取相应数据。\n" +  
        "参数说明:\n" +  
        "- sourceIDs:需要请求的数据源ID列表;\n" +  
        "- inputParams:每个数据源对应的参数集合,结构为 Map<数据源ID, 参数键值对>," +  
        "用于为不同数据源分别设置请求参数。")  
public String getDataBySourceIDs(List<Integer> sourceIDs, HashMap<Integer, HashMap<String, Object>> inputParams) {  
    log.info("获取数据源:{}", sourceIDs);  
  
    // 存储任务结果  
    List<Future<String>> futures = new ArrayList<>();  
  
    for (Integer id : sourceIDs) {  
        Future<String> future = threadPoolExecutor.submit(() -> {  
            DataSourceInfo info = dataSources.get(id);  
            StringBuilder singleResult = new StringBuilder();  
  
            if (info == null) {  
                singleResult.append("数据源ID ").append(id).append(" 未找到\n");  
                return singleResult.toString();  
            }  
            try {  
                // 构建 URL                UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(info.getUrl());  
  
                // 获取当前数据源的参数  
                HashMap<String, Object> paramsForThisSource = inputParams.getOrDefault(id, new HashMap<>());  
  
                // 添加匹配参数  
                if (info.getParams() != null) {  
                    for (String key : info.getParams().keySet()) {  
                        if (paramsForThisSource.containsKey(key)) {  
                            builder.queryParam(key, paramsForThisSource.get(key));  
                        }                    }                }  
                String finalUrl = builder.toUriString();  
                log.info("请求数据源 [{}]: {}", id, finalUrl);  
  
                String response = restTemplate.getForObject(finalUrl, String.class);  
                singleResult.append("数据源 ID: ").append(id).append(" 请求成功\n");  
                singleResult.append("返回内容:\n").append(response).append("\n\n");  
                log.info("请求数据源 [{}] 成功,返回内容:{}", id, response);  
            } catch (Exception e) {  
                singleResult.append("数据源 ID: ").append(id)  
                        .append(" 请求失败,错误信息: ").append(e.getMessage()).append("\n\n");  
                log.error("请求数据源 [{}] 异常:{}", id, e.getMessage(), e);  
            }  
            return singleResult.toString();  
        });  
        futures.add(future);  
    }  
    // 等待所有任务完成并合并结果  
    StringBuilder result = new StringBuilder();  
    for (Future<String> future : futures) {  
        try {  
            result.append(future.get());  
        } catch (Exception e) {  
            log.error("获取任务结果失败:{}", e.getMessage(), e);  
            result.append("获取任务结果失败:").append(e.getMessage()).append("\n\n");  
        }    }  
    log.info("获取数据源结果:\n{}", result.toString());  
    return result.toString();  
}

(4)总体代码

package flow.datamcpservice.service;  
  
import com.fasterxml.jackson.core.type.TypeReference;  
import com.fasterxml.jackson.databind.ObjectMapper;  
import flow.datamcpservice.entity.DataSourceInfo;  
import flow.datamcpservice.entity.DateRangeInfo;  
import lombok.extern.slf4j.Slf4j;  
import org.springframework.ai.tool.annotation.Tool;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Service;  
import org.springframework.web.client.RestTemplate;  
import org.springframework.web.util.UriComponentsBuilder;  
import pool.AdaptiveBufferedThreadPoolExecutor;  
  
import java.io.InputStream;  
import java.util.*;  
import java.util.concurrent.Future;  
import java.util.stream.Collectors;  
  
@Service  
@Slf4j  
public class DataService {  
  
    private final Map<Integer, DataSourceInfo> dataSources = new LinkedHashMap<>();  
  
    @Autowired  
    private RestTemplate restTemplate;  
  
    @Autowired  
    private AdaptiveBufferedThreadPoolExecutor threadPoolExecutor;  
  
    public DataService() {  
        loadDataSourcesFromJson();  
    }  
    private void loadDataSourcesFromJson() {  
        try {  
            ObjectMapper mapper = new ObjectMapper();  
            InputStream inputStream = getClass().getClassLoader().getResourceAsStream("config/data-sources.json");  
            if (inputStream == null) {  
                log.error("未找到 data-sources.json,跳过数据源初始化");  
                return;  
            }            List<DataSourceInfo> list = mapper.readValue(inputStream, new TypeReference<>() {});  
            for (DataSourceInfo info : list) {  
                dataSources.put(info.getId(), info);  
            }        } catch (Exception e) {  
            log.error("加载数据源配置失败:{}", e.getMessage(), e);  
        }    }  
    @Tool(description = "列出所有可用数据源及其介绍")  
    public String getAvailableDataSources() {  
        return dataSources.values().stream()  
                .map(info -> {  
                    StringBuilder sb = new StringBuilder();  
                    sb.append("- ID: ").append(info.getId()).append("\n");  
                    sb.append("  描述: ").append(info.getDescription()).append("\n");  
                    sb.append("  URL: ").append(info.getUrl()).append("\n");  
                    if (info.getParams() != null && !info.getParams().isEmpty()) {  
                        sb.append("  参数:\n");  
                        for (Map.Entry<String, DateRangeInfo> entry : info.getParams().entrySet()) {  
                            sb.append("    - ").append(entry.getKey())  
                                    .append("(").append(entry.getValue().getType())  
                                    .append("):").append(entry.getValue().getDescription()).append("\n");  
                        }                    }                    return sb.toString();  
                })                .collect(Collectors.joining("\n", "当前可用数据源:\n", ""));  
    }  
    @Tool(description = "通过数据源唯一ID集合获取相应数据。\n" +  
            "参数说明:\n" +  
            "- sourceIDs:需要请求的数据源ID列表;\n" +  
            "- inputParams:每个数据源对应的参数集合,结构为 Map<数据源ID, 参数键值对>," +  
            "用于为不同数据源分别设置请求参数。")  
    public String getDataBySourceIDs(List<Integer> sourceIDs, HashMap<Integer, HashMap<String, Object>> inputParams) {  
        log.info("获取数据源:{}", sourceIDs);  
  
        // 存储任务结果  
        List<Future<String>> futures = new ArrayList<>();  
  
        for (Integer id : sourceIDs) {  
            Future<String> future = threadPoolExecutor.submit(() -> {  
                DataSourceInfo info = dataSources.get(id);  
                StringBuilder singleResult = new StringBuilder();  
  
                if (info == null) {  
                    singleResult.append("数据源ID ").append(id).append(" 未找到\n");  
                    return singleResult.toString();  
                }  
                try {  
                    // 构建 URL                    UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(info.getUrl());  
  
                    // 获取当前数据源的参数  
                    HashMap<String, Object> paramsForThisSource = inputParams.getOrDefault(id, new HashMap<>());  
  
                    // 添加匹配参数  
                    if (info.getParams() != null) {  
                        for (String key : info.getParams().keySet()) {  
                            if (paramsForThisSource.containsKey(key)) {  
                                builder.queryParam(key, paramsForThisSource.get(key));  
                            }                        }                    }  
                    String finalUrl = builder.toUriString();  
                    log.info("请求数据源 [{}]: {}", id, finalUrl);  
  
                    String response = restTemplate.getForObject(finalUrl, String.class);  
                    singleResult.append("数据源 ID: ").append(id).append(" 请求成功\n");  
                    singleResult.append("返回内容:\n").append(response).append("\n\n");  
                    log.info("请求数据源 [{}] 成功,返回内容:{}", id, response);  
                } catch (Exception e) {  
                    singleResult.append("数据源 ID: ").append(id)  
                            .append(" 请求失败,错误信息: ").append(e.getMessage()).append("\n\n");  
                    log.error("请求数据源 [{}] 异常:{}", id, e.getMessage(), e);  
                }  
                return singleResult.toString();  
            });  
            futures.add(future);  
        }  
        // 等待所有任务完成并合并结果  
        StringBuilder result = new StringBuilder();  
        for (Future<String> future : futures) {  
            try {  
                result.append(future.get());  
            } catch (Exception e) {  
                log.error("获取任务结果失败:{}", e.getMessage(), e);  
                result.append("获取任务结果失败:").append(e.getMessage()).append("\n\n");  
            }        }  
        log.info("获取数据源结果:\n{}", result.toString());  
        return result.toString();  
    }  
}
#聊聊我眼中的AI##后端##互联网##后端校招##java#
全部评论

相关推荐

评论
1
1
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务