订单初版—2.生单链路中的技术问题说明文档
大纲
1.生单链路的业务代码
2.生单链路中可能会出现数据不一致的问题
3.Seata AT模式下的分布式事务的原理
4.Seata AT模式下的分布式事务的读写隔离原理
5.Seata AT模式下的死锁问题以及超时机制
6.Seata AT模式下的读写隔离机制的影响
7.生单链路使用Seata AT模式的具体步骤
8.生单链路使用Seata AT模式时的原理流程
9.生单链路使用Seata AT模式时的并发问题
10.生单链路如何解决库存全局锁争用问题
1.生单链路的业务代码
(1)生成订单流程
(2)入参检查与风控检查
(3)获取商品信息与计算订单价格及验证价格
(4)锁定优惠券与商品库存
(5)新增订单到数据库
(6)发送延迟消息到MQ
(1)生成订单流程
@Service public class OrderServiceImpl implements OrderService { ... //提交订单/生成订单接口 @GlobalTransactional(rollbackFor = Exception.class) @Override public CreateOrderDTO createOrder(CreateOrderRequest createOrderRequest) { //1.入参检查 checkCreateOrderRequestParam(createOrderRequest); //2.风控检查 checkRisk(createOrderRequest); //3.获取商品信息 List<ProductSkuDTO> productSkuList = listProductSkus(createOrderRequest); //4.计算订单价格 CalculateOrderAmountDTO calculateOrderAmountDTO = calculateOrderAmount(createOrderRequest, productSkuList); //5.验证订单实付金额 checkRealPayAmount(createOrderRequest, calculateOrderAmountDTO); //6.锁定优惠券 lockUserCoupon(createOrderRequest); //7.锁定商品库存 lockProductStock(createOrderRequest); //8.生成订单到数据库 addNewOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO); //9.发送订单延迟消息用于支付超时自动关单 sendPayOrderTimeoutDelayMessage(createOrderRequest); //返回订单信息 CreateOrderDTO createOrderDTO = new CreateOrderDTO(); createOrderDTO.setOrderId(createOrderRequest.getOrderId()); return createOrderDTO; } ... }
(2)入参检查与风控检查
@Service public class OrderServiceImpl implements OrderService { ... //检查创建订单请求参数 private void checkCreateOrderRequestParam(CreateOrderRequest createOrderRequest) { ParamCheckUtil.checkObjectNonNull(createOrderRequest); //订单ID检查 String orderId = createOrderRequest.getOrderId(); ParamCheckUtil.checkStringNonEmpty(orderId, OrderErrorCodeEnum.ORDER_ID_IS_NULL); //业务线标识检查 Integer businessIdentifier = createOrderRequest.getBusinessIdentifier(); ParamCheckUtil.checkObjectNonNull(businessIdentifier, OrderErrorCodeEnum.BUSINESS_IDENTIFIER_IS_NULL); if (BusinessIdentifierEnum.getByCode(businessIdentifier) == null) { throw new OrderBizException(OrderErrorCodeEnum.BUSINESS_IDENTIFIER_ERROR); } //用户ID检查 String userId = createOrderRequest.getUserId(); ParamCheckUtil.checkStringNonEmpty(userId, OrderErrorCodeEnum.USER_ID_IS_NULL); //订单类型检查 Integer orderType = createOrderRequest.getOrderType(); ParamCheckUtil.checkObjectNonNull(businessIdentifier, OrderErrorCodeEnum.ORDER_TYPE_IS_NULL); if (OrderTypeEnum.getByCode(orderType) == null) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_TYPE_ERROR); } //卖家ID检查 String sellerId = createOrderRequest.getSellerId(); ParamCheckUtil.checkStringNonEmpty(sellerId, OrderErrorCodeEnum.SELLER_ID_IS_NULL); //配送类型检查 Integer deliveryType = createOrderRequest.getDeliveryType(); ParamCheckUtil.checkObjectNonNull(deliveryType, OrderErrorCodeEnum.USER_ADDRESS_ERROR); if (DeliveryTypeEnum.getByCode(deliveryType) == null) { throw new OrderBizException(OrderErrorCodeEnum.DELIVERY_TYPE_ERROR); } //地址信息检查 String province = createOrderRequest.getProvince(); String city = createOrderRequest.getCity(); String area = createOrderRequest.getArea(); String streetAddress = createOrderRequest.getStreet(); ParamCheckUtil.checkStringNonEmpty(province, OrderErrorCodeEnum.USER_ADDRESS_ERROR); ParamCheckUtil.checkStringNonEmpty(city, OrderErrorCodeEnum.USER_ADDRESS_ERROR); ParamCheckUtil.checkStringNonEmpty(area, OrderErrorCodeEnum.USER_ADDRESS_ERROR); ParamCheckUtil.checkStringNonEmpty(streetAddress, OrderErrorCodeEnum.USER_ADDRESS_ERROR); //区域ID检查 String regionId = createOrderRequest.getRegionId(); ParamCheckUtil.checkStringNonEmpty(regionId, OrderErrorCodeEnum.REGION_ID_IS_NULL); //经纬度检查 BigDecimal lon = createOrderRequest.getLon(); BigDecimal lat = createOrderRequest.getLat(); ParamCheckUtil.checkObjectNonNull(lon, OrderErrorCodeEnum.USER_LOCATION_IS_NULL); ParamCheckUtil.checkObjectNonNull(lat, OrderErrorCodeEnum.USER_LOCATION_IS_NULL); //收货人信息检查 String receiverName = createOrderRequest.getReceiverName(); String receiverPhone = createOrderRequest.getReceiverPhone(); ParamCheckUtil.checkStringNonEmpty(receiverName, OrderErrorCodeEnum.ORDER_RECEIVER_IS_NULL); ParamCheckUtil.checkStringNonEmpty(receiverPhone, OrderErrorCodeEnum.ORDER_RECEIVER_IS_NULL); //客户端设备信息检查 String clientIp = createOrderRequest.getClientIp(); ParamCheckUtil.checkStringNonEmpty(clientIp, OrderErrorCodeEnum.CLIENT_IP_IS_NULL); //商品条目信息检查 List<CreateOrderRequest.OrderItemRequest> orderItemRequestList = createOrderRequest.getOrderItemRequestList(); ParamCheckUtil.checkCollectionNonEmpty(orderItemRequestList, OrderErrorCodeEnum.ORDER_ITEM_IS_NULL); for (CreateOrderRequest.OrderItemRequest orderItemRequest : orderItemRequestList) { Integer productType = orderItemRequest.getProductType(); Integer saleQuantity = orderItemRequest.getSaleQuantity(); String skuCode = orderItemRequest.getSkuCode(); ParamCheckUtil.checkObjectNonNull(productType, OrderErrorCodeEnum.ORDER_ITEM_PARAM_ERROR); ParamCheckUtil.checkObjectNonNull(saleQuantity, OrderErrorCodeEnum.ORDER_ITEM_PARAM_ERROR); ParamCheckUtil.checkStringNonEmpty(skuCode, OrderErrorCodeEnum.ORDER_ITEM_PARAM_ERROR); } //订单费用信息检查 List<CreateOrderRequest.OrderAmountRequest> orderAmountRequestList = createOrderRequest.getOrderAmountRequestList(); ParamCheckUtil.checkCollectionNonEmpty(orderAmountRequestList, OrderErrorCodeEnum.ORDER_AMOUNT_IS_NULL); for (CreateOrderRequest.OrderAmountRequest orderAmountRequest : orderAmountRequestList) { Integer amountType = orderAmountRequest.getAmountType(); ParamCheckUtil.checkObjectNonNull(amountType, OrderErrorCodeEnum.ORDER_AMOUNT_TYPE_IS_NULL); if (AmountTypeEnum.getByCode(amountType) == null) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_AMOUNT_TYPE_PARAM_ERROR); } } Map<Integer, Integer> orderAmountMap = orderAmountRequestList.stream() .collect(Collectors.toMap(CreateOrderRequest.OrderAmountRequest::getAmountType, CreateOrderRequest.OrderAmountRequest::getAmount)); //订单支付原价不能为空 if (orderAmountMap.get(AmountTypeEnum.ORIGIN_PAY_AMOUNT.getCode()) == null) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_ORIGIN_PAY_AMOUNT_IS_NULL); } //订单运费不能为空 if (orderAmountMap.get(AmountTypeEnum.SHIPPING_AMOUNT.getCode()) == null) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_SHIPPING_AMOUNT_IS_NULL); } //订单实付金额不能为空 if (orderAmountMap.get(AmountTypeEnum.REAL_PAY_AMOUNT.getCode()) == null) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_REAL_PAY_AMOUNT_IS_NULL); } if (StringUtils.isNotEmpty(createOrderRequest.getCouponId())) { //订单优惠券抵扣金额不能为空 if (orderAmountMap.get(AmountTypeEnum.COUPON_DISCOUNT_AMOUNT.getCode()) == null) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_DISCOUNT_AMOUNT_IS_NULL); } } //订单支付信息检查 List<CreateOrderRequest.PaymentRequest> paymentRequestList = createOrderRequest.getPaymentRequestList(); ParamCheckUtil.checkCollectionNonEmpty(paymentRequestList, OrderErrorCodeEnum.ORDER_PAYMENT_IS_NULL); for (CreateOrderRequest.PaymentRequest paymentRequest : paymentRequestList) { Integer payType = paymentRequest.getPayType(); Integer accountType = paymentRequest.getAccountType(); if (payType == null || PayTypeEnum.getByCode(payType) == null) { throw new OrderBizException(OrderErrorCodeEnum.PAY_TYPE_PARAM_ERROR); } if (accountType == null || AccountTypeEnum.getByCode(accountType) == null) { throw new OrderBizException(OrderErrorCodeEnum.ACCOUNT_TYPE_PARAM_ERROR); } } } //风控检查 private void checkRisk(CreateOrderRequest createOrderRequest) { //调用风控服务进行风控检查 CheckOrderRiskRequest checkOrderRiskRequest = createOrderRequest.clone(CheckOrderRiskRequest.class); JsonResult<CheckOrderRiskDTO> jsonResult = riskApi.checkOrderRisk(checkOrderRiskRequest); if (!jsonResult.getSuccess()) { throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage()); } } ... }
(3)获取商品信息与计算订单价格及验证价格
@Service public class OrderServiceImpl implements OrderService { ... //获取订单条目商品信息 private List<ProductSkuDTO> listProductSkus(CreateOrderRequest createOrderRequest) { List<CreateOrderRequest.OrderItemRequest> orderItemRequestList = createOrderRequest.getOrderItemRequestList(); List<ProductSkuDTO> productSkuList = new ArrayList<>(); for (CreateOrderRequest.OrderItemRequest orderItemRequest : orderItemRequestList) { String skuCode = orderItemRequest.getSkuCode(); ProductSkuQuery productSkuQuery = new ProductSkuQuery(); productSkuQuery.setSkuCode(skuCode); productSkuQuery.setSellerId(createOrderRequest.getSellerId()); JsonResult<ProductSkuDTO> jsonResult = productApi.getProductSku(productSkuQuery); if (!jsonResult.getSuccess()) { throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage()); } ProductSkuDTO productSkuDTO = jsonResult.getData(); //sku不存在 if (productSkuDTO == null) { throw new OrderBizException(OrderErrorCodeEnum.PRODUCT_SKU_CODE_ERROR, skuCode); } productSkuList.add(productSkuDTO); } return productSkuList; } //计算订单价格,如果使用了优惠券、红包、积分等,会一并进行扣减 //@param createOrderRequest 订单信息 //@param productSkuList 商品信息 private CalculateOrderAmountDTO calculateOrderAmount(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList) { CalculateOrderAmountRequest calculateOrderPriceRequest = createOrderRequest.clone(CalculateOrderAmountRequest.class, CloneDirection.FORWARD); //订单条目补充商品信息 Map<String, ProductSkuDTO> productSkuDTOMap = productSkuList.stream().collect(Collectors.toMap(ProductSkuDTO::getSkuCode, Function.identity())); calculateOrderPriceRequest.getOrderItemRequestList().forEach(item -> { String skuCode = item.getSkuCode(); ProductSkuDTO productSkuDTO = productSkuDTOMap.get(skuCode); item.setProductId(productSkuDTO.getProductId()); item.setSalePrice(productSkuDTO.getSalePrice()); }); //调用营销服务计算订单价格 JsonResult<CalculateOrderAmountDTO> jsonResult = marketApi.calculateOrderAmount(calculateOrderPriceRequest); //检查价格计算结果 if (!jsonResult.getSuccess()) { throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage()); } CalculateOrderAmountDTO calculateOrderAmountDTO = jsonResult.getData(); if (calculateOrderAmountDTO == null) { throw new OrderBizException(OrderErrorCodeEnum.CALCULATE_ORDER_AMOUNT_ERROR); } //订单费用信息 List<OrderAmountDTO> orderAmountList = ObjectUtil.convertList(calculateOrderAmountDTO.getOrderAmountList(), OrderAmountDTO.class); if (orderAmountList == null || orderAmountList.isEmpty()) { throw new OrderBizException(OrderErrorCodeEnum.CALCULATE_ORDER_AMOUNT_ERROR); } //订单条目费用明细 List<OrderAmountDetailDTO> orderItemAmountList = ObjectUtil.convertList(calculateOrderAmountDTO.getOrderAmountDetail(), OrderAmountDetailDTO.class); if (orderItemAmountList == null || orderItemAmountList.isEmpty()) { throw new OrderBizException(OrderErrorCodeEnum.CALCULATE_ORDER_AMOUNT_ERROR); } return calculateOrderAmountDTO; } //验证订单实付金额 private void checkRealPayAmount(CreateOrderRequest createOrderRequest, CalculateOrderAmountDTO calculateOrderAmountDTO) { List<CreateOrderRequest.OrderAmountRequest> originOrderAmountRequestList = createOrderRequest.getOrderAmountRequestList(); Map<Integer, CreateOrderRequest.OrderAmountRequest> originOrderAmountMap = originOrderAmountRequestList.stream().collect(Collectors.toMap(CreateOrderRequest.OrderAmountRequest::getAmountType, Function.identity())); //前端给的实付金额 Integer originRealPayAmount = originOrderAmountMap.get(AmountTypeEnum.REAL_PAY_AMOUNT.getCode()).getAmount(); List<CalculateOrderAmountDTO.OrderAmountDTO> orderAmountDTOList = calculateOrderAmountDTO.getOrderAmountList(); Map<Integer, CalculateOrderAmountDTO.OrderAmountDTO> orderAmountMap = orderAmountDTOList.stream().collect(Collectors.toMap(CalculateOrderAmountDTO.OrderAmountDTO::getAmountType, Function.identity())); //营销计算出来的实付金额 Integer realPayAmount = orderAmountMap.get(AmountTypeEnum.REAL_PAY_AMOUNT.getCode()).getAmount(); if (!originRealPayAmount.equals(realPayAmount)) { //订单验价失败 throw new OrderBizException(OrderErrorCodeEnum.ORDER_CHECK_REAL_PAY_AMOUNT_FAIL); } } ... }
(4)锁定优惠券与商品库存
@Service public class OrderServiceImpl implements OrderService { ... //锁定用户优惠券 private void lockUserCoupon(CreateOrderRequest createOrderRequest) { String couponId = createOrderRequest.getCouponId(); if (StringUtils.isEmpty(couponId)) { return; } LockUserCouponRequest lockUserCouponRequest = createOrderRequest.clone(LockUserCouponRequest.class); //调用营销服务锁定用户优惠券 JsonResult<Boolean> jsonResult = marketApi.lockUserCoupon(lockUserCouponRequest); //检查锁定用户优惠券结果 if (!jsonResult.getSuccess()) { throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage()); } } //锁定商品库存 private void lockProductStock(CreateOrderRequest createOrderRequest) { String orderId = createOrderRequest.getOrderId(); List<LockProductStockRequest.OrderItemRequest> orderItemRequestList = ObjectUtil.convertList( createOrderRequest.getOrderItemRequestList(), LockProductStockRequest.OrderItemRequest.class); LockProductStockRequest lockProductStockRequest = new LockProductStockRequest(); lockProductStockRequest.setOrderId(orderId); lockProductStockRequest.setOrderItemRequestList(orderItemRequestList); JsonResult<Boolean> jsonResult = inventoryApi.lockProductStock(lockProductStockRequest); //检查锁定商品库存结果 if (!jsonResult.getSuccess()) { throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage()); } } ... } @DubboService(version = "1.0.0", interfaceClass = MarketApi.class, retries = 0) public class MarketApiImpl implements MarketApi { ... //锁定用户优惠券记录 @Override public JsonResult<Boolean> lockUserCoupon(LockUserCouponRequest lockUserCouponRequest) { try { Boolean result = couponService.lockUserCoupon(lockUserCouponRequest); return JsonResult.buildSuccess(result); } catch (MarketBizException e) { log.error("biz error", e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error", e); return JsonResult.buildError(e.getMessage()); } } ... } @Service public class CouponServiceImpl implements CouponService { ... //锁定用户优惠券 @Transactional(rollbackFor = Exception.class) @Override public Boolean lockUserCoupon(LockUserCouponRequest lockUserCouponRequest) { //检查入参 checkLockUserCouponRequest(lockUserCouponRequest); String userId = lockUserCouponRequest.getUserId(); String couponId = lockUserCouponRequest.getCouponId(); CouponDO couponDO = couponDAO.getUserCoupon(userId, couponId); if (couponDO == null) { throw new MarketBizException(MarketErrorCodeEnum.USER_COUPON_IS_NULL); } //判断优惠券是否已经使用了 if (CouponUsedStatusEnum.USED.getCode().equals(couponDO.getUsed())) { throw new MarketBizException(MarketErrorCodeEnum.USER_COUPON_IS_USED); } couponDO.setUsed(CouponUsedStatusEnum.USED.getCode()); couponDO.setUsedTime(new Date()); couponDAO.updateById(couponDO); return true; } ... } @DubboService(version = "1.0.0", interfaceClass = InventoryApi.class, retries = 0) public class InventoryApiImpl implements InventoryApi { @Autowired private InventoryService inventoryService; //锁定商品库存 @Override public JsonResult<Boolean> lockProductStock(LockProductStockRequest lockProductStockRequest) { try { Boolean result = inventoryService.lockProductStock(lockProductStockRequest); return JsonResult.buildSuccess(result); } catch (InventoryBizException e) { log.error("biz error", e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error", e); return JsonResult.buildError(e.getMessage()); } } ... } @Service public class InventoryServiceImpl implements InventoryService { ... //锁定商品库存 @Transactional(rollbackFor = Exception.class) @Override public Boolean lockProductStock(LockProductStockRequest lockProductStockRequest) { //检查入参 checkLockProductStockRequest(lockProductStockRequest); List<LockProductStockRequest.OrderItemRequest> orderItemRequestList = lockProductStockRequest.getOrderItemRequestList(); for (LockProductStockRequest.OrderItemRequest orderItemRequest : orderItemRequestList) { String skuCode = orderItemRequest.getSkuCode(); ProductStockDO productStockDO = productStockDAO.getBySkuCode(skuCode); if (productStockDO == null) { throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_ERROR); } Integer saleQuantity = orderItemRequest.getSaleQuantity(); //执行库存扣减,并需要解决防止超卖的问题 int nums = productStockDAO.lockProductStock(skuCode, saleQuantity); if (nums <= 0) { throw new InventoryBizException(InventoryErrorCodeEnum.LOCK_PRODUCT_SKU_STOCK_ERROR); } } return true; } ... }
(5)新增订单到数据库
@Service public class OrderServiceImpl implements OrderService { ... //新增订单数据到数据库 private void addNewOrder(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList, CalculateOrderAmountDTO calculateOrderAmountDTO) { //封装新订单数据 NewOrderDataHolder newOrderDataHolder = new NewOrderDataHolder(); //生成主订单 FullOrderData fullMasterOrderData = addNewMasterOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO); //封装主订单数据到NewOrderData对象中 newOrderDataHolder.appendOrderData(fullMasterOrderData); //如果存在多种商品类型,需要按商品类型进行拆单 Map<Integer, List<ProductSkuDTO>> productTypeMap = productSkuList.stream().collect(Collectors.groupingBy(ProductSkuDTO::getProductType)); if (productTypeMap.keySet().size() > 1) { for (Integer productType : productTypeMap.keySet()) { //生成子订单 FullOrderData fullSubOrderData = addNewSubOrder(fullMasterOrderData, productType); //封装子订单数据到NewOrderData对象中 newOrderDataHolder.appendOrderData(fullSubOrderData); } } //保存订单到数据库 //订单信息 List<OrderInfoDO> orderInfoDOList = newOrderDataHolder.getOrderInfoDOList(); if (!orderInfoDOList.isEmpty()) { orderInfoDAO.saveBatch(orderInfoDOList); } //订单条目 List<OrderItemDO> orderItemDOList = newOrderDataHolder.getOrderItemDOList(); if (!orderItemDOList.isEmpty()) { orderItemDAO.saveBatch(orderItemDOList); } //订单配送信息 List<OrderDeliveryDetailDO> orderDeliveryDetailDOList = newOrderDataHolder.getOrderDeliveryDetailDOList(); if (!orderDeliveryDetailDOList.isEmpty()) { orderDeliveryDetailDAO.saveBatch(orderDeliveryDetailDOList); } //订单支付信息 List<OrderPaymentDetailDO> orderPaymentDetailDOList = newOrderDataHolder.getOrderPaymentDetailDOList(); if (!orderPaymentDetailDOList.isEmpty()) { orderPaymentDetailDAO.saveBatch(orderPaymentDetailDOList); } //订单费用信息 List<OrderAmountDO> orderAmountDOList = newOrderDataHolder.getOrderAmountDOList(); if (!orderAmountDOList.isEmpty()) { orderAmountDAO.saveBatch(orderAmountDOList); } //订单费用明细 List<OrderAmountDetailDO> orderAmountDetailDOList = newOrderDataHolder.getOrderAmountDetailDOList(); if (!orderAmountDetailDOList.isEmpty()) { orderAmountDetailDAO.saveBatch(orderAmountDetailDOList); } //订单状态变更日志信息 List<OrderOperateLogDO> orderOperateLogDOList = newOrderDataHolder.getOrderOperateLogDOList(); if (!orderOperateLogDOList.isEmpty()) { orderOperateLogDAO.saveBatch(orderOperateLogDOList); } //订单快照数据 List<OrderSnapshotDO> orderSnapshotDOList = newOrderDataHolder.getOrderSnapshotDOList(); if (!orderSnapshotDOList.isEmpty()) { orderSnapshotDAO.saveBatch(orderSnapshotDOList); } } //新增主订单信息订单 private FullOrderData addNewMasterOrder(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList, CalculateOrderAmountDTO calculateOrderAmountDTO) { NewOrderBuilder newOrderBuilder = new NewOrderBuilder(createOrderRequest, productSkuList, calculateOrderAmountDTO, orderProperties); FullOrderData fullOrderData = newOrderBuilder.buildOrder() .buildOrderItems() .buildOrderDeliveryDetail() .buildOrderPaymentDetail() .buildOrderAmount() .buildOrderAmountDetail() .buildOperateLog() .buildOrderSnapshot() .build(); //订单信息 OrderInfoDO orderInfoDO = fullOrderData.getOrderInfoDO(); //订单条目信息 List<OrderItemDO> orderItemDOList = fullOrderData.getOrderItemDOList(); //订单费用信息 List<OrderAmountDO> orderAmountDOList = fullOrderData.getOrderAmountDOList(); //补全地址信息 OrderDeliveryDetailDO orderDeliveryDetailDO = fullOrderData.getOrderDeliveryDetailDO(); String detailAddress = getDetailAddress(orderDeliveryDetailDO); orderDeliveryDetailDO.setDetailAddress(detailAddress); //补全订单状态变更日志 OrderOperateLogDO orderOperateLogDO = fullOrderData.getOrderOperateLogDO(); String remark = "创建订单操作0-10"; orderOperateLogDO.setRemark(remark); //补全订单商品快照信息 List<OrderSnapshotDO> orderSnapshotDOList = fullOrderData.getOrderSnapshotDOList(); for (OrderSnapshotDO orderSnapshotDO : orderSnapshotDOList) { //优惠券信息 if (orderSnapshotDO.getSnapshotType().equals(SnapshotTypeEnum.ORDER_COUPON.getCode())) { ... } //订单费用信息 else if (orderSnapshotDO.getSnapshotType().equals(SnapshotTypeEnum.ORDER_AMOUNT.getCode())) { orderSnapshotDO.setSnapshotJson(JsonUtil.object2Json(orderAmountDOList)); } //订单条目信息 else if (orderSnapshotDO.getSnapshotType().equals(SnapshotTypeEnum.ORDER_ITEM.getCode())) { orderSnapshotDO.setSnapshotJson(JsonUtil.object2Json(orderItemDOList)); } } return fullOrderData; } ... }
(6)发送延迟消息到MQ
@Service public class OrderServiceImpl implements OrderService { ... //发送支付订单超时延迟消息,用于支付超时自动关单 private void sendPayOrderTimeoutDelayMessage(CreateOrderRequest createOrderRequest) { PayOrderTimeoutDelayMessage message = new PayOrderTimeoutDelayMessage(); message.setOrderId(createOrderRequest.getOrderId()); message.setBusinessIdentifier(createOrderRequest.getBusinessIdentifier()); message.setCancelType(OrderCancelTypeEnum.TIMEOUT_CANCELED.getCode()); message.setUserId(createOrderRequest.getUserId()); message.setOrderType(createOrderRequest.getOrderType()); message.setOrderStatus(OrderStatusEnum.CREATED.getCode()); String msgJson = JsonUtil.object2Json(message); defaultProducer.sendMessage( RocketMqConstant.PAY_ORDER_TIMEOUT_DELAY_TOPIC, msgJson, RocketDelayedLevel.DELAYED_30m, "支付订单超时延迟消息" ); } ... } @Component public class DefaultProducer { private final DefaultMQProducer producer; @Autowired public DefaultProducer(RocketMQProperties rocketMQProperties) { producer = new DefaultMQProducer(RocketMqConstant.ORDER_DEFAULT_PRODUCER_GROUP); producer.setNamesrvAddr(rocketMQProperties.getNameServer()); start(); } //对象在使用之前必须要调用一次,只能初始化一次 public void start() { try { this.producer.start(); } catch (MQClientException e) { log.error("producer start error", e); } } ... //发送消息 public void sendMessage(String topic, String message, Integer delayTimeLevel, String type) { Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8)); try { if (delayTimeLevel > 0) { msg.setDelayTimeLevel(delayTimeLevel); } SendResult send = producer.send(msg); if (SendStatus.SEND_OK == send.getSendStatus()) { log.info("发送MQ消息成功, type:{}, message:{}", type, message); } else { throw new OrderBizException(send.getSendStatus().toString()); } } catch (Exception e) { log.error("发送MQ消息失败:", e); throw new OrderBizException(OrderErrorCodeEnum.SEND_MQ_FAILED); } } ... }
2.生单链路中可能会出现数据不一致的问题
在更新优惠券本地事务、更新库存本地事务、插入订单数据本地事务中,可能会出现优惠券和库存都已经更新成功了,但订单数据却插入失败,此时就会出现数据不一致的问题。
3.Seata AT模式下的分布式事务的原理
说明一:需要部署一个Seata Server服务器。
说明二:在各个服务的分支事务的数据库中,需要新增一张undo_log表,用来记录各个服务的分支事务失败时可以执行的回滚SQL。
说明三:当入口服务开启一个分布式事务时,需要向Seata Server开启一个全局事务。
说明四:各个服务对其分支事务的执行情况会同步给Seata Server服务器。
说明五:当Seata Server发现某分支事务失败时,便会通知各服务进行事务回滚。
说明六:当各个服务进行事务回滚时,会从undo_log表查出对应SQL去执行。
4.Seata AT模式下的分布式事务的读写隔离原理
为了避免有其他线程修改某数据后又进行回滚,就一定要加本地锁 + 全局锁。本地锁是为了避免当前机器的其他线程对数据进行修改并回滚,全局锁是为了避免分布式机器的线程对数据进行修改并回滚。
服务A在更新某数据之前,需要先获取本地锁。服务A在成功获取本地锁之后,需要插入undo log数据。接着,服务A需要向Seata Server服务器获取全局锁。服务A在成功获取全局锁之后,会提交本地事务并释放本地锁。
如果服务A对服务B进行RPC调用并提交其本地事务,则继续按前面的步骤处理服务B的数据更新。当服务B的本地事务也提交完成后,不需要继续执行其他分支事务了,服务A便可以提交分布式事务,并释放全局锁。
分布式事务的全链路在执行完毕前,对应数据的全局锁是不会释放的。
5.Seata AT模式下的死锁问题以及超时机制
Seata的一条事务链路里,每一个事务都会按如下顺序执行:首先获取本地锁更新本地数据,然后插入undo log记录,接着获取本地数据对应的全局锁,最后提交本地事务并释放本地锁。
Seata的一条事务链路里,一个事务执行完就会继续执行下一个事务。如果事务链路里的所有事务都执行完成了,那么就提交事务,并释放全局锁。如果某个事务需要回滚,那么就需要获取该事务本地数据的本地锁,然后获取undo log记录生成逆向操作的SQL语句来进行补偿和更新,补偿完毕才能释放本地数据的全局锁。
由于Seata AT模式的写隔离是通过本地数据的全局锁来实现的,所以写隔离的过程中,就涉及到了本地数据的本地锁和全局锁两把锁,这时候就很容易导致出现死锁的情况。
比如当事务1的分支事务提交数据1的本地事务后,会释放数据1的本地锁。此时事务2的分支事务就可以获取数据1的本地锁,但要等待获取事务1释放数据1的全局锁后,才能释放数据1的本地锁。如果事务1的后续分支事务出现异常需要进行回滚,那么事务1就需要获取数据1的本地锁,执行回滚补偿处理。事务1执行完分支事务的回滚补偿处理后,才能释放数据1的全局锁。
于是就出现了这样的死锁场景:事务1对数据1的回滚,占用了数据1的全局锁,需等待获取数据1的本地锁。事务2对数据1的更新,占用了数据1的本地锁,需等待获取数据1的全局锁。
Seata为了解决这个问题,会引入等待全局锁的超时机制。如果事务2在等待数据1的全局锁时出现超时,就会释放其占用的本地锁。从而让事务1能获取到数据1的本地锁,完成其事务操作,而不用一直等待。
6.Seata AT模式下的读写隔离机制的影响
由于全局锁的存在,会严重影响Seata AT分布式事务的并发吞吐量。所以除非是金融级别的系统,才会使用像Seata AT模式这么严格的事务来保证数据的强一致性。
当然,通常情况下分布式事务基本都是更新不同的数据。只要更新不同的数据,那么Seata AT分布式事务也不会出现全局锁等待。只有一些特殊情况下,才可能会出现大量分布式事务更新同一条数据。当使用Seata AT分布式事务时,特别注意尽量不要让全局锁等待。
如果不使用全局锁,那么Seata AT模式的分布式事务就会出现写未提交。就可能出现分支事务更新失败时无法回滚,因为回滚的数据已被覆盖。
Seata AT模式的分布式事务默认是读未提交的,即分布式事务在未提交前,分支事务更新的数据是可被其他事务读到的。
此外,很多公司都是使用基于RocketMQ的柔性事务来实现分布式事务。
7.生单链路使用Seata AT模式的具体步骤
(1)生单链路中分布式事务的主要分支事务
(2)订单系统 + 优惠券系统 + 商品库存系统都需要在pom.xml文件中引入Seata
(3)订单系统的生单接口作为分布式事务入口需添加@GlobalTransactional注解开启全局事务
优惠券系统的锁定优惠券接口需添加Spring的事务注解@Transactional
商品库存系统的锁定库存接口需添加Spring的事务注解@Transactional
各分支事务操作的数据库需要添加undo log表
(1)生单链路中分布式事务的主要分支事务
分布式事务入口:订单系统的生单接口
分支事务1:优惠券系统锁定优惠券
分支事务2:商品库存系统锁定商品库存
分支事务3:订单系统创建订单数据
(2)订单系统 + 优惠券系统 + 商品库存系统都需要在pom.xml文件中引入Seata
<!-- 引入seata整合分布式事务 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <exclusions> <exclusion> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> </exclusion> </exclusions> </dependency> <!-- 跟安装的seata-server需要保持版本一致 --> <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.3.0</version> </dependency>
(3)订单系统的生单接口作为分布式事务入口需添加@GlobalTransactional注解开启全局事务
通过添加Seata提供的注解@GlobalTransactional来开启全局事务。
@Service public class OrderServiceImpl implements OrderService { ... //提交订单/生成订单接口 //@param createOrderRequest 提交订单请求入参 //@return 订单号 @GlobalTransactional(rollbackFor = Exception.class) @Override public CreateOrderDTO createOrder(CreateOrderRequest createOrderRequest) { //1.入参检查 checkCreateOrderRequestParam(createOrderRequest); //2.风控检查 checkRisk(createOrderRequest); //3.获取商品信息 List<ProductSkuDTO> productSkuList = listProductSkus(createOrderRequest); //4.计算订单价格 CalculateOrderAmountDTO calculateOrderAmountDTO = calculateOrderAmount(createOrderRequest, productSkuList); //5.验证订单实付金额 checkRealPayAmount(createOrderRequest, calculateOrderAmountDTO); //6.锁定优惠券 lockUserCoupon(createOrderRequest); //7.锁定商品库存 lockProductStock(createOrderRequest); //8.生成订单到数据库 addNewOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO); //9.发送订单延迟消息用于支付超时自动关单 sendPayOrderTimeoutDelayMessage(createOrderRequest); //返回订单信息 CreateOrderDTO createOrderDTO = new CreateOrderDTO(); createOrderDTO.setOrderId(createOrderRequest.getOrderId()); return createOrderDTO; } ... }
(4)优惠券系统的锁定优惠券接口需添加Spring的事务注解@Transactional
通过添加Spring提供的@Transactional注解来开启本地事务。Seata会代理Spring的事务,进行本地锁申请 + undo log写入 + 全局锁请求 + 提交/回滚本地事务等操作。
@Service public class CouponServiceImpl implements CouponService { ... //锁定用户优惠券 @Transactional(rollbackFor = Exception.class) @Override public Boolean lockUserCoupon(LockUserCouponRequest lockUserCouponRequest) { //检查入参 checkLockUserCouponRequest(lockUserCouponRequest); String userId = lockUserCouponRequest.getUserId(); String couponId = lockUserCouponRequest.getCouponId(); CouponDO couponDO = couponDAO.getUserCoupon(userId, couponId); if (couponDO == null) { throw new MarketBizException(MarketErrorCodeEnum.USER_COUPON_IS_NULL); } //判断优惠券是否已经使用了 if (CouponUsedStatusEnum.USED.getCode().equals(couponDO.getUsed())) { throw new MarketBizException(MarketErrorCodeEnum.USER_COUPON_IS_USED); } couponDO.setUsed(CouponUsedStatusEnum.USED.getCode()); couponDO.setUsedTime(new Date()); couponDAO.updateById(couponDO); return true; } ... }
(5)商品库存系统的锁定库存接口需添加Spring的事务注解@Transactional
通过添加Spring提供的@Transactional注解来开启本地事务。Seata会代理Spring的事务,进行本地锁申请 + undo log写入 + 全局锁请求 + 提交/回滚本地事务等操作。
@Service public class InventoryServiceImpl implements InventoryService { ... //锁定商品库存 @Transactional(rollbackFor = Exception.class) @Override public Boolean lockProductStock(LockProductStockRequest lockProductStockRequest) { //检查入参 checkLockProductStockRequest(lockProductStockRequest); List<LockProductStockRequest.OrderItemRequest> orderItemRequestList = lockProductStockRequest.getOrderItemRequestList(); for (LockProductStockRequest.OrderItemRequest orderItemRequest : orderItemRequestList) { String skuCode = orderItemRequest.getSkuCode(); ProductStockDO productStockDO = productStockDAO.getBySkuCode(skuCode); if (productStockDO == null) { throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_ERROR); } Integer saleQuantity = orderItemRequest.getSaleQuantity(); //执行库存扣减,并需要解决防止超卖的问题 int nums = productStockDAO.lockProductStock(skuCode, saleQuantity); if (nums <= 0) { throw new InventoryBizException(InventoryErrorCodeEnum.LOCK_PRODUCT_SKU_STOCK_ERROR); } } return true; } ... }
(6)各分支事务操作的数据库需要添加undo log表
订单系统的数据库 + 优惠券系统的数据库 + 库存系统的数据库,都需要添加如下一张undo_log表,提供给Seata使用。
CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
8.生单链路使用Seata AT模式时的原理流程
(1)undo log的生成
(2)生单链路使用Seata AT模式时的原理流程
(1)undo log的生成
首先根据更新字段查询出前镜像Before Image,然后进行本地事务的更新,接着根据更新字段查询出更新后的后镜像After Image,这样就可以根据前后镜像 + 执行SQL语句的基本信息拼成一条undo log。
分布式事务入口会向Seata Server注册一个全局事务XID,分支事务会向Seata Server注册一个分支事务Branch ID。
如下是Seata官网提供的一条undo log数据示例:
{ "branchId": 641789253, "undoItems": [{ "afterImage": { "rows": [{ "fields": [{ "name": "id", "type": 4, "value": 1 }, { "name": "name", "type": 12, "value": "GTS" }, { "name": "since", "type": 12, "value": "2014" }] }], "tableName": "product" }, "beforeImage": { "rows": [{ "fields": [{ "name": "id", "type": 4, "value": 1 }, { "name": "name", "type": 12, "value": "TXC" }, { "name": "since", "type": 12, "value": "2014" }] }], "tableName": "product" }, "sqlType": "UPDATE" }], "xid": "xid:xxx" }
(2)生单链路使用Seata AT模式时的原理流程
9.生单链路使用Seata AT模式时的并发问题
(1)在锁定优惠券环节不存在并发获取全局锁问题
(2)在锁定库存环节存在并发获取全局锁问题
(3)在生成订单环节不存在并发获取全局锁问题
生单链路中的分布式事务环节在于:锁定优惠券 + 锁定库存 + 生成订单。
(1)在锁定优惠券环节不存在并发获取全局锁问题
每个用户都会有属于自己的优惠券。日常情况下,都是不同的用户使用不同的优惠券购买商品,所以并不会出现并发获取同一条优惠券数据的全局锁的情况。
(2)在锁定库存环节存在并发获取全局锁问题
对于爆品或秒杀,大量用户可能都会基于某商品进行下单扣减库存,因此会出现并发获取同一个SKU数据的全局锁。
第一个获取到某SKU数据的全局锁的事务,在进行生成订单环节由于需要插入多条SQL,所以可能会比较耗时,从而导致并发等待获取该SKU数据的全局锁的其他事务等待时间过长。
(3)在生成订单环节不存在并发获取全局锁问题
生成订单环节,涉及到多条SQL的插入操作,也存在耗时的风险。
10.生单链路如何解决库存全局锁争用问题
(1)锁定库存时的全局锁争用问题分析
(2)库存分桶方案+柔性事务方案+Seata事务方案
(1)锁定库存时的全局锁争用问题分析
一个商品SKU就对应一条库存数据记录,如果大量用户同时购买一个商品SKU,必然导致多个分布式事务都去竞争和等待同一个SKU库存数据的全局锁。
(2)库存分桶方案+柔性事务方案+Seata事务方案
一.库存分桶方案
一般一个SKU就一条库存数据,在库存分桶方案下,一个SKU会有多条库存数据。比如1万的库存可分为1000条库存数据,每条库存数据可扣库存为10。每次扣减库存时,按照一定的规则和算法,选择一个库存分桶去进行扣减。
二.RocketMQ柔性事务方案
通过RocketMQ柔性事务方案来替换Seata刚性事务方案。在互联网公司里,一般的业务系统,都使用RocketMQ柔性事务。大多情况下,RocketMQ柔性事务都能确保数据是一致的。
刚性事务指的是分支事务出现异常或者失败,则全局回滚。柔性事务指的是分支事务出现异常或者失败,则不断重试直到成功。
使用RocketMQ柔性事务方案,需要确保消息成功被投递到RocketMQ。
三.使用没有全局锁的分布式事务方案,比如TCC
Seata支持AT、TCC、Saga、XA这几种事务方案。对于生单链路的建议是使用混合的分布式事务方案:锁定营销使用AT模式 + 锁定库存使用TCC模式。
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等