秒杀系统—2.第一版初步实现的技术文档
大纲
1.在秒杀运营服务中开发配置秒杀活动接口
2.创建独立的通用工程封装Jedis客户端管理组件
3.对秒杀场次和秒杀商品进行DB + 缓存双写
4.新增秒杀商品时发送消息到RocketMQ
5.消费秒杀商品新增消息时渲染秒杀页面
6.对秒杀商品的库存进行初始化
7.基于Lua开发OpenResty中内嵌的限流Lua脚本
8.实现秒杀抢购流程的核心代码
9.实现秒杀系统高可用架构的伪代码
1.在秒杀运营服务中开发配置秒杀活动接口
seckill-operation模块主要有两个接口:添加秒杀场次 + 添加秒杀商品。
//简化版的秒杀运营服务的接口,所有接口都是GET方式 @RestController @RequestMapping("/seckill/operation") public class SeckillOperationController { //秒杀场次Service组件 @Autowired private SeckillSessionService seckillSessionService; //秒杀商品Service组件 @Autowired private SeckillProductService seckillProductService; //增加秒杀场次的接口 @GetMapping("/session/add") public String addSeckillSession(SeckillSession seckillSession) { seckillSessionService.add(seckillSession); return "success"; } //增加秒杀场次下商品的接口 @GetMapping("/product/add") public String addSeckillProduct(SeckillProduct seckillProduct) { seckillProductService.add(seckillProduct); return "success"; } } //秒杀场次 @Data public class SeckillSession { //秒杀场次id private Long id; //秒杀场次日期 private String sessionDate; //秒杀场次时间 private String sessionTime; } //秒杀商品 @Data public class SeckillProduct { //秒杀商品id private Long id; //秒杀场次id private Long sessionId; //商品id private Long productId; //秒杀价格 private Double seckillPrice; //秒杀库存数量 private Long seckillStock; } //秒杀场次Service组件 @Service public class SeckillSessionServiceImpl implements SeckillSessionService { //秒杀场次DAO组件 @Autowired private SeckillSessionDAO seckillSessionDAO; //增加秒杀场次 public void add(SeckillSession seckillSession) { seckillSessionDAO.add(seckillSession); } } //秒杀商品Service组件 @Service public class SeckillProductServiceImpl implements SeckillProductService { //秒杀商品DAO组件 @Autowired private SeckillProductDAO seckillProductDAO; //增加秒杀商品 public void add(SeckillProduct seckillProduct) { seckillProductDAO.add(seckillProduct); } } //秒杀场次DAO组件 @Repository public class SeckillSessionDAOImpl implements SeckillSessionDAO { //秒杀场次Mapper组件 @Autowired private SeckillSessionMapper seckillSessionMapper; //增加秒杀场次 public void add(SeckillSession seckillSession) { seckillSessionMapper.insert(seckillSession); } } //秒杀商品DAO组件 @Repository public class SeckillProductDAOImpl implements SeckillProductDAO { //秒杀商品Mapper组件 @Autowired private SeckillProductMapper seckillProductMapper; //增加秒杀商品 public void add(SeckillProduct seckillProduct) { seckillProductMapper.insert(seckillProduct); } } //秒杀场次Mapper组件 @Mapper public interface SeckillSessionMapper { //插入秒杀场次 @Insert("INSERT INTO seckill_session(session_date,session_time) " + "VALUES(#{sessionDate},#{sessionTime})") @Options(useGeneratedKeys = true, keyColumn = "id", keyProperty = "id") void insert(SeckillSession seckillSession); } //秒杀商品Mapper组件 @Mapper public interface SeckillProductMapper { //插入秒杀商品 @Insert("INSERT INTO seckill_product(session_id,product_id,seckill_price,seckill_stock) " + "VALUES(#{sessionId},#{productId},#{seckillPrice},#{seckillStock})") @Options(useGeneratedKeys = true, keyColumn = "id", keyProperty = "id") void insert(SeckillProduct seckillProduct); }
2.创建独立的通用工程封装Jedis客户端管理组件
为了避免Redis采用主从复制架构时主节点宕机可能导致的超卖问题,假设部署的Redis节点都是Master,不做主从复制,多主支持库存分片。
seckill-common模块封装了一个管理Jedis客户端的组件:
//管理单个Jedis实例的组件 public class JedisManager { //一个Redis实例在这里就是一个Jedis实例 private ConcurrentHashMap<String, Jedis> jedisMap = new ConcurrentHashMap<String, Jedis>(); //管理组件自己本身是单例 private JedisManager() { } static class Singleton { static JedisManager instance = new JedisManager(); } public static JedisManager getInstance() { return Singleton.instance; } //获取Jedis实例,同时进行缓存 public Jedis getJedis(String host, Integer port) { String cacheKey = host + port; if (jedisMap.get(cacheKey) == null) { synchronized(this) { if (jedisMap.get(cacheKey) == null) { Jedis jedis = new Jedis(host, port); jedisMap.put(cacheKey, jedis); } } } return jedisMap.get(cacheKey); } //获取默认的jedis实例 public Jedis getJedis() { return getJedis("127.0.0.1", 6379); } }
3.对秒杀场次和秒杀商品进行DB + 缓存双写
先写MySQL,再写Redis。
//秒杀场次Service组件 @Service public class SeckillSessionServiceImpl implements SeckillSessionService { //秒杀场次DAO组件 @Autowired private SeckillSessionDAO seckillSessionDAO; //增加秒杀场次 public void add(SeckillSession seckillSession) { //先写MySQL seckillSessionDAO.add(seckillSession); //再写Redis JedisManager jedisManager = JedisManager.getInstance(); Jedis jedis = jedisManager.getJedis(); jedis.lpush("seckill::sessions::" + seckillSession.getSessionDate(), JSONObject.toJSONString(seckillSession)); } } //秒杀商品Service组件 @Service public class SeckillProductServiceImpl implements SeckillProductService { //秒杀商品DAO组件 @Autowired private SeckillProductDAO seckillProductDAO; //增加秒杀商品 public void add(SeckillProduct seckillProduct) { //先写MySQL seckillProductDAO.add(seckillProduct); //再写Redis JedisManager jedisManager = JedisManager.getInstance(); Jedis jedis = jedisManager.getJedis(); jedis.lpush("seckill::products::" + seckillProduct.getSessionId(), JSONObject.toJSONString(seckillProduct)); } }
4.新增秒杀商品时发送消息到RocketMQ
需要在seckill-common模块下封装一个RocketMQ生产者和消费者的单例。其实秒杀系统中,基本都是靠MQ来进行解耦,很少会直接发起调用请求。
//封装一个RocketMQ生产者单例类 public class RocketMQProducer { private DefaultMQProducer producer; private static String producerGroup; private RocketMQProducer(String producerGroup) { try { this.producer = new DefaultMQProducer(producerGroup); this.producer.setNamesrvAddr("localhost:9876"); this.producer.setSendMsgTimeout(60 * 1000); this.producer.start(); } catch(MQClientException e) { System.err.println("初始化RocketMQ生产者失败:" + e); } } private static class Singleton { static RocketMQProducer instance = new RocketMQProducer(producerGroup); } //设置生产者分组名称 public static void setProducerGroup(String producerGroup) { RocketMQProducer.producerGroup = producerGroup; } //获取单例 public static RocketMQProducer getInstance() { return Singleton.instance; } //获取MQ生产者 public DefaultMQProducer getProducer() { return producer; } } //封装一个RocketMQ消费者单例类 public class RocketMQConsumer { private static String consumerGroup; private static String topic; private static MessageListenerConcurrently listener; private RocketMQConsumer(String consumerGroup, String topic, MessageListenerConcurrently listener) { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe(topic, "*"); consumer.registerMessageListener(listener); consumer.start(); System.out.println("RocketMQ消费者启动成功......"); } catch(MQClientException e) { System.err.println("初始化RocketMQ消费者失败......" + e); } } private static class Singleton { static RocketMQConsumer instance = new RocketMQConsumer(consumerGroup, topic, listener); } public RocketMQConsumer getInstance() { return Singleton.instance; } public static void setConsumerGroup(String consumerGroup) { RocketMQConsumer.consumerGroup = consumerGroup; } public static void setTopic(String topic) { RocketMQConsumer.topic = topic; } public static void setListener(MessageListenerConcurrently listener) { RocketMQConsumer.listener = listener; } public static RocketMQConsumer init() { return Singleton.instance; } }
然后在seckill-operation模块下新增秒杀商品时发送消息到RocketMQ,这个新增秒杀商品的消息会被seckill-page模块消费来渲染秒杀商品页面。
//秒杀运营服务的接口 @RestController @RequestMapping("/seckill/operation") public class SeckillOperationController { //秒杀场次Service组件 @Autowired private SeckillSessionService seckillSessionService; //秒杀商品Service组件 @Autowired private SeckillProductService seckillProductService; //增加秒杀场次的接口 @GetMapping("/session/add") public String addSeckillSession(SeckillSession seckillSession) { seckillSessionService.add(seckillSession); return "success"; } //增加秒杀场次下商品的接口 @GetMapping("/product/add") public String addSeckillProduct(SeckillProduct seckillProduct) { seckillProductService.add(seckillProduct); DefaultMQProducer producer = RocketMQProducer.getInstance().getProducer(); try { Message message = new Message("seckill_product_added_topic", null, JSONObject.toJSONString(seckillProduct).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message); System.out.printf("%s%n", sendResult); System.out.println("增加秒杀商品时,发送消息到MQ成功"); } catch(Exception e) { System.err.println("增加秒杀商品时,发送消息到MQ失败" + e); return "failure"; } return "success"; } }
5.消费秒杀商品新增消息时渲染秒杀页面
在seckill-page模块会消费新增秒杀商品消息,然后使用freemarker渲染秒杀商品页面。
@Component public class BootListener implements CommandLineRunner { public void run(String... strings) throws Exception { RocketMQConsumer.setConsumerGroup("seckill-page-consumer-group"); RocketMQConsumer.setTopic("seckill-product-added-topic"); RocketMQConsumer.setListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt : list) { String seckillProductJSON = new String(messageExt.getBody()); JSONObject seckillProductJSONObject = JSONObject.parseObject(seckillProductJSON); Long productId = seckillProductJSONObject.getLong("productId"); Double seckillPrice = seckillProductJSONObject.getDouble("seckillPrice"); Long seckillStock = seckillProductJSONObject.getLong("seckillStock"); FreemarkerHelper viewEngine = new FreemarkerHelper(); Map<String, Object> paras = new HashMap<String, Object>(); paras.put("productId", productId); paras.put("seckillPrice", seckillPrice); paras.put("seckillStock", seckillStock); String html = viewEngine.parseTemplate("autolist.ftl", paras); System.out.println("将渲染完毕的秒杀商品html页面写入磁盘文件......"); System.out.println(html); System.out.println("将磁盘上的html文件使用scp命令传送到nginx服务器上去......"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); RocketMQConsumer.init(); } } //freemarker辅助组件 public class FreemarkerHelper { private static Configuration _tplConfig = new Configuration(); static { try { _tplConfig.setDirectoryForTemplateLoading(new File("xxx/xxx/xxx")); } catch (IOException e) { e.printStackTrace(); } } //解析freemarker模板 public String parseTemplate(String tplName, String encoding, Map<String, Object> paras) { try { StringWriter swriter = new StringWriter(); Template mytpl = null; mytpl = _tplConfig.getTemplate(tplName, encoding); mytpl.process(paras, swriter); return swriter.toString(); } catch (Exception e) { e.printStackTrace(); return e.toString(); } } public String parseTemplate(String tplName, Map<String, Object> paras) { return this.parseTemplate(tplName, "utf-8", paras); } }
6.对秒杀商品的库存进行初始化
在seckill-inventory模块中消费新增秒杀商品消息来冻结库存,然后把秒杀商品的库存进行分片,放在Redis各节点上去。
@Component public class BootListener implements CommandLineRunner { public void run(String... strings) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("seckill-inventory-consumer-group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("seckill_product_added_topic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : messageExts) { System.out.println(new String(messageExt.getBody())); //获取秒杀场次里增加的是哪个商品 String seckillProductJSON = new String(messageExt.getBody()); JSONObject seckillProductJSONObject = JSONObject.parseObject(seckillProductJSON); //调用库存中心提供的接口,冻结商品用于秒杀活动的库存 Long productId = seckillProductJSONObject.getLong("productId"); Double seckillPrice = seckillProductJSONObject.getDouble("seckillPrice"); Long seckillStock = seckillProductJSONObject.getLong("seckillStock"); System.out.println("调用库存中心提供的接口,冻结商品用于秒杀活动的库存"); //库存中心:可售库存、锁定库存、已售库存、冻结库存 //把秒杀商品的库存进行分片,放在Redis集群各个节点上 RedisCluster redisCluster = RedisCluster.getInstance(); redisCluster.initSeckillProductStock(productId, seckillStock); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("消费者启动......"); } }
在seckill-common模块中实现库存分片的组件:
//基于Redis集群进行数据分片的工具类 public class RedisCluster { //Redis集群对应的Jedis列表 private List<Jedis> cluster = new ArrayList<Jedis>(); //私有构造函数 private RedisCluster() { //初始化Redis集群 cluster.add(new Jedis("127.0.0.1", 6479)); cluster.add(new Jedis("127.0.0.1", 6579)); cluster.add(new Jedis("127.0.0.1", 6679)); } //单例 private static class Singleton { static RedisCluster instance = new RedisCluster(); } //获取单例 public static RedisCluster getInstance() { return Singleton.instance; } //初始化秒杀商品库存 public void initSeckillProductStock(Long productId, Long seckillStock) { //计算每个Redis节点上的库存分片数量 int clusterSize = cluster.size(); Long seckillStockPerNode = seckillStock / clusterSize; Long remainSeckillStock = seckillStock - seckillStockPerNode * clusterSize; Long seckillStockLastNode = seckillStockPerNode + remainSeckillStock; System.out.println("每个Redis节点(除最后一个节点外)的库存数量为:" + seckillStockPerNode); System.out.println("最后一个Redis节点的库存数量为:" + seckillStockLastNode); System.out.println("Redis集群的总库存数量为:" + (seckillStockPerNode * (clusterSize - 1) + seckillStockLastNode)); //对除最后一个redis节点之外的其他节点,进行库存分片数据初始化 for (int i = 0; i < cluster.size() - 1; i++) { Jedis jedis = cluster.get(i); JSONObject jsonObject = new JSONObject(); jsonObject.put("saling_stock", seckillStockPerNode); jsonObject.put("locked_stock", 0); jsonObject.put("saled_stock", 0); jedis.set("seckill::product::" + productId + "::stock", jsonObject.toJSONString()); } //对Redis集群最后一个节点,进行库存分片数据初始化 Jedis jedis = cluster.get(cluster.size() - 1); JSONObject jsonObject = new JSONObject(); jsonObject.put("saling_stock", seckillStockLastNode); jsonObject.put("locked_stock", 0); jsonObject.put("saled_stock", 0); jedis.set("seckill::product::" + productId + "::stock", jsonObject.toJSONString()); } }
7.基于Lua开发OpenResty中内嵌的限流Lua脚本
-- 限流可以分为:全局限流 + 业务限流 -- 全局限流,这里假设写死为10000,每秒最多可以放10000流量进入秒杀系统 -- 当然全局限流数也是可以是动态可配置的,动态配置时可以配置在Redis中 globalLimiting = 10000 -- 业务限流,需要先获取Redis里当前的秒杀场次,再获取场次里每个商品的限购数量 -- 比如对该秒杀场次下的商品的最大抢购请求数 = 其限购数量 * 1.1 currentSessionId = 101 currentSessionProductLimiting = {} currentSessionProductLimiting[518] = 1000 currentSessionProductLimiting[629] = 10000 currentSessionProductLimiting[745] = 200 -- 此时在OpenResty里,过来了一个请求 -- 首先进行全局限流,每秒最多可以放行1w个请求 -- 下面定义一个变量currentRequests,存放当前这一秒放放行的请求数量 currentTime = nil currentRequests = 0 currentProductRequests = {} -- 获取当前时间戳 timestamp = os.date("%Y-%m-%d %H:%M:%S") if (currentTime == nil) then currentTime = timestamp end -- 判断currentTime是否属于当前这一秒,如果是则对当前这一秒的数量进行累加 if (timestamp == currentTime) then if (currentRequests <= globalLimiting) then -- OpenResty支持提取HTTP请求的请求参数、请求信息等 -- 下面假设提取本次抢购请求的秒杀商品id为518 local productId = 518 local productRequests = currentProductRequests[productId] if (productRequests == nil or productRequests == 0) then -- 放行HTTP请求 currentProductRequests[productId] = 1 currentRequests = currentRequests + 1 else local productLimiting = currentSessionProductLimiting[productId] -- 商品的最大抢购请求数 = 其限购数量 * 1.1 if (productRequests <= productLimiting * 1.1) then -- 放行HTTP请求 currentProductRequests[productId] = productRequests + 1 currentRequests = currentRequests + 1 else -- 秒杀商品的放行请求数已超过其限购数量的1.1倍 -- 此时进行业务限流,返回响应给客户端,通知用户抢购失败 end end else -- 这一秒内的请求数量超过了10000,进行全局限流 -- 利用OpenResty返回一个预定义的响应给客户端,通知用户抢购失败 end else -- 新的一秒重置currentTime和currentRequests,并放行当前请求 currentTime = timestamp currentRequests = 1 end
8.实现秒杀抢购流程的核心代码
(1)使用Hash数据结构来重构秒杀库存数据
(2)实现处理秒杀抢购请求的HTTP接口
(3)库存分片组件RedisCluster实现秒杀抢购逻辑
(4)消费秒杀抢购成功的消息生成抢购订单
(5)消费订单创建成功的消息更新秒杀商品库存
(1)使用Hash数据结构来重构秒杀库存数据
修改seckill-common模块中的RedisCluster:
//基于Redis集群进行数据分片的工具类 public class RedisCluster { //Redis集群对应的Jedis列表 private List<Jedis> cluster = new ArrayList<Jedis>(); //私有构造函数 private RedisCluster() { //初始化Redis集群 cluster.add(new Jedis("127.0.0.1", 6479)); cluster.add(new Jedis("127.0.0.1", 6579)); cluster.add(new Jedis("127.0.0.1", 6679)); } //单例 private static class Singleton { static RedisCluster instance = new RedisCluster(); } //获取单例 public static RedisCluster getInstance() { return Singleton.instance; } //初始化秒杀商品库存 public void initSeckillProductStock(Long productId, Long seckillStock) { //计算每个Redis节点上的库存分片数量 int clusterSize = cluster.size(); Long seckillStockPerNode = seckillStock / clusterSize; Long remainSeckillStock = seckillStock - seckillStockPerNode * clusterSize; Long seckillStockLastNode = seckillStockPerNode + remainSeckillStock; System.out.println("每个Redis节点(除最后一个节点外)的库存数量为:" + seckillStockPerNode); System.out.println("最后一个Redis节点的库存数量为:" + seckillStockLastNode); System.out.println("Redis集群的总库存数量为:" + (seckillStockPerNode * (clusterSize - 1) + seckillStockLastNode)); //对除最后一个redis节点之外的其他节点,进行库存分片数据初始化 for (int i = 0; i < cluster.size() - 1; i++) { Jedis jedis = cluster.get(i); Map<String, String> dataMap = new HashMap<String, String>(); dataMap.put("salableStock", String.valueOf(seckillStockPerNode)); dataMap.put("lockedStock", "0"); dataMap.put("soldStock", "0"); jedis.hset("seckill::product::" + productId + "::stock", dataMap); } //对Redis集群最后一个节点,进行库存分片数据初始化 Jedis jedis = cluster.get(cluster.size() - 1); Map<String, String> dataMap = new HashMap<String, String>(); dataMap.put("salableStock", String.valueOf(seckillStockLastNode)); dataMap.put("lockedStock", "0"); dataMap.put("soldStock", "0"); jedis.hset("seckill::product::" + productId + "::stock", dataMap); } ... }
(2)实现处理秒杀抢购请求的HTTP接口
在seckill-flash-sale模块中添加:
//秒杀抢购请求处理接口 @RestController @RequestMapping("/seckill/flash/sale") public class FlashSaleController { //用户对商品进行抢购,默认限定每个商品最多只能抢购一件 @GetMapping("/") public String flashSale(Long userId, Long productId) { //秒杀抢购代码的核心是Lua脚本,需要实现当库存分片为0时,自动进行库存分片节点迁移 RedisCluster redisCluster = RedisCluster.getInstance(); Boolean flashSaleResult = redisCluster.flashSale(userId, productId); //如果秒杀抢购成功了,则发送消息到MQ进行异步下单 if (flashSaleResult) { DefaultMQProducer producer = RocketMQProducer.getInstance().getProducer(); try { JSONObject flashSaleSuccessInform = new JSONObject(); flashSaleSuccessInform.put("userId", userId); flashSaleSuccessInform.put("productId", productId); Message message = new Message("flash_sale_success_inform", null, flashSaleSuccessInform.toJSONString().getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message); System.out.printf("%s%n", sendResult); System.out.println("发送秒杀抢购成功的消息到MQ成功......"); } catch(Exception e) { System.err.println("发送秒杀抢购成功的消息到MQ失败:" + e); return "秒杀抢购成功,但是推送消息到MQ失败"; } return "秒杀抢购成功"; } else { return "秒杀抢购失败"; } } }
(3)库存分片组件RedisCluster实现秒杀抢购逻辑
修改seckill-common模块中的RedisCluster:
//基于Redis集群进行数据分片的工具类 public class RedisCluster { //Redis集群对应的Jedis列表 private List<Jedis> cluster = new ArrayList<Jedis>(); //私有构造函数 private RedisCluster() { //初始化redis集群 cluster.add(new Jedis("127.0.0.1", 6479)); cluster.add(new Jedis("127.0.0.1", 6579)); cluster.add(new Jedis("127.0.0.1", 6679)); } //单例 private static class Singleton { static RedisCluster instance = new RedisCluster(); } //获取单例 public static RedisCluster getInstance() { return Singleton.instance; } //初始化秒杀商品库存 public void initSeckillProductStock(Long productId, Long seckillStock) { //计算每个Redis节点上的库存分片数量 int clusterSize = cluster.size(); Long seckillStockPerNode = seckillStock / clusterSize; Long remainSeckillStock = seckillStock - seckillStockPerNode * clusterSize; Long seckillStockLastNode = seckillStockPerNode + remainSeckillStock; System.out.println("每个Redis节点(除最后一个节点外)的库存数量为:" + seckillStockPerNode); System.out.println("最后一个Redis节点的库存数量为:" + seckillStockLastNode); System.out.println("Redis集群的总库存数量为:" + (seckillStockPerNode * (clusterSize - 1) + seckillStockLastNode)); //对除最后一个Redis节点之外的其他节点,进行库存分片数据初始化 for (int i = 0; i < cluster.size() - 1; i++) { Jedis jedis = cluster.get(i); Map<String, String> dataMap = new HashMap<String, String>(); dataMap.put("salableStock", String.valueOf(seckillStockPerNode)); dataMap.put("lockedStock", "0"); dataMap.put("soldStock", "0"); jedis.hset("seckill::product::" + productId + "::stock", dataMap); } //对Redis集群最后一个节点,进行库存分片数据初始化 Jedis jedis = cluster.get(cluster.size() - 1); Map<String, String> dataMap = new HashMap<String, String>(); dataMap.put("salableStock", String.valueOf(seckillStockLastNode)); dataMap.put("lockedStock", "0"); dataMap.put("soldStock", "0"); jedis.hset("seckill::product::" + productId + "::stock", dataMap); } //基于Redis集群进行秒杀抢购 public Boolean flashSale(Long userId, Long productId) { //随机选择一个Redis的节点 int redisNodeCount = cluster.size(); int chosenRedisNodeIndex = new Random().nextInt(redisNodeCount); Jedis chosenRedisNode = cluster.get(chosenRedisNodeIndex); //向Redis节点提交一个Lua脚本进行抢购 String flashSaleLuaScript = "" + "local productKey = 'seckill::product::" + productId + "::stock';" + "local salableStock = redis.call('hget', productKey, 'salableStock');" + "local lockedStock = redis.call('hget', productKey, 'lockedStock');" + "if (salableStock > 0) " + "then " + " redis.call('hset', productKey, 'salableStock', salableStock - 1);" + " redis.call('hset', productKey, 'lockedStock', lockedStock + 1);" + " return 'success';" + "else " + " return 'fail';" + "end;"; //通过Jedis的eval()方法执行Lua脚本 String flashSaleResult = (String) chosenRedisNode.eval(flashSaleLuaScript); //如果秒杀抢购成功了 if ("success".equals(flashSaleResult)) { JedisManager jedisManager = JedisManager.getInstance(); Jedis jedis = jedisManager.getJedis(); //记录用户秒杀成功的商品库存属于哪个分片 jedis.set("flash_sale::stock_shard::" + userId + "::" + productId, String.valueOf(chosenRedisNodeIndex)); return true; } //如果第一次秒杀抢购失败,则进行库存分片迁移的操作 //当然可以继续优化为,库存分片为0的节点后续不会再被选中 else { Boolean flashSaleSuccess = false; for (int i = 0; i < cluster.size(); i++) { if (i != chosenRedisNodeIndex) { Jedis redisNode = cluster.get(i); flashSaleResult = (String) redisNode.eval(flashSaleLuaScript); if ("success".equals(flashSaleResult)) { JedisManager jedisManager = JedisManager.getInstance(); Jedis jedis = jedisManager.getJedis(); jedis.set("flash_sale::stock_shard::" + userId + "::" + productId, String.valueOf(i)); flashSaleSuccess = true; break; } } } return flashSaleSuccess; } } ... }
(4)消费秒杀抢购成功的消息生成抢购订单
订单中心创建完订单会发送订单创建成功的消息到MQ,在seckill-order模块中添加如下代码:
@Component public class BootListener implements CommandLineRunner { public static final Long ORDER_RATE_LIMIT = 500L; //系统启动会自动执行run()方法 public void run(String... strings) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("seckill-order-consumer-group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("flash_sale_success_inform", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : messageExts) { System.out.println(new String(messageExt.getBody())); //抢购成功的通知 JSONObject flashSaleSuccessInform = JSONObject.parseObject(new String(messageExt.getBody())); //调用订单中心提供的接口进行秒杀抢购的下单,订单中心创建完订单会发送订单创建成功的消息到MQ Long userId = flashSaleSuccessInform.getLong("userId"); Long productId = flashSaleSuccessInform.getLong("productId"); //TODO: 在这里需要对下单进行简单限流(固定窗口算法) JedisManager jedisManager = JedisManager.getInstance(); Jedis jedis = jedisManager.getJedis(); Boolean orderResult = false; while (!orderResult) { Date now = new Date(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String currentSecond = dateFormat.format(now); Long result = jedis.incr("flash_sale::order::rate_limit::" + currentSecond); if (result < ORDER_RATE_LIMIT) { System.out.println("调用调用订单中心提供的接口进行秒杀抢购的下单,用户id: " + userId + ", 商品id: " + productId); orderResult = true; } else { //如果当前这一秒限流了,此时休眠一秒,下一秒继续进行下单即可 try { Thread.sleep(1000); } catch(InterruptedException e) { e.printStackTrace(); } } } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("消费者启动......"); } }
(5)消费订单创建成功的消息更新秒杀商品库存
在seckill-inventory模块中添加如下代码:
@Component public class BootListener implements CommandLineRunner { public void run(String... strings) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("seckill-inventory-consumer-group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("seckill_product_added_topic", "*"); consumer.subscribe("order_pay_result_inform", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : messageExts) { System.out.println(new String(messageExt.getBody())); String topic = messageExt.getTopic(); if (topic.equals("seckill_product_added_topic")) { //获取秒杀场次里增加的是哪个商品 String seckillProductJSON = new String(messageExt.getBody()); JSONObject seckillProductJSONObject = JSONObject.parseObject(seckillProductJSON); //调用库存中心提供的接口,冻结商品用于秒杀活动的库存 Long productId = seckillProductJSONObject.getLong("productId"); Double seckillPrice = seckillProductJSONObject.getDouble("seckillPrice"); Long seckillStock = seckillProductJSONObject.getLong("seckillStock"); JedisManager jedisManager = JedisManager.getInstance(); Jedis jedis = jedisManager.getJedis(); String inventoryInitedFlag = jedis.get("seckill::product-inventory-inited::flag::" + productId); if (inventoryInitedFlag != null && inventoryInitedFlag.equals("inited")) { continue; } jedis.set("seckill::product-inventory-inited::flag::" + productId, "inited"); System.out.println("调用库存中心提供的接口,冻结商品用于秒杀活动的库存"); //库存中心:可售库存、锁定库存、已售库存、冻结库存 //对秒杀商品的库存进行分片,存放在Redis各节点上 RedisCluster redisCluster = RedisCluster.getInstance(); redisCluster.initSeckillProductStock(productId,seckillStock); } else if (topic.equals("order_pay_result_inform")) { //解析订单支付结果的通知 JSONObject orderPayResult = JSONObject.parseObject(new String(messageExt.getBody())); Long userId = orderPayResult.getLong("userId"); Long productId = orderPayResult.getLong("productId"); Boolean orderPaySuccess = orderPayResult.getInteger("orderPaySuccess") == 1 ? true : false; //幂等性保障 JedisManager jedisManager = JedisManager.getInstance(); Jedis jedis = jedisManager.getJedis(); String orderPayResultProcessedFlag = jedis.get("seckill::order-pay-result-processed::flag::" + userId + "::" + productId); if (orderPayResultProcessedFlag != null && orderPayResultProcessedFlag.equals("processed")) { continue; } jedis.set("seckill::order-pay-result-processed::flag::" + userId + "::" + productId, "processed"); //获取当时秒杀成功时的库存分片所在Redis节点 String stockShardRedisNode = jedis.get("flash_sale::stock_shard::" + userId + "::" + productId); RedisCluster redisCluster = RedisCluster.getInstance(); if (orderPaySuccess) {//如果秒杀订单支付成功 redisCluster.flashSaleOrderPaySuccess(stockShardRedisNode, productId); } else {//如果秒杀订单支付失败或取消 redisCluster.flashSaleOrderPayFail(stockShardRedisNode, productId); } System.out.println("秒杀抢购商品的订单支付结果处理成功......"); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("消费者启动......"); } }
在RedisCluster中增加订单支付成功和支付失败时的库存处理逻辑:
//基于Redis集群进行数据分片的工具类 public class RedisCluster { //Redis集群对应的Jedis列表 private List<Jedis> cluster = new ArrayList<Jedis>(); //私有构造函数 private RedisCluster() { //初始化redis集群 cluster.add(new Jedis("127.0.0.1", 6479)); cluster.add(new Jedis("127.0.0.1", 6579)); cluster.add(new Jedis("127.0.0.1", 6679)); } //单例 private static class Singleton { static RedisCluster instance = new RedisCluster(); } //获取单例 public static RedisCluster getInstance() { return Singleton.instance; } //初始化秒杀商品库存 public void initSeckillProductStock(Long productId, Long seckillStock) { //计算每个Redis节点上的库存分片数量 int clusterSize = cluster.size(); Long seckillStockPerNode = seckillStock / clusterSize; Long remainSeckillStock = seckillStock - seckillStockPerNode * clusterSize; Long seckillStockLastNode = seckillStockPerNode + remainSeckillStock; System.out.println("每个Redis节点(除最后一个节点外)的库存数量为:" + seckillStockPerNode); System.out.println("最后一个Redis节点的库存数量为:" + seckillStockLastNode); System.out.println("Redis集群的总库存数量为:" + (seckillStockPerNode * (clusterSize - 1) + seckillStockLastNode)); //对除最后一个Redis节点之外的其他节点,进行库存分片数据初始化 for (int i = 0; i < cluster.size() - 1; i++) { Jedis jedis = cluster.get(i); Map<String, String> dataMap = new HashMap<String, String>(); dataMap.put("salableStock", String.valueOf(seckillStockPerNode)); dataMap.put("lockedStock", "0"); dataMap.put("soldStock", "0"); jedis.hset("seckill::product::" + productId + "::stock", dataMap); } //对Redis集群最后一个节点,进行库存分片数据初始化 Jedis jedis = cluster.get(cluster.size() - 1); Map<String, String> dataMap = new HashMap<String, String>(); dataMap.put("salableStock", String.valueOf(seckillStockLastNode)); dataMap.put("lockedStock", "0"); dataMap.put("soldStock", "0"); jedis.hset("seckill::product::" + productId + "::stock", dataMap); } //基于Redis集群进行秒杀抢购 public Boolean flashSale(Long userId, Long productId) { //随机选择一个Redis的节点 int redisNodeCount = cluster.size(); int chosenRedisNodeIndex = new Random().nextInt(redisNodeCount); Jedis chosenRedisNode = cluster.get(chosenRedisNodeIndex); //向Redis节点提交一个Lua脚本进行抢购 String flashSaleLuaScript = "" + "local productKey = 'seckill::product::" + productId + "::stock';" + "local salableStock = redis.call('hget', productKey, 'salableStock');" + "local lockedStock = redis.call('hget', productKey, 'lockedStock');" + "if (salableStock > 0) " + "then " + " redis.call('hset', productKey, 'salableStock', salableStock - 1);" + " redis.call('hset', productKey, 'lockedStock', lockedStock + 1);" + " return 'success';" + "else " + " return 'fail';" + "end;"; //通过Jedis的eval()方法执行Lua脚本 String flashSaleResult = (String) chosenRedisNode.eval(flashSaleLuaScript); //如果秒杀抢购成功了 if ("success".equals(flashSaleResult)) { JedisManager jedisManager = JedisManager.getInstance(); Jedis jedis = jedisManager.getJedis(); //记录用户秒杀成功的商品库存属于哪个分片 jedis.set("flash_sale::stock_shard::" + userId + "::" + productId, String.valueOf(chosenRedisNodeIndex)); return true; } //如果第一次秒杀抢购失败,则进行库存分片迁移的操作 //当然可以继续优化为,库存分片为0的节点后续不会再被选中 else { Boolean flashSaleSuccess = false; for (int i = 0; i < cluster.size(); i++) { if (i != chosenRedisNodeIndex) { Jedis redisNode = cluster.get(i); flashSaleResult = (String) redisNode.eval(flashSaleLuaScript); if ("success".equals(flashSaleResult)) { JedisManager jedisManager = JedisManager.getInstance(); Jedis jedis = jedisManager.getJedis(); jedis.set("flash_sale::stock_shard::" + userId + "::" + productId, String.valueOf(i)); flashSaleSuccess = true; break; } } } return flashSaleSuccess; } } //秒杀订单支付成功的库存处理逻辑 public void flashSaleOrderPaySuccess(String stockShardRedisNode, Long productId) { Integer redisNodeIndex = Integer.valueOf(stockShardRedisNode); Jedis redisNode = cluster.get(redisNodeIndex); String luaScript = "" + "local productKey = 'seckill::product::" + productId + "::stock';" + "local lockedStock = redis.call('hget', productKey, 'lockedStock');" + "local soldStock = redis.call('hget', productKey, 'soldStock');" + "redis.call('hset', productKey, 'lockedStock', lockedStock - 1);" + "redis.call('hset', productKey, 'soldStock', lockedStock + 1);"; redisNode.eval(luaScript); } //秒杀订单支付失败的库存处理逻辑 public void flashSaleOrderPayFail(String stockShardRedisNode, Long productId) { Integer redisNodeIndex = Integer.valueOf(stockShardRedisNode); Jedis redisNode = cluster.get(redisNodeIndex); String luaScript = "" + "local productKey = 'seckill::product::" + productId + "::stock';" + "local salableStock = redis.call('hget', productKey, 'salableStock');" + "local lockedStock = redis.call('hget', productKey, 'lockedStock');" + "redis.call('hset', productKey, 'lockedStock', lockedStock - 1);" + "redis.call('hset', productKey, 'salableStock', salableStock + 1);"; redisNode.eval(luaScript); } }
9.实现秒杀系统高可用架构的伪代码
(1)Redis集群故障时的高可用
(2)MQ集群故障时的高可用
(3)订单系统异常时的高可用
(1)Redis集群故障时的高可用
某个库存分片故障时自动迁移其他库存分片,或者将抢购请求刷盘并启用线程处理。
//基于Redis集群进行数据分片的工具类 public class RedisCluster { //Redis集群对应的Jedis列表 private List<Jedis> cluster = new ArrayList<Jedis>(); //私有构造函数 private RedisCluster() { //初始化redis集群 cluster.add(new Jedis("127.0.0.1", 6479)); cluster.add(new Jedis("127.0.0.1", 6579)); cluster.add(new Jedis("127.0.0.1", 6679)); } //单例 private static class Singleton { static RedisCluster instance = new RedisCluster(); } //获取单例 public static RedisCluster getInstance() { return Singleton.instance; } //初始化秒杀商品库存 public void initSeckillProductStock(Long productId, Long seckillStock) { //计算每个Redis节点上的库存分片数量 int clusterSize = cluster.size(); Long seckillStockPerNode = seckillStock / clusterSize; Long remainSeckillStock = seckillStock - seckillStockPerNode * clusterSize; Long seckillStockLastNode = seckillStockPerNode + remainSeckillStock; System.out.println("每个Redis节点(除最后一个节点外)的库存数量为:" + seckillStockPerNode); System.out.println("最后一个Redis节点的库存数量为:" + seckillStockLastNode); System.out.println("Redis集群的总库存数量为:" + (seckillStockPerNode * (clusterSize - 1) + seckillStockLastNode)); //对除最后一个Redis节点之外的其他节点,进行库存分片数据初始化 for (int i = 0; i < cluster.size() - 1; i++) { Jedis jedis = cluster.get(i); Map<String, String> dataMap = new HashMap<String, String>(); dataMap.put("salableStock", String.valueOf(seckillStockPerNode)); dataMap.put("lockedStock", "0"); dataMap.put("soldStock", "0"); jedis.hset("seckill::product::" + productId + "::stock", dataMap); } //对Redis集群最后一个节点,进行库存分片数据初始化 Jedis jedis = cluster.get(cluster.size() - 1); Map<String, String> dataMap = new HashMap<String, String>(); dataMap.put("salableStock", String.valueOf(seckillStockLastNode)); dataMap.put("lockedStock", "0"); dataMap.put("soldStock", "0"); jedis.hset("seckill::product::" + productId + "::stock", dataMap); } //基于Redis集群进行秒杀抢购 public Boolean flashSale(Long userId, Long productId) { //随机选择一个Redis的节点 int redisNodeCount = cluster.size(); int chosenRedisNodeIndex = new Random().nextInt(redisNodeCount); Jedis chosenRedisNode = cluster.get(chosenRedisNodeIndex); //向Redis节点提交一个Lua脚本进行抢购 String flashSaleLuaScript = "" + "local productKey = 'seckill::product::" + productId + "::stock';" + "local salableStock = redis.call('hget', productKey, 'salableStock');" + "local lockedStock = redis.call('hget', productKey, 'lockedStock');" + "if (salableStock > 0) " + "then " + " redis.call('hset', productKey, 'salableStock', salableStock - 1);" + " redis.call('hset', productKey, 'lockedStock', lockedStock + 1);" + " return 'success';" + "else " + " return 'fail';" + "end;"; //通过Jedis的eval()方法执行Lua脚本 String flashSaleResult = (String) chosenRedisNode.eval(flashSaleLuaScript); String flashSaleResult = null; try { flashSaleResult = (String) chosenRedisNode.eval(flashSaleLuaScript); } catch(Exception e) { //如果在这里报错了,那么很有可能就是某一台Redis机器崩溃了 //此时可能只是一部分的库存分片不可用,所以可以去找其他库存分片来进行秒杀抢购 try { return tryOtherStockShard(chosenRedisNodeIndex, flashSaleLuaScript, userId, productId); } catch(Exception e1) { //高可用TODO //如果在这里报错,就表明所有的Redis节点都崩溃了 //此时可以尝试把抢购请求写入到本地磁盘,让用户的抢购状态保持在抢购中,并开启线程进行处理 return false; } } //如果秒杀抢购成功 if ("success".equals(flashSaleResult)) { JedisManager jedisManager = JedisManager.getInstance(); Jedis jedis = jedisManager.getJedis(); jedis.set("flash_sale::stock_shard::" + userId + "::" + productId, String.valueOf(chosenRedisNodeIndex)); return true; } else { //如果第一次秒杀抢购失败了,则进行库存分片迁移的操作 try { return tryOtherStockShard(chosenRedisNodeIndex, flashSaleLuaScript, userId, productId); } catch(Exception e) { //高可用TODO //如果在这里报错,就表明所有的Redis节点都崩溃了 //此时可以尝试把抢购请求写入到本地磁盘,让用户的抢购状态保持在抢购中,并开启线程进行处理 return false; } } } //尝试其他的库存分片节点 private Boolean tryOtherStockShard(int failedStockShard, String flashSaleLuaScript, Long userId, Long productId) throws Exception { String flashSaleResult = null; Boolean flashSaleSuccess = false; Boolean allRedisNodeCrashed = true; for (int i = 0; i < cluster.size(); i++) { if( i != failedStockShard) { try { Jedis redisNode = cluster.get(i); flashSaleResult = (String) redisNode.eval(flashSaleLuaScript); allRedisNodeCrashed = false; if ("success".equals(flashSaleResult)) { JedisManager jedisManager = JedisManager.getInstance(); Jedis jedis = jedisManager.getJedis(); jedis.set("flash_sale::stock_shard::" + userId + "::" + productId, String.valueOf(i)); flashSaleSuccess = true; break; } } catch(Exception e) { //高可用TODO //在尝试其他节点进行抢购的时候,其他某个节点也出现了宕机问题 } } } //如果所有的Redis节点都崩溃了 if (allRedisNodeCrashed) { throw new Exception("所有Redis节点都崩溃了!!!"); } return flashSaleSuccess; } //秒杀订单支付成功的库存处理逻辑 public void flashSaleOrderPaySuccess(String stockShardRedisNode, Long productId) { Integer redisNodeIndex = Integer.valueOf(stockShardRedisNode); Jedis redisNode = cluster.get(redisNodeIndex); String luaScript = "" + "local productKey = 'seckill::product::" + productId + "::stock';" + "local lockedStock = redis.call('hget', productKey, 'lockedStock');" + "local soldStock = redis.call('hget', productKey, 'soldStock');" + "redis.call('hset', productKey, 'lockedStock', lockedStock - 1);" + "redis.call('hset', productKey, 'soldStock', lockedStock + 1);"; redisNode.eval(luaScript); } //秒杀订单支付失败的库存处理逻辑 public void flashSaleOrderPayFail(String stockShardRedisNode, Long productId) { Integer redisNodeIndex = Integer.valueOf(stockShardRedisNode); Jedis redisNode = cluster.get(redisNodeIndex); String luaScript = "" + "local productKey = 'seckill::product::" + productId + "::stock';" + "local salableStock = redis.call('hget', productKey, 'salableStock');" + "local lockedStock = redis.call('hget', productKey, 'lockedStock');" + "redis.call('hset', productKey, 'lockedStock', lockedStock - 1);" + "redis.call('hset', productKey, 'salableStock', salableStock + 1);"; redisNode.eval(luaScript); } }
(2)MQ集群故障时的高可用
将MQ消息写入磁盘并启用线程进行处理。
//秒杀抢购请求处理接口 @RestController @RequestMapping("/seckill/flash/sale") public class FlashSaleController { //用户对商品进行抢购,默认限定每个商品最多只能抢购一件 @GetMapping("/") public String flashSale(Long userId, Long productId) { //秒杀抢购代码的核心是Lua脚本,需要实现当库存分片为0时,自动进行库存分片节点迁移 RedisCluster redisCluster = RedisCluster.getInstance(); Boolean flashSaleResult = redisCluster.flashSale(userId, productId); //如果秒杀抢购成功了,则发送消息到MQ进行异步下单 if (flashSaleResult) { DefaultMQProducer producer = RocketMQProducer.getInstance().getProducer(); try { JSONObject flashSaleSuccessInform = new JSONObject(); flashSaleSuccessInform.put("userId", userId); flashSaleSuccessInform.put("productId", productId); Message message = new Message("flash_sale_success_inform", null, flashSaleSuccessInform.toJSONString().getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message); System.out.printf("%s%n", sendResult); System.out.println("发送秒杀抢购成功的消息到MQ成功......"); } catch(Exception e) { System.err.println("发送秒杀抢购成功的消息到MQ失败:" + e); //高可用TODO //可以把MQ消息写入到本地磁盘的进行积压,然后开启一个后台线程不停尝试MQ是否恢复 //如果MQ恢复了,就可以把本地磁盘积压的消息发送出去 return "秒杀抢购成功,但是推送消息到MQ失败"; } return "秒杀抢购成功"; } else { return "秒杀抢购失败"; } } }
(3)订单系统异常时的高可用
判断MQ消息是否积压超过某时间,若是则进行快速失败释放库存。此外订单系统异常时,需要阻塞消费MQ消息的线程,之后再重试。
@Component public class BootListener implements CommandLineRunner { public static final Long ORDER_RATE_LIMIT = 500L; public void run(String... strings) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("seckill-order-consumer-group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("flash_sale_success_inform", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : messageExts) { System.out.println(new String(messageExt.getBody())); //抢购成功的通知 JSONObject flashSaleSuccessInform = JSONObject.parseObject(new String(messageExt.getBody())); //调用订单中心提供的接口进行秒杀抢购的下单 Long userId = flashSaleSuccessInform.getLong("userId"); Long productId = flashSaleSuccessInform.getLong("productId"); //高可用TODO //对每个抢购成功的消息都获取一下其发送的时间戳 //然后判断该消息是否已经在MQ里积压超过半个小时,如果是就进行快速失败 //通过fail-fast机制,直接推送一条快速失败的消息到MQ //让库存系统消费该快速失败的消息,释放掉抢购商品的库存 //接着再更新用户抢购的状态为抢购失败 //在这里需要对下单进行限流,防止大量请求访问订单系统让订单系统产生压力 JedisManager jedisManager = JedisManager.getInstance(); Jedis jedis = jedisManager.getJedis(); Boolean orderResult = false; while (!orderResult) { Date now = new Date(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String currentSecond = dateFormat.format(now); Long result = jedis.incr("flash_sale::order::rate_limit::" + currentSecond); if (result < ORDER_RATE_LIMIT) { System.out.println("调用订单中心提供的接口进行秒杀抢购的下单,用户id: " + userId + ", 商品id: " + productId); //高可用TODO //如果订单系统崩溃了,那么执行到此处的消费线程应进入阻塞,不能继续消费后面的消息了 //阻塞个几分钟过后,再尝试调用订单系统进行下单 orderResult = true; } else { //如果当前这一秒限流了,此时休眠一秒,下一秒继续进行下单即可 try { Thread.sleep(1000); } catch(InterruptedException e) { e.printStackTrace(); } } } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("消费者启动......"); } }
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等