商品中心—18.库存分桶的一致性改造文档
大纲
1.分布式库存扣减时序图和流程图
2.库存入桶分配改造
3.库存分桶上线改造
4.库存分桶扩容改造
5.库存分桶下线改造
6.执行库存分桶缓存操作的定时任务
7.分桶操作之初始化分配库存的处理策略
8.分桶操作之增加库存与分桶上线的处理策略
9.分桶操作之分桶扩容的处理策略
10.分桶操作之分桶下线的处理策略
11.清除执⾏成功的分桶操作的定时任务
1.分布式库存扣减时序图和流程图
(1)分布式库存扣减时序图
(2)分布式库存扣减流程图
(3)数据库设计
(1)分布式库存扣减时序图
(2)分布式库存扣减流程图
(3)数据库设计
一.库存分桶操作表
CREATE TABLE `inventory_bucket_operate` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `operate_id` varchar(32) NOT NULL COMMENT '操作id', `seller_id` varchar(64) NOT NULL COMMENT '卖家id', `sku_id` varchar(64) NOT NULL COMMENT '商品sku', `operate_type` tinyint(3) NOT NULL COMMENT '操作类型:1-初始化,2-增加库存,3-分桶上线,4-分桶扩容,5-分桶下线', `bucket` text COMMENT '分桶变动信息', `inventory_num` int(11) DEFAULT NULL COMMENT '变动库存', `feature` text COMMENT '扩展信息', `operate_status` tinyint(4) DEFAULT '0' COMMENT '操作状态', `del_flag` tinyint(1) DEFAULT '1' COMMENT '删除标记', `create_user` int(11) DEFAULT NULL COMMENT '创建⼈', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `update_user` int(11) DEFAULT NULL COMMENT '更新⼈', `update_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=58 DEFAULT CHARSET=utf8mb4 COMMENT='库存分桶操作表';
二.库存操作失败记录表
CREATE TABLE `inventory_operate_fail` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `operate_id` varchar(32) NOT NULL COMMENT '操作id', `fail_type` varchar(32) DEFAULT NULL COMMENT '操作类型', `bucket_no` varchar(32) DEFAULT NULL COMMENT '分桶编号', `inventory_num` int(11) DEFAULT NULL COMMENT '变动库存数量', `del_flag` tinyint(1) DEFAULT NULL COMMENT '删除标识', `create_user` int(11) DEFAULT NULL COMMENT '创建⼈', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `update_user` int(11) DEFAULT NULL COMMENT '更新⼈', `update_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存操作失败记录表';
三.库存分桶配置表
CREATE TABLE `inventory_bucket_config` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `bucket_num` int(10) NOT NULL DEFAULT '0' COMMENT '分桶数量', `max_depth_num` int(10) NOT NULL DEFAULT '0' COMMENT '最⼤库存深度,即分桶的最大库存容量', `min_depth_num` int(10) NOT NULL DEFAULT '0' COMMENT '最⼩库存深度,即分桶的最小库存容量', `threshold_value` int(10) NOT NULL DEFAULT '0' COMMENT '分桶下线阈值,当某个分桶的库存数小于阈值时就需要将该分桶下线了', `back_source_proportion` int(10) NOT NULL DEFAULT '0' COMMENT '回源⽐例,从1-100设定⽐例', `back_source_step` int(10) NOT NULL DEFAULT '0' COMMENT '回源步⻓,桶扩容的时候默认每次分配的库存⼤⼩', `template_name` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '模板名称', `is_default` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否默认模板,只允许⼀个,1为默认模板', `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号', `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)', `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间', `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`ID`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存分桶配置模板表';
四.库存分配记录表
CREATE TABLE `inventory_allot_detail` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'skuId', `inventor_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '库存申请业务编号', `seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID', `inventor_num` int(10) NOT NULL DEFAULT '0' COMMENT '库存变更数量', `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号', `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)', `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间', `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `inde_unique_inventor_no` (`inventor_no`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=265 DEFAULT CHARSET=utf8 COMMENT='库存分配记录表';
五.库存扣减明细表
CREATE TABLE `inventory_deduction_detail` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `order_id` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '订单id', `refund_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '退款编号', `inventory_num` int(10) NOT NULL DEFAULT '0' COMMENT '扣减库存数量', `sku_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '商品skuId', `seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID', `bucket_no` int(10) NOT NULL COMMENT '扣减分桶编号', `deduction_type` int(2) NOT NULL COMMENT '库存操作类型(10库存扣减,20库存退货)', `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)', `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间', `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`ID`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存扣减明细表';
2.库存入桶分配改造
(1)库存分桶初始化入口
(2)库存分桶元数据计算以及分桶编号创建
(3)库存分桶操作记录写DB + 中心桶库存写缓存
(4)库存分桶元数据缓存更新使用自缓存一致性服务的DB + 消息双写方案
(5)库存分桶元数据作为热点数据更新到各机器节点的本地缓存
这里主要进行的是库存分桶初始化分配库存的改造。其中的缓存一致性方案是:首先将分桶元数据写入库存分桶操作表,然后再由定时任务扫描处理。
计算出元数据(待上线分桶、中⼼桶剩余库存、每个分桶分配库存)信息后,为了保证⼀致性,会先将计算出的分桶元数据信息⼊库。也就是先写⼊库存分桶操作表,然后在缓存中写⼊中⼼桶剩余库存信息。如果⼊库失败或缓存写⼊失败,会抛出异常,数据库回滚,操作不成功。只有⼊库成功和缓存写⼊成功之后,本次操作才成功。
关键环节与核心代码:
(1)库存分桶初始化入口
@RestController @RequestMapping("/product/inventory") public class InventoryController { @Autowired private InventoryBucketService inventoryBucketService; @Autowired private InventoryBucketCache cache; @Resource private TairCache tairCache; ... //初始化库存 @RequestMapping("/init") public void inventoryInit(@RequestBody InventorBucketRequest request) { //清除本地缓存数据,cache.getCache()获取的是Guava Cache cache.getCache().invalidateAll(); //清除Tair中的数据,扫描卖家ID+SKU的ID的key会比较耗时 Set<String> keys = tairCache.getJedis().keys("*" + request.getSellerId() + request.getSkuId() + "*"); if (!CollectionUtils.isEmpty(keys)) { tairCache.mdelete(Lists.newArrayList(keys)); } //这里模拟指定本次的库存业务单号,实际接口需要外部传入 request.setInventorCode(SnowflakeIdWorker.getCode()); //初始化库存信息 inventoryBucketService.inventorBucket(request); } ... } //库存分桶业务实现类 @Service public class InventoryBucketServiceImpl implements InventoryBucketService { @Resource private TairLock tairLock; ... //商品库存入桶分配 @Override @Transactional(rollbackFor = Exception.class) public void inventorBucket(InventorBucketRequest request) { //1.验证入参必填项 checkInventorParams(request); //锁key = 卖家ID + SKU的ID String key = buildBucketLockKey(request.getSellerId(), request.getSkuId()); String value = SnowflakeIdWorker.getCode(); //注意这里需要锁定中心桶库存 boolean lock = tairLock.tryLock(key, value); //分配库存时,这个卖家的sku是不允许其他相关操作的 if (lock) { try { //2.插入库存入库的记录信息 //由于申请的库存业务编号是一个唯一key,所以可以避免重复请求 //也就是会校验库存单号是否已经存在了,保证⼀次库存变更⾏为只能执⾏⼀次 inventoryRepository.saveInventoryAllotDetail(request); //3.将库存数据写入缓存 inventoryBucketCache(request); } catch (Exception e) { e.printStackTrace(); } finally { tairLock.unlock(key, value); } } else { throw new BaseBizException("请求繁忙,稍后重试!"); } } //将库存数据写入缓存 private void inventoryBucketCache(InventorBucketRequest request) { //获取中心桶库存的key String key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId()); //1.先验证是否已缓存分桶元数据信息 BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId()); try { //缓存不存在,则进行初始化操作 if (Objects.isNull(bucketLocalCache)) { //2.获取库存分桶的配置模板 InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId()); //初始化分桶库存 initInventoryBucket(request, inventoryBucketConfig); } else { //3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存 Integer residueNum = tairCache.incr(key, request.getInventoryNum()); if (residueNum < 0) { throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } //4.尝试将库存分配到新的分桶上(注意,先将中心桶的库存加上去) InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request); //5.构建新的分桶元数据信息,并写入 writeBucketCache(onlineRequest, residueNum); } } catch (Exception e) { log.error("分桶库存初始化出现失败", e); throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } finally { inventoryBucketCache.threadLocalRemove(); } } ... }
(2)库存分桶元数据计算以及分桶编号创建
//库存分桶业务实现类 @Service public class InventoryBucketServiceImpl implements InventoryBucketService { @Resource private TairLock tairLock; @Resource private InventoryRepository inventoryRepository; @Resource private InventoryBucketCache inventoryBucketCache; @Resource private TairCache tairCache; ... //初始化分桶库存 private void initInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) { //计算出分桶的元数据信息 BucketLocalCache bucketLocalCache = calcInventoryBucket(request, inventoryBucketConfig); //库存分桶的元数据信息入库 inventoryRepository.saveBucketDetail(null, bucketLocalCache, BucketOperateEnum.INIT.getCode(), bucketLocalCache.getAvailableList(), null); //写入中心桶的剩余库存信息 log.info("中心桶剩余库存:{}", bucketLocalCache.getResidueNum()); //获取中心桶剩余库存的key String key = buildSellerInventoryKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId()); //设置中心桶剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量 boolean setFlag = tairCache.set(key, bucketLocalCache.getResidueNum().toString(), 0); if (!setFlag) { //中心桶剩余库存写入失败,回滚事务 throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } } //计算出本次库存入库的具体分桶信息 private BucketLocalCache calcInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) { //分桶配置模版中默认的分桶数量 Integer bucketNum = inventoryBucketConfig.getBucketNum(); //获取本次需要入库的库存数量 Integer inventoryNum = request.getInventoryNum(); //配置模版中所有分桶的最大库存容量 Integer maxBucketNum = bucketNum * inventoryBucketConfig.getMaxDepthNum(); //配置模版中所有分桶的最小库存容量 //如果需要放入分桶的库存数量低于这个值,那么只会分配给部分分桶,此时就需要重新计算分桶 Integer minBucketNum = bucketNum * inventoryBucketConfig.getMinDepthNum(); //本次最多可以放入分桶的库存数量 int countBucketNum = Math.min(inventoryNum, maxBucketNum); //当库存数量小于最小分桶深度*分桶数量,就需要减少分配的分桶数 //此时要分配的分桶数量 bucketNum = 本次库存入库的数量 / 每个分桶的最小库存容量 if (minBucketNum > countBucketNum) { bucketNum = countBucketNum / inventoryBucketConfig.getMinDepthNum(); //如果库存数量不足一个分桶的最小深度,但是大于0,则上线一个分桶 if (bucketNum == 0 && countBucketNum % inventoryBucketConfig.getMinDepthNum() > 0) { bucketNum++; } } //获取每个分桶分配的库存数量 Integer bucketInventoryNum = countBucketNum / bucketNum; //剩余库存数量,可能为0或者大于0,补到最后一个分桶上 Integer residueNum = countBucketNum - bucketInventoryNum * bucketNum; //构建缓存数据模型时,以卖家ID + 商品skuId为唯一标识 String key = request.getSellerId() + request.getSkuId(); //构建具体的缓存数据模型 BucketLocalCache bucketLocalCache = buildBucketCache(key, bucketNum, bucketInventoryNum, residueNum, inventoryBucketConfig); //标记到具体的数据上 bucketLocalCache.setSellerId(request.getSellerId()); bucketLocalCache.setSkuId(request.getSkuId()); bucketLocalCache.setInventoryNum(inventoryNum); //中心桶剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量 bucketLocalCache.setResidueNum(inventoryNum - countBucketNum); bucketLocalCache.setInventoryBucketConfig(inventoryBucketConfig); bucketLocalCache.setOperationType(BucketStatusEnum.AVAILABLE_STATUS.getCode()); return bucketLocalCache; } //构建缓存模型 //@param bucketNum 分桶数量 //@param inventoryNum 分桶分配的库存数量 //@param residueNum 剩余的未分配均匀的库存 //@param inventoryBucketConfig 分桶配置信息 private BucketLocalCache buildBucketCache(String key, Integer bucketNum, Integer inventoryNum, Integer residueNum, InventoryBucketConfigDO inventoryBucketConfig) { BucketLocalCache bucketLocalCache = new BucketLocalCache(); //先获取得到这个模板配置的对应可分槽位的均匀桶列表 List<String> bucketNoList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum()); List<BucketCacheBO> bucketCacheBOList = new ArrayList<>(bucketNum); List<BucketCacheBO> undistributedList = new ArrayList<>(bucketNum); //构建出多个分桶对象 for (int i = 0; i < bucketNum; i++) { //生成对应的分桶编号,方便定义到具体的分桶上 BucketCacheBO bucketCache = new BucketCacheBO(); String bucketNo = bucketNoList.get(i); bucketCache.setBucketNo(bucketNo); //最后一个分桶,分配剩余未除尽的库存+平均库存 if (i == bucketNum - 1) { bucketCache.setBucketNum(inventoryNum + residueNum); } else { bucketCache.setBucketNum(inventoryNum); } bucketCacheBOList.add(bucketCache); } //生成的分桶对象超过实际可分配的分桶对象,保留这批多余的分桶模型为不可用分桶,后续分桶上线可以选择使用 if (bucketNoList.size() > bucketNum) { for (int i = bucketNum; i < bucketNoList.size(); i++) { BucketCacheBO bucketCache = new BucketCacheBO(); String bucketNo = bucketNoList.get(i); bucketCache.setBucketNo(bucketNo); undistributedList.add(bucketCache); } } //生成缓存的明细key List<String> bucketDetailKeyList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum(), "%07d"); //设置分桶缓存明细的key bucketLocalCache.setBucketDetailKeyList(bucketDetailKeyList); //设置可用的分桶缓存列表 bucketLocalCache.setAvailableList(bucketCacheBOList); //设置不可用或者已下线的分桶缓存列表 bucketLocalCache.setUndistributedList(undistributedList); return bucketLocalCache; } ... } public class InventorBucketUtil { private static final int MAX_SIZE = 100000; //生成对应的槽位key,默认的 //@param key 卖家Id+商品skuId //@param bucketNum 分桶配置数量 //@return 预先保留的槽位集合 public static List<String> createBucketNoList(String key, Integer bucketNum) { return createBucketNoList(key, bucketNum, "%06d"); } //生成对应的槽位key,明细使用,多使用一位区分 //@param key 卖家Id+商品skuId //@param bucketNum 分桶配置数量 //@return 预先保留的槽位集合 public static List<String> createBucketNoList(String key, Integer bucketNum, String format) { Map<Long, String> cacheKey = new HashMap<>(bucketNum); //bucketNoList用来存放每个桶对应的hashKey List<String> bucketNoList = new ArrayList<>(bucketNum); //分配桶的编号 for (int i = 1; i <= MAX_SIZE; i++) { String serialNum = String.format(format, i); //卖家ID + 商品SKU ID + 序号 String hashKey = key + serialNum; //一致性哈希算法murmur long hash = HashUtil.murMurHash(hashKey.getBytes()); //对分桶数量进行取模运算 long c = (hash %= bucketNum); //确保被选中的hashKey都能哈希到不同的分桶 if (cacheKey.containsKey(c)) { continue; } cacheKey.put(c, hashKey); bucketNoList.add(hashKey); if (cacheKey.size() >= bucketNum) { break; } } return bucketNoList; } ... }
(3)库存分桶操作记录写DB + 中心桶库存写缓存
//库存分桶业务实现类 @Service public class InventoryBucketServiceImpl implements InventoryBucketService { @Resource private TairLock tairLock; @Resource private InventoryRepository inventoryRepository; @Resource private InventoryBucketCache inventoryBucketCache; @Resource private TairCache tairCache; ... //初始化分桶库存 private void initInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) { //计算出分桶的元数据信息 BucketLocalCache bucketLocalCache = calcInventoryBucket(request, inventoryBucketConfig); //库存分桶的元数据信息入库 inventoryRepository.saveBucketDetail(null, bucketLocalCache, BucketOperateEnum.INIT.getCode(), bucketLocalCache.getAvailableList(), null); //写入中心桶的剩余库存信息 log.info("中心桶剩余库存:{}", bucketLocalCache.getResidueNum()); //获取中心桶剩余库存的key String key = buildSellerInventoryKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId()); //设置中心桶剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量 boolean setFlag = tairCache.set(key, bucketLocalCache.getResidueNum().toString(), 0); if (!setFlag) { //中心桶剩余库存写入失败,回滚事务 throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } } ... } @Repository public class InventoryRepository { ... //保存库存分桶的元数据信息 //@param operateId 操作id //@param bucketLocalCache 变更之后的元数据信息 //@param operateType 操作类型 //@param bucketCacheBOList 变动的分桶列表 //@param inventoryNum 变动的库存数量 public void saveBucketDetail(String operateId, BucketLocalCache bucketLocalCache, Integer operateType, List<BucketCacheBO> bucketCacheBOList, Integer inventoryNum) { //变动的分桶为空,则不必要保存 if (CollectionUtils.isEmpty(bucketCacheBOList)) { return; } if (!StringUtils.hasLength(operateId)) { operateId = SnowflakeIdWorker.getCode(); } InventoryBucketOperateDO inventoryBucketOperateDO = InventoryBucketOperateDO.builder() .operateId(operateId) .sellerId(bucketLocalCache.getSellerId()) .skuId(bucketLocalCache.getSkuId()) .bucket(JSON.toJSONString(bucketCacheBOList)) .operateType(operateType) .feature(JSON.toJSONString(bucketLocalCache)) .inventoryNum(inventoryNum) .build(); int count = inventoryBucketOperateMapper.insert(inventoryBucketOperateDO); if (count <= 0) { throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL); } } ... }
(4)库存分桶元数据缓存更新使用自缓存一致性服务的DB + 消息双写方案
InventoryBucketCache.setBucketLocalCache()方法设置库存分桶元数据到本地缓存时,就用到了缓存一致性框架。也就是使用了缓存一致性服务的@CacheRefresh注解,因为库存分桶元数据属于热点数据,对实时性要求比较高。在一台机器的本地缓存了库存分桶元数据后,其他机器也应缓存该数据。
@Component @Data public class InventoryBucketCache { @Autowired private Cache cache; @Resource private TairCache tairCache; private ThreadLocal<BucketLocalCache> bucketLocalCacheThreadLocal = new ThreadLocal<>(); //本地存储关于分桶信息 @CacheRefresh( cacheKey = "bucketKey", mqCacheKey = CacheConstant.INVENTORY_SKU_KEY, index = "1", messageType = CacheConstant.MESSAGE_TYPE_HOT, cacheType = CacheConstant.TAIR_CACHE_TYPE ) public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) { String bucketLocalKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId(); synchronized (bucketLocalKey.intern()) { log.info("保存本地缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache)); BucketLocalCache bucketCache = getTairBucketCache(bucketKey); log.info("远程缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketCache)); //如果本地缓存没有就直接写入 if (Objects.isNull(bucketCache)) { setBucketCache(bucketKey, bucketLocalCache); cache.put(bucketKey, bucketLocalCache); return; } //本地缓存的元数据覆盖,考虑到是并发执行的,这里需要上内存级别的锁,并进行diff处理 if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.AVAILABLE_STATUS.getCode())) { diffCacheOnline(bucketCache, bucketLocalCache); } else if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.OFFLINE_STATUS.getCode())) { diffCacheOffline(bucketCache, bucketLocalCache); } setBucketCache(bucketKey, bucketCache); cache.put(bucketKey, bucketCache); log.info("实际保存本地缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketCache)); } } ... } //刷新缓存的自定义注解 @Aspect @Component public class CacheRefreshAspect { @Autowired private DataRefreshProducer producer; @Autowired private CacheRefreshConverter cacheRefreshConverter; @Autowired private CacheQueue cacheQueue; //切入点,@CacheRefresh注解标注的 @Pointcut("@annotation(com.demo.eshop.cache.annotation.CacheRefresh)") public void pointcut() { } //环绕通知,在方法执行前后 //@param point 切入点 //@return 结果 @Around("pointcut() && @annotation(cacheRefresh)") public Object around(ProceedingJoinPoint point, CacheRefresh cacheRefresh) throws Throwable { //签名信息 Signature signature = point.getSignature(); //强转为方法信息 MethodSignature methodSignature = (MethodSignature) signature; //参数名称 String[] parameterNames = methodSignature.getParameterNames(); //参数值 Object[] parameterValues = point.getArgs(); Object response; try { //先执行本地方法再执行异步的操作 response = point.proceed(); } catch (Throwable throwable) { log.error("执行方法: {}失败,异常信息: {}", methodSignature.getMethod().getName(), throwable); throw throwable; } try { MessageCache messageCache = new MessageCache(); for (int i = 0; i < parameterValues.length; i++) { if (parameterNames[i].equals(cacheRefresh.cacheKey())) { messageCache.setCacheKey(String.valueOf(parameterValues[i])); } if (Integer.valueOf(cacheRefresh.index()) == i) { messageCache.setCacheJSON(JSONObject.toJSONString(parameterValues[i])); } } messageCache.setOperationType(Integer.valueOf(cacheRefresh.operationType())); //给定一个有序的版本号(默认统一的工作ID和数据中心ID) messageCache.setVersion(SnowflakeIdWorker.getVersion()); messageCache.setMessageType(Integer.valueOf(cacheRefresh.messageType())); messageCache.setCacheType(Integer.valueOf(cacheRefresh.cacheType())); messageCache.setCreateDate(new Date()); //将缓存数据写入读写队列 //缓存数据写入读写队列后,会定时每秒批量写入数据库(缓存数据写入DB只用于兜底,所以偶尔出现丢失并不影响) DataRefreshDetailDO dataRefreshDetailDO = cacheRefreshConverter.converter(messageCache); cacheQueue.submit(dataRefreshDetailDO); //发送MQ消息去处理缓存数据,比如将缓存数据更新到缓存上 //一般来说,热点缓存会比普通缓存少很多,所以普通缓存的更新会比较多,热点缓存的更新会比较少 //此外,热点缓存的更新会对时效性要求比较高,通过消息去异步处理本来就已存在一定的延迟 //所以这里将普通缓存和热点缓存的更新进行分开处理,减少时效性高的消息的处理延迟 if (CacheConstant.MESSAGE_TYPE_HOT.equals(cacheRefresh.messageType())) { producer.sendMessage(RocketMqConstant.DATA_HOT_RADIO_TOPIC, JSONObject.toJSONString(messageCache), "热点缓存消息发送"); } else { producer.sendMessage(RocketMqConstant.DATA_MESSAGE_CACHE_TOPIC, JSONObject.toJSONString(messageCache), cacheRefresh.mqCacheKey(), "通用缓存消息发送"); } } catch (Exception e) { log.error("处理缓存同步:{}失败,异常信息:{}", methodSignature.getMethod().getName(), e); } return response; } }
(5)库存分桶元数据作为热点数据更新到各机器节点的本地缓存
setBucketLocalCache()方法的@CacheRefresh注解描述缓存是热点类型。该方法被执行后,会被AOP切面切入,将需要缓存的数据发送到MQ。接着MQ会对这种热点类型的消息进行广播处理,也就是每台机器都会执行CacheRefreshListener的方法。
@Configuration public class ConsumerBeanConfig { ... //刷新本地缓存 @Bean("cacheRefresTopic") public DefaultMQPushConsumer cacheRefresTopic(CacheRefreshListener cacheRefreshListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.REFRESH_CACHE_GROUP); //设置为广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.DATA_HOT_RADIO_TOPIC, "*"); consumer.registerMessageListener(cacheRefreshListener); consumer.start(); return consumer; } ... } //刷新分布式节点的本地缓存 @Component public class CacheRefreshListener implements MessageListenerConcurrently { //本地缓存 @Autowired private Cache cache; @Resource private InventoryBucketCache inventoryBucketCache; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String msg = new String(messageExt.getBody()); log.info("刷新本地缓存,消息内容:{},消费时间:{}", msg, DateFormatUtil.formatDateTime(new Date())); MessageCache messageCache = JsonUtil.json2Object(msg, MessageCache.class); BucketLocalCache bucketLocalCache = JsonUtil.json2Object(messageCache.getCacheJSON(), BucketLocalCache.class); synchronized (messageCache.getCacheKey().intern()) { String bucketLocalKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId(); //获取远程缓存的分桶元数据信息 BucketLocalCache bucketCache = inventoryBucketCache.getTairBucketCache(bucketLocalKey); if (Objects.isNull(bucketCache)) { cache.put(messageCache.getCacheKey(), JsonUtil.json2Object(messageCache.getCacheJSON(), BucketLocalCache.class)); log.info("更新本地缓存,本次更新内容:{},更新时间:{}", messageCache.getCacheJSON(), DateFormatUtil.formatDateTime(new Date())); } else { //进行diff数据处理 if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.AVAILABLE_STATUS.getCode())) { inventoryBucketCache.diffCacheOnline(bucketCache, bucketLocalCache); } else if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.OFFLINE_STATUS.getCode())) { inventoryBucketCache.diffCacheOffline(bucketCache, bucketLocalCache); } cache.put(messageCache.getCacheKey(), bucketCache); log.info("更新本地缓存,本次更新内容:{},更新时间:{}", JsonUtil.object2Json(bucketCache), DateFormatUtil.formatDateTime(new Date())); } } } } catch (Exception e) { //本地缓存只有参数转换会出错,这种错误重试也没什么作用 log.error("consume error, 刷新本地缓存失败", e); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
3.库存分桶上线改造
当向库存分桶增加库存时,会调用分桶上线接⼝,也就是会调⽤InventoryBucketServiceImpl的writeBucketCache()⽅法,writeBucketCache()⽅法会实现具体的分桶上线任务。
InventoryBucketServiceImpl的bucketOnline()方法,适⽤场景是在商品库存⼊桶时,分桶上线中存在上线失败的分桶。此时运营⼈员就可以通过bucketOnline()方法⼿动执⾏分桶的上线。从而防⽌上线的分桶过少,承担的并发压⼒⼤。
其中的缓存一致性方案是:首先将分桶元数据写入库存分桶操作表,然后再由定时任务扫描处理。
也是先把分桶元数据写⼊到数据库中,然后再操作中⼼桶缓存数据。数据库写⼊成功和缓存写⼊成功,则本次操作成功。数据库写⼊失败或者缓存写⼊失败,都会回滚数据库,本次操作失败。
//库存分桶业务实现类 @Service public class InventoryBucketServiceImpl implements InventoryBucketService { @Resource private TairLock tairLock; @Resource private InventoryRepository inventoryRepository; @Resource private InventoryBucketCache inventoryBucketCache; @Resource private TairCache tairCache; ... //商品库存入桶分配 @Override @Transactional(rollbackFor = Exception.class) public void inventorBucket(InventorBucketRequest request) { //1.验证入参必填项 checkInventorParams(request); //锁key = 卖家ID + SKU的ID String key = buildBucketLockKey(request.getSellerId(), request.getSkuId()); String value = SnowflakeIdWorker.getCode(); //注意这里需要锁定中心桶库存 boolean lock = tairLock.tryLock(key, value); //分配库存时,这个卖家的sku是不允许其他相关操作的 if (lock) { try { //2.插入库存入库的记录信息 //由于申请的库存业务编号是一个唯一key,所以可以避免重复请求 //也就是会校验库存单号是否已经存在了,保证⼀次库存变更⾏为只能执⾏⼀次 inventoryRepository.saveInventoryAllotDetail(request); //3.将库存数据写入缓存 inventoryBucketCache(request); } catch (Exception e) { e.printStackTrace(); } finally { tairLock.unlock(key, value); } } else { throw new BaseBizException("请求繁忙,稍后重试!"); } } //将库存数据写入缓存 private void inventoryBucketCache(InventorBucketRequest request) { //获取中心桶库存的key String key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId()); //1.先验证是否已缓存分桶元数据信息,先查本地缓存,再查远程缓存 BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId()); try { //缓存不存在,则进行初始化操作 if (Objects.isNull(bucketLocalCache)) { //2.获取库存分桶的配置模板 InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId()); //初始化分桶库存 initInventoryBucket(request, inventoryBucketConfig); } else { //3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存 Integer residueNum = tairCache.incr(key, request.getInventoryNum()); if (residueNum < 0) { throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } //4.尝试将库存分配到新的分桶上(注意,先将中心桶的库存加上去) InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request); //5.构建新的分桶元数据信息,并写入 writeBucketCache(onlineRequest, residueNum); } } catch (Exception e) { log.error("分桶库存初始化出现失败", e); throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } finally { inventoryBucketCache.threadLocalRemove(); } } //分桶上线接口 @Override @Transactional(rollbackFor = Exception.class) public void bucketOnline(InventorOnlineRequest request) { //1.验证入参必填 checkInventorOnlineParams(request); //2.注意这里需要锁定中心桶库存 String key = buildBucketLockKey(request.getSellerId(), request.getSkuId()); String value = SnowflakeIdWorker.getCode(); boolean lock = tairLock.tryLock(key, value); if (lock) { try { //3.获取中心桶的库存,并校验是否可上线分桶 Integer residueNum = checkBucketOnlineNum(key); //4.构建新的分桶元数据信息,并写入 writeBucketCache(request, residueNum); } catch (Exception e) { e.printStackTrace(); } finally { tairLock.unlock(key, value); } } else { throw new BaseBizException("请求繁忙,稍后重试!"); } } ... //构建新的分桶元数据信息 //@param request 分桶上线对象 //@param residueNum 中心桶剩余库存 private void writeBucketCache(InventorOnlineRequest request, Integer residueNum) { String key = request.getSellerId() + request.getSkuId(); //获取到本地的缓存列表 BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key); try { if (!Objects.isNull(bucketLocalCache)) { //获取当前可上线的分桶列表信息 List<BucketCacheBO> bucketCacheBOList = buildBucketList(request.getBucketNoList(), bucketLocalCache.getAvailableList(), bucketLocalCache.getUndistributedList(), bucketLocalCache.getInventoryBucketConfig(), residueNum); //当前可上线的分桶为空,直接返回 if (CollectionUtils.isEmpty(bucketCacheBOList)) { return; } //中心桶被扣减掉的库存(上线的分桶库存总和) Integer descInventoryNum = bucketCacheBOList.stream().mapToInt(BucketCacheBO::getBucketNum).sum(); //构建返回新的元数据模型返回 buildBucketLocalCache(bucketLocalCache, bucketCacheBOList, residueNum - descInventoryNum); //分桶信息入库 inventoryRepository.saveBucketDetail(null, bucketLocalCache, BucketOperateEnum.ONLINE.getCode(), bucketCacheBOList, descInventoryNum); //扣减中心桶剩余库存,如果扣减失败了,直接抛异常 Integer decr = tairCache.decr(buildSellerInventoryKey(request.getSellerId(), request.getSkuId()), descInventoryNum); if (decr < 0) { throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } } } catch (Exception e) { log.error("分桶构建初始化失败", e); throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } finally { inventoryBucketCache.threadLocalRemove(); } } //获取可上线的分桶列表信息以及具体上线库存 //@param bucketNoList 上线分桶编号列表 //@param availableList 上线正在使用的分桶编号列表 //@param undistributedList 下线或者未使用的分桶编号列表 //@param inventoryBucketConfigDO 当前分桶的配置模板信息 //@param residueNum 中心桶的剩余可分配库存 //@return 可上线的分桶列表以及具体分桶库存 private List<BucketCacheBO> buildBucketList(List<String> bucketNoList, List<BucketCacheBO> availableList, List<BucketCacheBO> undistributedList, InventoryBucketConfigDO inventoryBucketConfigDO, Integer residueNum) { //1.如果入参选择了上线的分桶编号列表,则从缓存中配置的未使用分桶列表进行比对处理 List<String> bucketCacheList = null; if (!CollectionUtils.isEmpty(bucketNoList)) { Map<String, BucketCacheBO> bucketCacheMap = undistributedList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity())); //过滤返回可用的分桶编号 bucketCacheList = bucketNoList.stream().filter(bucketNo -> bucketCacheMap.containsKey(bucketNo)).collect(Collectors.toList()); } else { //直接返回下线的不可用分桶列表 bucketCacheList = undistributedList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList()); } //可上线的分桶列表为空 if (CollectionUtils.isEmpty(bucketCacheList)) { return Lists.newArrayList(); } //2.根据中心桶的可分配库存,处理返回具体上线的分桶配置信息 return calcOnlineBucket(availableList, bucketCacheList, residueNum, inventoryBucketConfigDO); } //构建新的元数据模型 //@param bucketLocalCache 本地分桶元数据信息 //@param bucketCacheBOList 上线的分桶列表 //@param residueNum 中心桶剩余库存 private void buildBucketLocalCache(BucketLocalCache bucketLocalCache, List<BucketCacheBO> bucketCacheBOList, Integer residueNum) { //填充中心桶剩余库存 bucketLocalCache.setResidueNum(residueNum); //添加新上线的分桶列表 bucketLocalCache.getAvailableList().addAll(bucketCacheBOList); Map<String, BucketCacheBO> bucketCacheMap = bucketCacheBOList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity())); List<BucketCacheBO> undistributedList = bucketLocalCache.getUndistributedList().stream().filter(bucketCacheBO -> //在上线的分桶列表,需要移除掉 !bucketCacheMap.containsKey(bucketCacheBO.getBucketNo())).collect(Collectors.toList()); //从不可用的分桶列表重移除 bucketLocalCache.setUndistributedList(undistributedList); bucketLocalCache.setOperationType(BucketStatusEnum.AVAILABLE_STATUS.getCode()); } ... } @Repository public class InventoryRepository { ... //保存库存分桶的元数据信息 //@param operateId 操作id //@param bucketLocalCache 变更之后的元数据信息 //@param operateType 操作类型 //@param bucketCacheBOList 变动的分桶列表 //@param inventoryNum 变动的库存数量 public void saveBucketDetail(String operateId, BucketLocalCache bucketLocalCache, Integer operateType, List<BucketCacheBO> bucketCacheBOList, Integer inventoryNum) { //变动的分桶为空,则不必要保存 if (CollectionUtils.isEmpty(bucketCacheBOList)) { return; } if (!StringUtils.hasLength(operateId)) { operateId = SnowflakeIdWorker.getCode(); } InventoryBucketOperateDO inventoryBucketOperateDO = InventoryBucketOperateDO.builder() .operateId(operateId) .sellerId(bucketLocalCache.getSellerId()) .skuId(bucketLocalCache.getSkuId()) .bucket(JSON.toJSONString(bucketCacheBOList)) .operateType(operateType) .feature(JSON.toJSONString(bucketLocalCache)) .inventoryNum(inventoryNum) .build(); int count = inventoryBucketOperateMapper.insert(inventoryBucketOperateDO); if (count <= 0) { throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL); } } ... }
4.库存分桶扩容改造
在库存扣减接⼝执⾏完后,会检查分桶库存是否需要扩容。当分桶剩余库存⼩于分桶回源数量时,会执行扩容接⼝,即调⽤InventoryBucketServiceImpl的writeBucketCache()⽅法。其中,分桶回源数量是通过库存分桶配置模板表中的回源⽐例计算出的。
其中的缓存一致性方案是:首先将分桶元数据写入库存分桶操作表,然后再由定时任务扫描处理。
首先会判断当前要扩容的分桶是否已有存在的并且扩容失败的标识。为了避免⼀个分桶多次扩容失败频繁锁定中⼼桶的库存,所以当失败次数超过两次之后,剩下的扩容请求就直接拒绝掉。等待扩容的分桶后续流程执⾏完成后,再将失败的次数减少,从而防⽌后续的扩容请求⽆法正常处理。
因为扩容对实时性要求⽐较⾼,所以在处理扩容请求时,会直接尝试执⾏扩容流程。也就是首先尝试扣减中⼼桶,如果中⼼桶因为库存不足扣减失败,那么就直接返回。如果中⼼桶扣减成功,那么才继续执⾏后续分桶扩容操作。
只有在扩容失败时,才会将失败的扩容存储在数据库中,等待后续定时任务扫描兜底处理。
//库存分桶业务实现类 @Service public class InventoryBucketServiceImpl implements InventoryBucketService { @Resource private TairLock tairLock; @Resource private InventoryRepository inventoryRepository; @Resource private InventoryBucketCache inventoryBucketCache; @Resource private TairCache tairCache; ... //分桶扩容接口 @Override public void bucketCapacity(BucketCapacity bucketCapacity) { long startTime = System.currentTimeMillis(); //获取中心桶的剩余库存 Integer residueNum = getCenterStock(bucketCapacity); if (residueNum <= 0) { //中心桶无剩余库存,检查是否触发下线 checkBucketOffline(bucketCapacity); return; } //判断本次扩容的分桶,是否有多次扩容失败的情况 String failNum = tairCache.get(TairInventoryConstant.BUCKET_CAPACITY_FAIL + bucketCapacity.getBucketNo()); if (StringUtils.isNotBlank(failNum) && Integer.parseInt(failNum) >= 2) { //当前分桶扩容失败次数超过两次了,直接放弃这次扩容 //因为失败太多并且还继续去尝试,会持续的扣减中心桶库存,可能会导致其他可以正常扩容的分桶,没有中心桶库存可以扣减 return; } //1.校验是否已经无需扩容了,如果是则快速结束 BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity); if (!bucketCapacityContext.getIsCapacity()) { return; } //先锁住中心桶库存,避免此时库存发生变化 String key = buildBucketLockKey(bucketCapacity.getSellerId(), bucketCapacity.getSkuId()); String value = SnowflakeIdWorker.getCode(); //获取分布式锁来进行扩容处理 boolean lock = tairLock.tryLock(key, value); if (lock) { try { //再次校验是否需要扩容,此处不允许并发 bucketCapacityContext = checkBucketCapacity(bucketCapacity); if (bucketCapacityContext.getIsCapacity()) { //2.获取中心桶库存的库存 residueNum = getCenterStock(bucketCapacity); //3.可以扩容,计算出可回源的库存进行处理 if (residueNum > 0) { backSourceInventory(residueNum, bucketCapacityContext); log.info(bucketCapacity.getBucketNo() + "处理扩容消耗时间{}", System.currentTimeMillis() - startTime); } else { //4.中心桶无库存,检查是否触发下线 checkBucketOffline(bucketCapacity); } } } catch (Exception e) { e.printStackTrace(); } finally { tairLock.unlock(key, value); } } else { throw new BaseBizException("请求繁忙,稍后重试!"); } } //回源库存到分桶上 //@param residueNum 中心桶剩余库存 //@param bucketCapacityContext 扩容上下文对象 private void backSourceInventory(Integer residueNum, BucketCapacityContext bucketCapacityContext) { BucketCapacity bucketCapacity = bucketCapacityContext.getBucketCapacity(); //先获取本地的分桶元数据信息,获取当前分桶的总发放上限 String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId(); //中心桶库存key String sellerInventoryKey = buildSellerInventoryKey(bucketCapacity.getSellerId(), bucketCapacity.getSkuId()); BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key); String bucketNo = ""; try { InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig(); List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList(); Integer inventoryNum = 0; //获取实际配置的最大可用库存深度 Integer maxBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum(); BucketCacheBO bucketCache = null; for (BucketCacheBO bucketCacheBO : availableList) { if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) { bucketCache = bucketCacheBO; bucketNo = bucketCache.getBucketNo(); break; } } //这里没有匹配到分桶,则该分桶已被下线,不处理后续流程 if (Objects.isNull(bucketCache)) { return; } //中心桶库存超过最大深度库存(全部分桶总计),直接以配置的回源步长增长库存 if (residueNum > maxBucketNum) { inventoryNum = inventoryBucketConfig.getBackSourceStep(); } else { inventoryNum = calcEvenInventoryNum(maxBucketNum, inventoryBucketConfig, residueNum, bucketCache); } //获取扩容后的预估库存深度 Integer maxDepthNum = getMaxDepthNum(inventoryNum, inventoryBucketConfig, bucketCache, bucketCapacityContext); //更新分桶元数据相关信息 refreshBucketCache(maxDepthNum, bucketLocalCache, bucketCapacity.getBucketNo(), inventoryNum); //扣减中心桶库存 Integer decr = tairCache.decr(sellerInventoryKey, inventoryNum); if (decr < 0) { //中心桶扣减失败,直接返回 throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } //回源分桶的库存 Integer incr = tairCache.retryIncr(bucketCache.getBucketNo(), inventoryNum, 3); if (incr < 0) { //这里扣减中心桶成功了,但是回源分桶库存失败了,记录失败的操作,重试,如果重试失败了,则库存返回中心桶 //本次操作的唯一id String operateId = SnowflakeIdWorker.getCode(); //分桶元数据信息入库 inventoryRepository.saveBucketDetail(operateId, bucketLocalCache, BucketOperateEnum.CAPACITY.getCode(), Lists.newArrayList(bucketCache), inventoryNum); //考虑到这里同一个分桶,可能会有多个任务没处理完,value值需要用对象来存储 inventoryRepository.saveFailOperate(operateId, TairInventoryConstant.OPERATE_INCR_FAIL, bucketCache.getBucketNo(), inventoryNum); tairCache.appendJsonExhset(TairInventoryConstant.OPERATE_INCR_FAIL, bucketCache.getBucketNo(), operateId, inventoryNum); throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } log.info("本次分桶:{},回源库存:{}, 回源后分桶库存:{}, 中心桶剩余库存:{}", bucketCache.getBucketNo(), inventoryNum, incr, decr); } catch (Exception e) { //增加分桶扩容失败次数,如果次数超过两次了,则不会尝试更新缓存,直接写DB,让DB去操作 //DB操作成功,会减少失败次数 tairCache.incr(TairInventoryConstant.BUCKET_CAPACITY_FAIL + bucketNo); throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } finally { inventoryBucketCache.threadLocalRemove(); } } //刷新分桶元数据缓存 //@param maxDepthNum 分桶最大库存深度 //@param bucketLocalCache 分桶元数据信息 //@param bucketNo 分桶编号 private void refreshBucketCache(Integer maxDepthNum, BucketLocalCache bucketLocalCache, String bucketNo, Integer inventoryNum) { List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList(); for (BucketCacheBO bucketCacheBO : availableList) { if (bucketCacheBO.getBucketNo().equals(bucketNo)) { //每次库存具体深度变化都要更细,否则很容易触发回源的比例 bucketCacheBO.setBucketNum(maxDepthNum); bucketCacheBO.setAllotNum(inventoryNum + (Objects.isNull(bucketCacheBO.getAllotNum()) ? 0 : bucketCacheBO.getAllotNum())); break; } } String key = buildBucketCacheKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId()); //刷新本地缓存 inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache); } ... }
5.库存分桶下线改造
在库存分桶扩容时,如果发现中心桶库存为0,那么会进行下线检查。如果分桶的剩余库存达到下线阈值,那么就会调⽤InventoryBucketServiceImpl的bucketOffline()⽅法进行分桶下线。
其中的缓存一致性方案是:首先将分桶元数据写入库存分桶操作表,然后再由定时任务扫描处理。
//库存分桶业务实现类 @Service public class InventoryBucketServiceImpl implements InventoryBucketService { @Resource private TairLock tairLock; @Resource private InventoryRepository inventoryRepository; @Resource private InventoryBucketCache inventoryBucketCache; @Resource private TairCache tairCache; ... //分桶下线接口 @Override public void bucketOffline(InventorOfflineRequest request) { //1.验证入参必填 checkInventorOfflineParams(request); //过滤只有一个分桶的无效请求 Boolean isOffline = checkBucketOffline(request); if (isOffline) { long start = System.currentTimeMillis(); //2.注意这里需要锁定 下线分桶的变更,这个接口默认一次只有一个分桶 String key = buildBucketOfflineLockKey(request.getSellerId(), request.getSkuId(), request.getBucketNoList().get(0)); String value = SnowflakeIdWorker.getCode(); boolean lock = tairLock.tryLock(key, value); if (lock) { try { //3.先将准备下线的分桶库存从本地和远程列表中移除至不可用列表,避免新的请求进来 updateBucketCache(request); log.info("分桶下线处理时间,下线分桶:{}, 当前时间:{}, 耗时:{}", JSON.toJSONString(request.getBucketNoList()), DateFormatUtil.formatDateTime(new Date()), System.currentTimeMillis() - start); } catch (Exception e) { e.printStackTrace(); } finally { tairLock.unlock(key, value); } } else { throw new BaseBizException("请求繁忙,稍后重试!"); } } } //移除本地分桶的对应分桶列表以及远程的分桶列表 //@param request 下线的请求参数列表 private void updateBucketCache(InventorOfflineRequest request) { //下线的分桶列表 List<String> bucketCacheList = request.getBucketNoList(); String key = buildBucketCacheKey(request.getSellerId(), request.getSkuId()); //1.获取到本地的缓存列表 BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId()); try { //2.填充下线的分桶到不可用列表中 List<BucketCacheBO> offlineBucket = bucketLocalCache.getAvailableList().stream().filter(bucketCacheBO -> bucketCacheList.contains(bucketCacheBO.getBucketNo())).collect(Collectors.toList()); if (CollectionUtils.isEmpty(offlineBucket)) { //如果下线的分桶过滤后,没有要下线的了,直接返回 return; } boolean isOfflineBucket = true; for (BucketCacheBO bucketCacheBO : offlineBucket) { if (!StringUtils.isEmpty(bucketCacheBO.getBucketNo())) { bucketLocalCache.getUndistributedList().add(bucketCacheBO); isOfflineBucket = false; } } //有合法的下线分桶,才操作 if (isOfflineBucket) { return; } //过滤返回,还上线的分桶列表 List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList().stream().filter(bucketCacheBO -> !bucketCacheList.contains(bucketCacheBO.getBucketNo())).collect(Collectors.toList()); bucketLocalCache.setAvailableList(availableList); //可用分桶列表大于等于1的时候,才允许分桶下线 if (availableList.size() >= 1) { //元数据缓存更新,这里会切面处理,为避免出现延迟,先操作远程缓存的元数据覆盖 bucketLocalCache.setOperationType(BucketStatusEnum.OFFLINE_STATUS.getCode()); //发一个消息进行本地缓存的更新,同时切面会发一个消息更新缓存,本地缓存的更新涉及版本号 inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache); //分桶元数据信息入库,待跑批任务执行 inventoryRepository.saveBucketDetail(null, bucketLocalCache, BucketOperateEnum.OFFLINE.getCode(), offlineBucket, null); } } catch (Exception e) { log.error("分桶下线出现失败", e); throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } finally { inventoryBucketCache.threadLocalRemove(); } } ... }
6.执行库存分桶缓存操作的定时任务
(1)定时任务的入口
(2)定时任务的整体实现流程
(1)定时任务的入口
在库存入桶分配、分桶上线、分桶扩容、分桶下线四个接口中,都只更新了中⼼桶缓存,而分桶的缓存数据还没处理。
由于这四个接口都会将变更的分桶元数据存储到库存分桶操作表里,所以可以启动定时任务扫描库存分桶操作表中数据,更新分桶的缓存数据。
@Component public class BucketOperateJobHandler { @DubboReference(version = "1.0.0") private InventoryServiceApi inventoryService; @XxlJob("processBucketOperate") public void processBucketOperate() { XxlJobHelper.log("process bucket operate job starting..."); JsonResult result = inventoryService.processBucketOperate(); XxlJobHelper.log("process bucket operate job end, result: {}", result); } ... }
(2)定时任务的整体实现流程
⾸先分⻚查询库存分桶操作表中的记录,然后将记录添加到操作队列中。程序启动时会创建⼀个线程池,并开启多个线程,分别去处理操作队列。
在OperateRunner线程中,会不断扫描对应操作队列的数据。只要有数据添加到操作队列中,OperateRunner线程就会去处理操作队列。
当OperateRunner线程获取到操作队列中的分桶操作数据后,⾸先会尝试将该分桶操作的状态设置为处理中。如果设置失败,表示有其他线程已在处理该分桶操作了,不需要继续处理。如果设置成功,则通过策略模式,根据不同的操作类型,执⾏对应的操作。接着设置本次分桶操作的状态为执⾏完成。
当操作队列中的所有分桶操作都执⾏完后,会将其数据状态修改为已完成。然后定时任务会通过while循环间断轮训查出来的所有分桶操作是否已处理。如果是,就会批量修改分桶操作记录的状态,释放锁,结束本次任务。
@Component public class BucketOperateJobHandler { @DubboReference(version = "1.0.0") private InventoryServiceApi inventoryService; @XxlJob("processBucketOperate") public void processBucketOperate() { XxlJobHelper.log("process bucket operate job starting..."); JsonResult result = inventoryService.processBucketOperate(); XxlJobHelper.log("process bucket operate job end, result: {}", result); } ... } //库存服务 @DubboService(version = "1.0.0", interfaceClass = InventoryServiceApi.class, retries = 0) public class InventoryServiceApiImpl implements InventoryServiceApi { @Resource private InventoryBucketService inventoryBucketService; ... //执行分桶操作 @Override public JsonResult processBucketOperate() { try { inventoryBucketService.processBucketOperate(); return JsonResult.buildSuccess(); } catch (ProductBizException e) { log.error("biz error", e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error", e); return JsonResult.buildError(e.getMessage()); } } } //库存分桶业务实现类 @Service public class InventoryBucketServiceImpl implements InventoryBucketService { @Resource private TairLock tairLock; @Resource private InventoryBucketCache inventoryBucketCache; @Resource private OperateQueue operateQueue; @Resource private InventoryRepository inventoryRepository; ... //执行分桶操作 @Override public JsonResult processBucketOperate() { String code = SnowflakeIdWorker.getCode(); boolean lock = tairLock.lock(TairInventoryConstant.BUCKET_OPERATE_PROCESS, code); if (lock) { try { int page = 1; //分页获取要执行的操作列表 List<InventoryBucketOperateBO> inventoryBucketOperateBOS = inventoryRepository.queryBucketOperateList(BucketOperateStatusEnum.UN_PROCESS.getCode(), page, operateQueueNum); while (!CollectionUtils.isEmpty(inventoryBucketOperateBOS)) { //当前有多少个分桶操作,添加至缓存中 inventoryBucketCache.incrOperateCount(TairInventoryConstant.BUCKET_OPERATE_PROCESS_COUNT, inventoryBucketOperateBOS.size()); //遍历每个分桶操作,添加到操作队列中 for (InventoryBucketOperateBO inventoryBucketOperateBO : inventoryBucketOperateBOS) { operateQueue.offerByRoundRobin(inventoryBucketOperateBO); } //分页获取下一页要执行的分桶操作列表 inventoryBucketOperateBOS = inventoryRepository.queryBucketOperateList(BucketOperateStatusEnum.UN_PROCESS.getCode(), ++page, operateQueueNum); } //等待本次查询出的所有分桶操作执行完成,这里不能按照队列的大小来判断 //因为可能操作队列中还有最后一个分桶操作没执行,刚好取出这个分桶操作后还没执行,此时队列大小也为0 while (inventoryBucketCache.getOperateCount(TairInventoryConstant.BUCKET_OPERATE_PROCESS_COUNT) != 0) { try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } //所有分桶操作都执行过之后,获取id List<Long> ids = inventoryBucketCache.getOperateId(TairInventoryConstant.BUCKET_OPERATE_PROCESS); if (!CollectionUtils.isEmpty(ids)) { log.info("修改分桶操作记录的状态为已完成:{}", JSON.toJSONString(ids)); inventoryRepository.bucketOperateSuccess(ids, BucketOperateStatusEnum.FINISH.getCode()); } } finally { inventoryBucketCache.removeOperateId(TairInventoryConstant.BUCKET_OPERATE_PROCESS); tairLock.unlock(TairInventoryConstant.BUCKET_OPERATE_PROCESS, code); } } return JsonResult.buildSuccess(); } ... } @Component public class OperateQueue { //处理分桶操作的操作队列 private final List<BlockingQueue> operateQueue = new ArrayList<>(); //配置的操作队列数量 @Value("${bucket.operate.queue-num:32}") private Integer operateQueueNum; //处理下一个分桶操作的操作队列在队列列表中的下标 private AtomicInteger index = new AtomicInteger(); @Resource private TairCache tairCache; @Resource private InventoryBucketCache inventoryBucketCache; @Resource private BucketOperateStrategyFactory operateFactory; @PostConstruct public void init() { ExecutorService executors = Executors.newFixedThreadPool(operateQueueNum); for (int i = 0; i < operateQueueNum; i++) { //设置操作队列的最大容纳数量 BlockingQueue blockingQueue = new ArrayBlockingQueue(150000); operateQueue.add(blockingQueue); executors.execute(new OperateRunner(blockingQueue, tairCache, inventoryBucketCache, operateFactory)); } } //轮询获取一个操作队列 public boolean offerByRoundRobin(Object object) { index.compareAndSet(operateQueueNum * 10000, 0); boolean offer = operateQueue.get(index.getAndIncrement() % operateQueueNum).offer(object); return offer; } } //多线程消费操作队列中的数据 public class OperateRunner implements Runnable { //处理分桶操作的队列 private BlockingQueue blockingQueue; private TairCache tairCache; private InventoryBucketCache inventoryBucketCache; private BucketOperateStrategyFactory operateFactory; public OperateRunner(BlockingQueue blockingQueue, TairCache tairCache, InventoryBucketCache inventoryBucketCache, BucketOperateStrategyFactory operateFactory) { this.blockingQueue = blockingQueue; this.tairCache = tairCache; this.inventoryBucketCache = inventoryBucketCache; this.operateFactory = operateFactory; } //内部线程处理每个操作队列中的数据 @Override public void run() { while (true) { if (CollectionUtils.isEmpty(blockingQueue)) { try { Thread.sleep(500); continue; } catch (InterruptedException e) { e.printStackTrace(); } } InventoryBucketOperateBO operate = null; try { operate = (InventoryBucketOperateBO) blockingQueue.take(); } catch (InterruptedException e) { log.error("获取分桶任务异常", e); } if (operate == null) { continue; } try { //尝试将分桶操作设置成处理中,成功了则继续处理,失败了表示已经处理了,则直接跳过 if (!tairCache.setNx(TairInventoryConstant.BUCKET_OPERATE_PREFIX + operate.getId(), BucketOperateStatusEnum.PROCESS.getCodeString(), 0)) { //分桶操作不需要执行,减少分桶操作的总数量 inventoryBucketCache.decrOperateCount(TairInventoryConstant.BUCKET_OPERATE_PROCESS_COUNT); continue; } BucketOperateEnum operateEnum = BucketOperateEnum.getByCode(operate.getOperateType()); if (operateEnum != null) { //通过策略模式去处理分桶操作 operateFactory.getStrategy(operateEnum.getService()).process(operate); } //处理完成,则将分桶操作的状态改为成功 tairCache.set(TairInventoryConstant.BUCKET_OPERATE_PREFIX + operate.getId(), BucketOperateStatusEnum.FINISH.getCodeString(), 0); //把成功的分桶操作记录的id放到缓存中 inventoryBucketCache.addOperateId(TairInventoryConstant.BUCKET_OPERATE_PROCESS, operate.getId()); //任务执行后,减少分桶操作的总数量 inventoryBucketCache.decrOperateCount(TairInventoryConstant.BUCKET_OPERATE_PROCESS_COUNT); } catch (Exception e) { //处理分桶操作异常,将分桶操作的处理状态删除,不然后续再处理,setNx设置状态为处理中时会失败 tairCache.delete(TairInventoryConstant.BUCKET_OPERATE_PREFIX + operate.getId()); //分桶操作执行失败,也减少分桶操作的总数量 inventoryBucketCache.decrOperateCount(TairInventoryConstant.BUCKET_OPERATE_PROCESS_COUNT); log.error("处理分桶操作异常", e); } } } }
7.分桶操作之初始化分配库存的处理策略
初始化时,⼀般会上线多个分桶,此时可以通过线程池来处理。通过向线程池提交线程来操作缓存,也就是设置上线分桶的库存数量。
如果缓存操作失败,则需要做回滚操作。即在缓存中记录失败的操作,并把失败的操作写⼊库存操作失败记录表中,等待清除执⾏成功的分桶操作的定时任务来处理回滚操作。
如果有设置分桶库存失败的分桶,则需要从上线分桶列表中删除,防⽌上线失败的分桶被用来扣减库存。
当所有分桶库存设置流程执行完后,就可以将元数据信息写⼊,供库存扣减接⼝使⽤。
//分桶操作路由工厂 @Component public class BucketOperateStrategyFactory { @Autowired private Map<String, AbstractOperateStrategy> operateMap = new ConcurrentHashMap<>(16); //获取分桶操作处理策略的路由 public AbstractOperateStrategy getStrategy(String operate) { return operateMap.get(operate); } } //分桶操作的处理策略抽象类 public abstract class AbstractOperateStrategy { //处理分桶操作 public abstract void process(InventoryBucketOperateBO inventoryBucketOperateBO); } //分桶初始化操作的处理策略 @Service("initOperateProcess") public class InitOperateProcessService extends AbstractOperateStrategy { @Resource private InventoryBucketService inventoryBucketService; @Override public void process(InventoryBucketOperateBO inventoryBucketOperateBO) { inventoryBucketService.processInitBucket(inventoryBucketOperateBO); } } //库存分桶业务实现类 @Service public class InventoryBucketServiceImpl implements InventoryBucketService { ... //将初始化的分桶数据写入远程缓存以及本地缓存 @Override public void processInitBucket(InventoryBucketOperateBO inventoryBucketOperateBO) { //从分桶操作记录中解析出分桶元数据信息 BucketLocalCache bucketLocalCache = JSON.parseObject(inventoryBucketOperateBO.getFeature(), BucketLocalCache.class); log.info("执行库存初始化{}", JSONObject.toJSONString(inventoryBucketOperateBO)); //1.获取卖家ID + 商品skuId标识 String key = buildBucketCacheKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId()); //2.写入数据到对应的缓存上 List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList(); CountDownLatch latch = new CountDownLatch(availableList.size()); List<String> failBucketNos = Collections.synchronizedList(new ArrayList<>()); for (BucketCacheBO bucketCacheBO : availableList) { executors.execute(() -> { log.info("bucketNo:{}, inventoryNum:{}", bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum()); boolean setFlag = tairCache.retrySet(bucketCacheBO.getBucketNo(), JSONObject.toJSONString(bucketCacheBO.getBucketNum()), 0, 3); log.info("set bucket inventory, bucketNo:{}, inventoryNum:{}, successful:{}", bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum(), setFlag); if (!setFlag) { failBucketNos.add(bucketCacheBO.getBucketNo()); //记录失败,后续处理 tairCache.appendJsonExhset(TairInventoryConstant.OPERATE_SET_FAIL, bucketCacheBO.getBucketNo(), inventoryBucketOperateBO.getOperateId(), bucketCacheBO.getBucketNum()); inventoryRepository.saveFailOperate(inventoryBucketOperateBO.getOperateId(), TairInventoryConstant.OPERATE_SET_FAIL, bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum()); } latch.countDown(); }); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } if (!CollectionUtils.isEmpty(failBucketNos)) { bucketLocalCache.getUndistributedList().addAll(failBucketNos.stream().map(BucketCacheBO::new).collect(Collectors.toList())); availableList = availableList.stream().filter(bucketCacheBO -> failBucketNos.contains(bucketCacheBO.getBucketNo())).collect(Collectors.toList()); bucketLocalCache.setAvailableList(availableList); } //3.记录存储到本地缓存列表 bucketLocalCache.setOperationType(BucketStatusEnum.AVAILABLE_STATUS.getCode()); inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache); log.info("元数据信息:{}", JSONObject.toJSONString(bucketLocalCache)); } ... }
8.分桶操作之增加库存与分桶上线的处理策略
//分桶操作路由工厂 @Component public class BucketOperateStrategyFactory { @Autowired private Map<String, AbstractOperateStrategy> operateMap = new ConcurrentHashMap<>(16); //获取分桶操作处理策略的路由 public AbstractOperateStrategy getStrategy(String operate) { return operateMap.get(operate); } } //分桶操作的处理策略抽象类 public abstract class AbstractOperateStrategy { //处理分桶操作 public abstract void process(InventoryBucketOperateBO inventoryBucketOperateBO); } //分桶上线操作的处理策略 @Service("onlineOperateProcess") public class OnlineOperateProcessService extends AbstractOperateStrategy { @Resource private InventoryBucketService inventoryBucketService; @Override public void process(InventoryBucketOperateBO inventoryBucketOperateBO) { inventoryBucketService.processOnlineOperate(inventoryBucketOperateBO); } } //库存分桶业务实现类 @Service public class InventoryBucketServiceImpl implements InventoryBucketService { ... //分桶上线数据写入远程缓存与本地缓存 @Override public void processOnlineOperate(InventoryBucketOperateBO inventoryBucketOperateBO) { log.info("执行分桶上线{}", JSONObject.toJSONString(inventoryBucketOperateBO)); //从分桶操作记录中解析出分桶元数据信息 BucketLocalCache bucketLocalCache = JSON.parseObject(inventoryBucketOperateBO.getFeature(), BucketLocalCache.class); //上线的分桶信息 List<BucketCacheBO> bucketCacheBOList = JSON.parseArray(inventoryBucketOperateBO.getBucket(), BucketCacheBO.class); //本次操作的id String operateId = inventoryBucketOperateBO.getOperateId(); String key = buildBucketCacheKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId()); //上线失败的列表 List<String> failBucketNos = Collections.synchronizedList(new ArrayList<>()); CountDownLatch latch = new CountDownLatch(bucketCacheBOList.size()); //1.先更新分桶的上线缓存处理操作 for (BucketCacheBO bucketCacheBO : bucketCacheBOList) { executors.execute(() -> { boolean setFlag = tairCache.retrySet(bucketCacheBO.getBucketNo(), JSONObject.toJSONString(bucketCacheBO.getBucketNum()), 0, 3); if (!setFlag) { failBucketNos.add(bucketCacheBO.getBucketNo()); //记录失败,后续处理 inventoryRepository.saveFailOperate(operateId, TairInventoryConstant.OPERATE_INCR_FAIL, bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum()); tairCache.appendJsonExhset(TairInventoryConstant.OPERATE_INCR_FAIL, bucketCacheBO.getBucketNo(), operateId, bucketCacheBO.getBucketNum()); } latch.countDown(); }); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } if (!CollectionUtils.isEmpty(failBucketNos)) { bucketLocalCache.getUndistributedList().addAll(failBucketNos.stream().map(BucketCacheBO::new).collect(Collectors.toList())); bucketLocalCache.setAvailableList(bucketLocalCache.getAvailableList().stream().filter(bucketCacheBO -> failBucketNos.contains(bucketCacheBO.getBucketNo())).collect(Collectors.toList())); } //2.处理分桶列表的更新,待中心桶库存以及上线分桶库存更新完成,更新远程和本地的分桶列表 bucketLocalCache.setOperationType(BucketStatusEnum.AVAILABLE_STATUS.getCode()); inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache); } ... }
9.分桶操作之分桶扩容的处理策略
尝试执⾏缓存库存增加操作。如果执⾏成功, 则从失败操作记录中移除,并将失败次数减⼀。如果执⾏失败,不做处理,因为会将这次任务设置为执⾏完成,后续清除执⾏成功的分桶操作的定时任务会处理回滚操作。
//分桶操作路由工厂 @Component public class BucketOperateStrategyFactory { @Autowired private Map<String, AbstractOperateStrategy> operateMap = new ConcurrentHashMap<>(16); //获取分桶操作处理策略的路由 public AbstractOperateStrategy getStrategy(String operate) { return operateMap.get(operate); } } //分桶操作的处理策略抽象类 public abstract class AbstractOperateStrategy { //处理分桶操作 public abstract void process(InventoryBucketOperateBO inventoryBucketOperateBO); } //分桶扩容操作的处理策略 @Service("capacityOperateProcess") public class CapacityOperateProcessService extends AbstractOperateStrategy { @Resource private InventoryBucketService inventoryBucketService; @Override public void process(InventoryBucketOperateBO inventoryBucketOperateBO) { inventoryBucketService.processCapacityOperate(inventoryBucketOperateBO); } } //库存分桶业务实现类 @Service public class InventoryBucketServiceImpl implements InventoryBucketService { ... //处理扩容的分桶操作 @Override public void processCapacityOperate(InventoryBucketOperateBO operate) { Long startTime = System.currentTimeMillis(); List<BucketCacheBO> bucketCacheBOList = JSON.parseArray(operate.getBucket(), BucketCacheBO.class); //回源分桶的库存 String bucketNo = bucketCacheBOList.get(0).getBucketNo(); Integer incr = tairCache.retryIncr(bucketNo, operate.getInventoryNum(), 3); if (incr >= 0) { //执行成功,从失败列表中移除,移除数据库 inventoryRepository.deleteBucketOperate(operate.getId()); //移除缓存 tairCache.removeJsonExhset(TairInventoryConstant.OPERATE_INCR_FAIL, bucketNo, operate.getOperateId()); //回源成功后将失败次数减一 tairCache.decr(TairInventoryConstant.BUCKET_CAPACITY_FAIL + bucketCacheBOList.get(0).getBucketNo(), 1); } log.info("执行扩容操作{},本次耗时{},执行结果{}", JSONObject.toJSONString(operate), System.currentTimeMillis() - startTime, incr >= 0 ? true : false); } ... }
10.分桶操作之分桶下线的处理策略
这⾥只是发送⼀条清空分桶的延迟消息,等待后续延迟处理分桶的下线。将下线分桶中的剩余库存清空,并放回中⼼桶中。
//分桶操作路由工厂 @Component public class BucketOperateStrategyFactory { @Autowired private Map<String, AbstractOperateStrategy> operateMap = new ConcurrentHashMap<>(16); //获取分桶操作处理策略的路由 public AbstractOperateStrategy getStrategy(String operate) { return operateMap.get(operate); } } //分桶操作的处理策略抽象类 public abstract class AbstractOperateStrategy { //处理分桶操作 public abstract void process(InventoryBucketOperateBO inventoryBucketOperateBO); } //分桶下线操作的处理策略 @Service("offlineOperateProcess") public class OfflineOperateProcessService extends AbstractOperateStrategy { @Resource private InventoryBucketService inventoryBucketService; @Override public void process(InventoryBucketOperateBO inventoryBucketOperateBO) { inventoryBucketService.processOfflineOperate(inventoryBucketOperateBO); } } //库存分桶业务实现类 @Service public class InventoryBucketServiceImpl implements InventoryBucketService { ... //处理下线的分桶操作 @Override public void processOfflineOperate(InventoryBucketOperateBO inventoryBucketOperateBO) { log.info("执行分桶下线{}", JSONObject.toJSONString(inventoryBucketOperateBO)); log.info("发送分桶清空的消息:{}", inventoryBucketOperateBO.getBucket()); BucketLocalCache bucketLocalCache = JSON.parseObject(inventoryBucketOperateBO.getFeature(), BucketLocalCache.class); List<BucketCacheBO> bucketCacheBOList = JSON.parseArray(inventoryBucketOperateBO.getBucket(), BucketCacheBO.class); List<String> bucketCacheList = bucketCacheBOList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList()); BucketClearRequest bucketClearRequest = new BucketClearRequest(bucketLocalCache.getSkuId(), bucketLocalCache.getSellerId(), bucketCacheList, 0); for (int i = 0; i < 3; i++) { try { //发送清空下线分桶库存的消息,默认这里不存在需要处理的中心桶库存 bucketClearProducer.sendBucketClear(bucketClearRequest); //发送成功,直接返回,发送失败,继续处理 return; } catch (Exception e) { //这里消息发送失败了,再次尝试发送,失败三次的话,就抛出异常 } } //到这里就是消息多次发送失败,需要做其他的处理 throw new ProductBizException(CommonErrorCodeEnum.SEND_MQ_FAILED); } ... } //清空分桶库存的消息队列 @Component public class BucketClearProducer { @Autowired private DefaultProducer defaultProducer; //清空分桶库存的消息 MQ生产 public void sendBucketClear(BucketClearRequest bucketClearRequest) { //发送清空分桶库存消息,延迟1秒,留给更多的时间给正在扣减该分桶的线程处理 //分布式本地缓存的消息覆盖通知有一定延迟性,为避免库存数据的错误必须保证分桶已下线才能准确扣减库存 //实际清空库存的任务,也做了对应重试 //获取当前下线分桶库存和回退如果不一致的下线分桶请求,会重新发延迟消息等待下次消费重试 defaultProducer.sendMessage(RocketMqConstant.BUCKET_CLEAR_TOPIC, JSONObject.toJSONString(bucketClearRequest), 1, "清空分桶"); } } //处理清空分桶库存的消息 @Component public class BucketClearListener implements MessageListenerConcurrently { @Autowired private InventoryBucketService inventoryBucketService; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String msg = new String(messageExt.getBody()); BucketClearRequest bucketClearRequest = JsonUtil.json2Object(msg, BucketClearRequest.class); log.info("执行分桶下线清空库存,消息内容:{}", bucketClearRequest.getBucketNoList()); inventoryBucketService.bucketClear(bucketClearRequest); } } catch (Exception e) { log.error("consume error, 清空分桶库存失败", e); //本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } //库存分桶业务实现类 @Service public class InventoryBucketServiceImpl implements InventoryBucketService { ... //清空分桶库存,分桶库存放回中央库存 @Override public void bucketClear(BucketClearRequest request) { long start = System.currentTimeMillis(); String key = buildBucketCacheKey(request.getSellerId(), request.getSkuId()); String bucketCache = inventoryBucketCache.getBucketCache(key); if (!StringUtils.isEmpty(bucketCache)) { //引用缓存组件需要通过通用对象进行对应的缓存获取 BucketLocalCache bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class); updateBucketInventory(request, bucketLocalCache); } log.info("清空下线分桶库存:{},时间:{}", JSON.toJSONString(request.getBucketNoList()), System.currentTimeMillis() - start); //商品库存值预警 warningProductInventory(bucketCache); } //将分桶的缓存库存返回给中心桶库存上 private void updateBucketInventory(BucketClearRequest request, BucketLocalCache bucketLocalCache) { //中心桶的库存key String key = buildSellerInventoryKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId()); //中心桶需要回源的库存,默认为0 Integer inventoryNum = request.getInventoryNum(); //准备操作下线的分桶 List<String> bucketCacheList = request.getBucketNoList(); //下线的分桶列表 List<String> undistributedList = bucketLocalCache.getUndistributedList().stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList()); //只处理已经下线的分桶 bucketCacheList = bucketCacheList.stream().filter(undistributedList::contains).collect(Collectors.toList()); //当分桶状态不是已下线的状态,验证这个分桶是否需要处理为下线状态 if (CollectionUtils.isEmpty(bucketCacheList)) { BucketCapacity bucketCapacity = inventoryConverter.converter(request); for (String bucketNo : request.getBucketNoList()) { bucketCapacity.setBucketNo(bucketNo); //再次校验分桶是否触发下线 checkBucketOffline(bucketCapacity); } //不处理分桶库存回退,但是如果有中心桶库存需要重试,这里进行重试 bucketCacheList = new ArrayList<>(); } //标记处理过程中失败的数据,如果是缓存没有库存这种是不会加入 List<String> failureBucketCacheList = new ArrayList<>(); for (String bucketNo : bucketCacheList) { //先获取下线的分桶实际剩余库存 String bucketNum = tairCache.get(bucketNo); //当分桶的库存大于0的时候才处理 if (!StringUtils.isEmpty(bucketNum) && Integer.valueOf(bucketNum) > 0) { //清理下线的分桶库存,设置为0 Integer result = tairCache.decr(bucketNo, Integer.parseInt(bucketNum)); if (result >= 0) { log.info("下线分桶,bucketNo:{},desc:{}", bucketNo, bucketNum); inventoryNum = inventoryNum + Integer.parseInt(bucketNum); } else { log.info("分桶已下线,bucketNo:{}", bucketNo); failureBucketCacheList.add(bucketNo); } } } if (inventoryNum > 0) { //将下线的剩余库存加至 中心桶库存上 Integer incr = tairCache.retryIncr(key, inventoryNum, 3); //当返回的值大于0,则意味本次操作回源中心桶库存成功,标记为0,当存在失败的分桶下线不会累计添加上次任务处理的中心桶库存 log.info("回源中心桶,inventoryNum:{}, after value :{}", inventoryNum, incr); if (incr >= 0) { inventoryNum = 0; } } //对本次库存操作失败的分桶信息,重新写入MQ进行重试 //这里只有回退库存失败的或者中心桶库存没有回源成功的才会再次发送 //如果是已经被扣减掉库存的查询的时候会过滤掉 if (!CollectionUtils.isEmpty(failureBucketCacheList) || inventoryNum > 0) { //发送清空下线分桶库存的消息 bucketClearProducer.sendBucketClear(new BucketClearRequest(bucketLocalCache.getSkuId(), bucketLocalCache.getSellerId(), failureBucketCacheList, inventoryNum)); } } ... }
11.清除执⾏成功的分桶操作的定时任务
(1)定时任务概述
(2)具体处理流程
(1)定时任务概述
这个定时任务主要处理执⾏分桶操作定时任务中处理完成的数据,也就是将库存分桶操作表中状态为已完成的数据查询出来。如果有失败的操作,则回滚该操作,然后删除该任务。
这个定时任务与执⾏分桶操作的定时任务类似,它会分⻚查询执⾏完成的分桶操作,然后放⼊队列中。按照队列处理完成后,整个流程结束,删除表里的分桶操作记录。
(2)具体处理流程
一.处理缓存中有记录的失败的分桶操作
先根据分桶操作记录,处理缓存中有记录的失败的分桶操作。根据记录分桶操作的失败类型,进行对应的回退处理。
如果是初始化分桶库存或分桶上线,则将分配给该分桶的库存放回中⼼桶中,并删除失败的操作记录。
如果是分桶扩容,则将对该分桶进行扩容的库存放回中⼼桶中,并删除当时失败的操作记录,并且将分桶失败的次数减⼀。此时缓存中有记录的、失败的分桶操作就已经处理了。
二.处理缓存中没有但数据库中有的分桶操作记录
当回退任务处理完成之后,删除失败的操作记录,以及分桶操作任务记录。
@Component public class BucketOperateJobHandler { @DubboReference(version = "1.0.0") private InventoryServiceApi inventoryService; ... @XxlJob("bucketOperateFinishedClear") public void bucketOperateFinishedClear() { XxlJobHelper.log("bucket operate finished clear job starting..."); JsonResult result = inventoryService.bucketOperateFinishedClear(); XxlJobHelper.log("bucket operate finished clear job end, result: {}", result); } } @DubboService(version = "1.0.0", interfaceClass = InventoryServiceApi.class, retries = 0) public class InventoryServiceApiImpl implements InventoryServiceApi { @Resource private InventoryBucketService inventoryBucketService; ... @Override public JsonResult bucketOperateFinishedClear() { try { //处理执行成功的 inventoryBucketService.bucketOperateFinishedClear(); return JsonResult.buildSuccess(); } catch (ProductBizException e) { log.error("biz error", e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error", e); return JsonResult.buildError(e.getMessage()); } } ... } @Service public class InventoryBucketServiceImpl implements InventoryBucketService { @Resource private OperateClearQueue operateClearQueue; ... //清除执行成功的分桶操作记录 @Override public JsonResult bucketOperateFinishedClear() { String code = SnowflakeIdWorker.getCode(); boolean lock = tairLock.lock(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS, code); if (lock) { try { int page = 1; List<InventoryBucketOperateBO> inventoryBucketOperateBOS = inventoryRepository.queryBucketOperateList(BucketOperateStatusEnum.FINISH.getCode(), page, operateQueueNum); while (!CollectionUtils.isEmpty(inventoryBucketOperateBOS)) { //当前有多少个分桶操作,添加至缓存中 inventoryBucketCache.incrOperateCount(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS_COUNT, inventoryBucketOperateBOS.size()); //将分桶操作轮询添加到操作队列中 for (InventoryBucketOperateBO inventoryBucketOperateBO : inventoryBucketOperateBOS) { operateClearQueue.offerByRoundRobin(inventoryBucketOperateBO); } //分页获取下一页要执行的分桶操作列表 inventoryBucketOperateBOS = inventoryRepository.queryBucketOperateList(BucketOperateStatusEnum.FINISH.getCode(), ++page, operateQueueNum); } //等待本次所有分桶操作都处理完成 while (inventoryBucketCache.getOperateCount(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS_COUNT) != 0) { try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } //删除已经执行完成的分桶操作记录 List<Long> ids = inventoryBucketCache.getOperateId(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS); if (!CollectionUtils.isEmpty(ids)) { log.info("清除已完成的任务:{}", JSON.toJSONString(ids)); inventoryRepository.deleteBatchBucketOperate(ids); } } finally { inventoryBucketCache.removeOperateId(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS); tairLock.unlock(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS, code); } } return JsonResult.buildSuccess(); } ... } @Component public class OperateClearQueue { //处理分桶操作的操作队列列表 private final List<BlockingQueue> operateQueue = new ArrayList<>(); //配置的操作队列数量 @Value("${bucket.operate.queue-num:32}") private Integer queueNum; //处理下一个分桶操作的操作队列在队列列表中的下标 private AtomicInteger index = new AtomicInteger(); @Resource private InventoryBucketService inventoryBucketService; @PostConstruct public void init() { ExecutorService executors = Executors.newFixedThreadPool(queueNum); for (int i = 0; i < queueNum; i++) { //设置操作队列的最大容纳数量 BlockingQueue blockingQueue = new ArrayBlockingQueue(150000); operateQueue.add(blockingQueue); executors.execute(new OperateClearRunner(blockingQueue, inventoryBucketService)); } } //轮询获取一个操作队列 public boolean offerByRoundRobin(Object object) { index.compareAndSet(queueNum * 10000, 0); boolean offer = operateQueue.get(index.getAndIncrement() % queueNum).offer(object); return offer; } } //多线程消费操作队列中的数据 public class OperateClearRunner implements Runnable { //处理分桶操作的操作队列 private BlockingQueue blockingQueue; private InventoryBucketService inventoryBucketService; public OperateClearRunner(BlockingQueue blockingQueue, InventoryBucketService inventoryBucketService) { this.blockingQueue = blockingQueue; this.inventoryBucketService = inventoryBucketService; } //内部线程处理每个操作队列的数据 @Override public void run() { while (true) { try { if (CollectionUtils.isEmpty(blockingQueue)) { Thread.sleep(500); continue; } InventoryBucketOperateBO operate = (InventoryBucketOperateBO) blockingQueue.take(); inventoryBucketService.clearBucketOperate(operate); } catch (Exception e) { log.error("处理分桶操作异常", e); } } } } @Service public class InventoryBucketServiceImpl implements InventoryBucketService { ... //清除执行完的分桶操作 @Override public void clearBucketOperate(InventoryBucketOperateBO operate) { Long startTime = System.currentTimeMillis(); List<InventoryOperateFailBO> inventoryOperateFailBOS = inventoryRepository.queryFailOperateList(operate.getOperateId()); Map<String, InventoryOperateFailBO> operateFailBOMap = inventoryOperateFailBOS.stream().collect(Collectors.toMap(InventoryOperateFailBO::getBucketNo, Function.identity())); //先处理缓存中存储的失败记录 processCacheRollback(operate, operateFailBOMap); //再处理缓存中没有存储的但是DB中存储的失败记录 processDbRollback(operate, operateFailBOMap); if (!CollectionUtils.isEmpty(inventoryOperateFailBOS)) { //所有的都执行完成了,删除处理完成的数据库记录,缓存记录在处理过程中已经执行了 inventoryRepository.deleteBatchOperateFail(inventoryOperateFailBOS); } //执行成功的,删除记录 tairCache.delete(TairInventoryConstant.BUCKET_OPERATE_PREFIX + operate.getId()); inventoryBucketCache.addOperateId(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS, operate.getId()); //任务执行后,减少任务数量 inventoryBucketCache.decrOperateCount(TairInventoryConstant.BUCKET_OPERATE_CLEAR_PROCESS_COUNT); log.info("本次分桶清除任务耗时{}, 任务对象{}", System.currentTimeMillis() - startTime, JSONObject.toJSONString(operate)); } //处理缓存中的失败回退 private void processCacheRollback(InventoryBucketOperateBO operate, Map<String, InventoryOperateFailBO> operateFailBOMap) { List<BucketCacheBO> bucketCacheBOS = JSON.parseArray(operate.getBucket(), BucketCacheBO.class); //获取中心桶库存的key String key = buildSellerInventoryKey(operate.getSellerId(), operate.getSkuId()); for (BucketCacheBO bucketCacheBO : bucketCacheBOS) { //分桶的缓存操作有两个,一个是初始化设置库存,一个是回源增加库存 //获取初始化库存失败的操作 Object bucketNum = tairCache.getJsonExhset(TairInventoryConstant.OPERATE_SET_FAIL, bucketCacheBO.getBucketNo(), operate.getOperateId()); if (!Objects.isNull(bucketNum)) { //这里是分桶库存初始化失败了,需要将数据放回中心桶 tairCache.incr(key, (Integer) bucketNum); //处理成功,将失败列表中的数据删除 tairCache.removeJsonExhset(TairInventoryConstant.OPERATE_SET_FAIL, bucketCacheBO.getBucketNo(), operate.getOperateId()); operateFailBOMap.remove(bucketCacheBO.getBucketNo()); } //获取增加库存失败的操作 bucketNum = tairCache.getJsonExhset(TairInventoryConstant.OPERATE_INCR_FAIL, bucketCacheBO.getBucketNo(), operate.getOperateId()); if (!Objects.isNull(bucketNum)) { //这里是分桶库存增加失败了,需要将数据放回中心桶 tairCache.incr(key, (Integer) bucketNum); //处理成功,将失败列表中的数据删除 tairCache.removeJsonExhset(TairInventoryConstant.OPERATE_INCR_FAIL, bucketCacheBO.getBucketNo(), operate.getOperateId()); operateFailBOMap.remove(bucketCacheBO.getBucketNo()); //处理完回退之后,如果是扩容的,需要将扩容失败次数减一 if (BucketOperateEnum.CAPACITY.getCode().equals(operate.getOperateType())) { tairCache.decr(TairInventoryConstant.BUCKET_CAPACITY_FAIL + bucketCacheBO.getBucketNo(), 1); } } } } //处理数据库中的失败回退 private void processDbRollback(InventoryBucketOperateBO operate, Map<String, InventoryOperateFailBO> operateFailBOMap) { //获取中心桶库存的key String key = buildSellerInventoryKey(operate.getSellerId(), operate.getSkuId()); //如果缓存中数据操作完,数据库中还有未处理的,需要处理数据库中的 if (!CollectionUtils.isEmpty(operateFailBOMap)) { for (Map.Entry<String, InventoryOperateFailBO> entry : operateFailBOMap.entrySet()) { InventoryOperateFailBO operateFailBO = entry.getValue(); if (TairInventoryConstant.OPERATE_SET_FAIL.equals(operateFailBO.getFailType())) { //这里是分桶库存初始化失败了,需要将数据放回中心桶 tairCache.incr(key, operateFailBO.getInventoryNum()); //处理成功,将失败列表中的数据删除 tairCache.removeJsonExhset(TairInventoryConstant.OPERATE_SET_FAIL, operateFailBO.getBucketNo(), operate.getOperateId()); } else if (TairInventoryConstant.OPERATE_INCR_FAIL.equals(operateFailBO.getFailType())) { //这里是分桶库存增加失败了,需要将数据放回中心桶 tairCache.incr(key, operateFailBO.getInventoryNum()); //处理成功,将失败列表中的数据删除 tairCache.removeJsonExhset(TairInventoryConstant.OPERATE_INCR_FAIL, operateFailBO.getBucketNo(), operate.getOperateId()); //处理完回退之后,如果是扩容的,需要将扩容失败次数减一 if (BucketOperateEnum.CAPACITY.getCode().equals(operate.getOperateType())) { //扩容时,一次操作只有一个分桶数据变更 //所以这里跟缓存中获取的失败记录,最多执行一次,不会导致这次失败的任务,将其他任务的次数减去了 tairCache.decr(TairInventoryConstant.BUCKET_CAPACITY_FAIL + operateFailBO.getBucketNo(), 1); } } } } } ... }
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等