商品中心—13.商品卖家系统的高并发文档
大纲
1.阿里云Tair接入与Jedis连接池使用
2.Redis集群连接池与通用对象连接池源码
3.商品卖家系统高并发场景与缓存预热逻辑
4.商品卖家系统缓存预热架构设计
5.商品卖家系统单机下的缓存预热的实现
6.商品卖家系统分布式下的缓存预热的实现
7.商品卖家系统定时查询DB最新数据更新缓存
8.商品中心高并发架构总结
1.阿里云Tair接入与Jedis连接池使用
(1)引入依赖
(2)添加配置文件
(3)配置Tair相关Bean
(4)使用Tair相关命令
(1)引入依赖
Tair是多线程的,Redis是单线程的。TairJedis是阿⾥云基于Jedis开发的Redis企业版专⽤客户端。TairJedis除了Jedis原有功能,还⽀持Redis企业版包含的命令。
(1)引入依赖
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>com.aliyun.tair</groupId> <artifactId>alibabacloud-tairjedis-sdk</artifactId> <version>2.1.0</version> </dependency>
(2)添加配置文件
spring: tair: host: r-xxxxxxxxxxxxxxx.redis.rds.aliyuncs.com port: 6379 password: password timeout: 3000 maxIdle: 10000 maxTotal: 10000
(3)配置Tair相关Bean
@Data @Configuration public class TairConfig { @Value("${spring.tair.maxIdle:200}") private int maxIdle; @Value("${spring.tair.maxTotal:300}") private int maxTotal; @Value("${spring.tair.host}") private String host; @Value("${spring.tair.port:6379}") private int port; @Value("${spring.tair.password}") private String password; @Value("${spring.tair.timeout:3000}") private int timeout; @Bean @ConditionalOnClass(JedisPool.class) public JedisPool jedisPool() { JedisPoolConfig config = new JedisPoolConfig(); //最大空闲连接数,需自行评估,不超过Redis实例的最大连接数 config.setMaxIdle(maxIdle); //最大连接数,需自行评估,不超过Redis实例的最大连接数 config.setMaxTotal(maxTotal); config.setTestOnBorrow(false); config.setTestOnReturn(false); return new JedisPool(config, host, port, timeout, password); } @Bean @ConditionalOnClass(TairCache.class) public TairCache tairCache(JedisPool jedisPool) { return new TairCache(jedisPool); } }
(4)使用Tair相关命令
@Component public class TairCache { private JedisPool jedisPool; public TairCache(JedisPool jedisPool) { this.jedisPool = jedisPool; } public Jedis getJedis() { return jedisPool.getResource(); } public TairString createTairString(Jedis jedis) { return new TairString(jedis); } private TairHash createTairHash(Jedis jedis) { return new TairHash(jedis); } //缓存存储 public void set(String key, String value, int seconds) { log.info("tairString set key:{}, value:{}, seconds:{}", key, value, seconds); try (Jedis jedis = getJedis()) { TairString tairString = createTairString(jedis); String result; if (seconds > 0) { result = tairString.exset(key, value, new ExsetParams().ex(seconds)); } else { result = tairString.exset(key, value); } log.info("tairString set key:{}, value:{}, seconds:{}", key, value, seconds); } } //tairString存储 public boolean exset(String key, String value, int seconds) { try (Jedis jedis = getJedis()) { TairString tairString = createTairString(jedis); String result; if (seconds > 0) { result = tairString.exset(key, value, new ExsetParams().ex(seconds)); } else { result = tairString.exset(key, value); } return "OK".equals(result); } } //缓存获取 public String get(String key) { try (Jedis jedis = getJedis()) { ExgetResult<String> exget = createTairString(jedis).exget(key); if (exget == null) { return null; } return exget.getValue(); } catch (Exception e) { log.error("tairString get error,key{}", key, e); } return null; } //缓存自增 public Integer incr(String key) { return this.exincrby(key, 1); } //缓存自增 public Integer incr(String key, Integer incrNum) { return this.exincrby(key, incrNum); } //缓存自减 public Integer decr(String key, Integer decrNum) { return this.exincrby(key, -Math.abs(decrNum)); } //删除缓存 public Integer delete(String key) { try (Jedis jedis = getJedis()) { return jedis.del(key).intValue(); } } //删除缓存 public Integer mdelete(List<String> keyList) { try (Jedis jedis = getJedis()) { return jedis.del(keyList.toArray(new String[keyList.size()])).intValue(); } } //缓存批量获取 public List mget(List<String> keyList) { try (Jedis jedis = getJedis()) { TairString tairString = createTairString(jedis); return keyList.stream().map(key -> { ExgetResult<String> exget = tairString.exget(key); if (exget != null) { return exget.getValue(); } return null; }).collect(Collectors.toList()); } } //对TairString的value进行自增自减操作 public Integer exincrby(String key, Integer incrNum) { try (Jedis jedis = getJedis()) { int value = createTairString(jedis).exincrBy(key, incrNum, ExincrbyParams.ExincrbyParams().min(0)).intValue(); return value; } catch (Exception e) { //出现自增或者自减数据溢出等异常,直接返回操作失败 return -1; } } //存储hash对象 public Integer exhset(String key, String value) { try (Jedis jedis = getJedis()) { Map<String, Object> innerMap = JSON.parseObject(value).getInnerMap(); Map<String, String> map = Maps.transformEntries(innerMap, (k, v) -> String.valueOf(v)); String exhmset = createTairHash(jedis).exhmset(key, map); //成功返回 OK return 0; } } //存储hash对象 public Integer exhset(String key, String field, String value) { try (Jedis jedis = getJedis()) { return createTairHash(jedis).exhset(key, field, value, ExhsetParams.ExhsetParams().nx()).intValue(); } } //获取hash对象 public String exhget(String key, String field) { try (Jedis jedis = getJedis()) { String exhget = createTairHash(jedis).exhget(key, field); log.info("exhget key:{}, field:{}, value:{}", key, field, exhget); return exhget; } } }
2.Redis集群连接池与通用对象连接池源码
(1)Spring的RedisConnectionFactory和实现JedisConnectionFactory
(2)Jedis的Pool对象是基于Apache Commons的通用对象池的
(3)Apache Commons的通用对象池的borrowObject()方法
(4)ShardedJedisFactory中创建Jedis连接对象的具体方法
(5)Redis分片对象ShardedJedis的创建
(6)ShardedJedisPool的初始化
这里的Redis集群在客户端使用一致性Hash进行分片,与Redis Cluster这种服务端集群是不同的。
(1)Spring的RedisConnectionFactory和实现JedisConnectionFactory
@Data @Configuration @ConditionalOnClass(RedisConnectionFactory.class) public class RedisConfig { ... @Bean @ConditionalOnClass(RedisConnectionFactory.class) public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); redisTemplate.setDefaultSerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } ... } public interface RedisConnectionFactory extends PersistenceExceptionTranslator { //Provides a suitable connection for interacting with Redis. RedisConnection getConnection(); //Provides a suitable connection for interacting with Redis Cluster. RedisClusterConnection getClusterConnection(); //Provides a suitable connection for interacting with Redis Sentinel. RedisSentinelConnection getSentinelConnection(); } public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory { private @Nullable Pool<Jedis> pool; ... public RedisConnection getConnection() { if (isRedisClusterAware()) { return getClusterConnection(); } Jedis jedis = fetchJedisConnector(); JedisConnection connection = (getUsePool() ? new JedisConnection(jedis, pool, getDatabase(), getClientName()) : new JedisConnection(jedis, null, getDatabase(), getClientName())); connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults); return postProcessConnection(connection); } protected Jedis fetchJedisConnector() { try { if (getUsePool() && pool != null) { return pool.getResource(); } Jedis jedis = createJedis(); jedis.connect(); potentiallySetClientName(jedis); return jedis; } catch (Exception ex) { throw new RedisConnectionFailureException("Cannot get Jedis connection", ex); } } ... }
(2)Jedis的Pool对象是基于Apache Commons的通用对象池的
public abstract class Pool<T> implements Closeable { //internalPool就是Apache Commons的通用对象池 protected GenericObjectPool<T> internalPool; public T getResource() { try { return internalPool.borrowObject(); } catch (NoSuchElementException nse) { ... } } } //假设使用ShardedJedisPool去连接多个Redis节点组成的集群,去通过getResource()方法获取连接对象 public class ShardedJedisPool extends Pool<ShardedJedis> { ... @Override public ShardedJedis getResource() { ShardedJedis jedis = super.getResource(); jedis.setDataSource(this); return jedis; } //激活方法为空 @Override public void activateObject(PooledObject<ShardedJedis> p) throws Exception { } ... }
(3)Apache Commons的通用对象池的borrowObject()方法
public class GenericObjectPool<T> extends BaseGenericObjectPool<T> implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> { //存放空闲对象的队列 private final LinkedBlockingDeque<PooledObject<T>> idleObjects; ... @Override public T borrowObject() throws Exception { return borrowObject(getMaxWaitMillis()); } public T borrowObject(final long borrowMaxWaitMillis) throws Exception { assertOpen(); final AbandonedConfig ac = this.abandonedConfig; if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) && (getNumActive() > getMaxTotal() - 3) ) { removeAbandoned(ac); } PooledObject<T> p = null; //如果连接耗尽是否需要阻塞 final boolean blockWhenExhausted = getBlockWhenExhausted(); boolean create; final long waitTime = System.currentTimeMillis(); while (p == null) { create = false; //从队列中获取对象 p = idleObjects.pollFirst(); if (p == null) { //创建对象 p = create(); if (p != null) { create = true; } } ... } updateStatsBorrow(p, System.currentTimeMillis() - waitTime); return p.getObject(); } //创建对象 private PooledObject<T> create() throws Exception { ... final PooledObject<T> p; try { //调用Factory的makeObject()方法创建对象 p = factory.makeObject(); if (getTestOnCreate() && !factory.validateObject(p)) { createCount.decrementAndGet(); return null; } } catch (final Throwable e) { createCount.decrementAndGet(); throw e; } finally { synchronized (makeObjectCountLock) { makeObjectCount--; makeObjectCountLock.notifyAll(); } } ... return p; } @Override public void addObject() throws Exception { assertOpen(); if (factory == null) { throw new IllegalStateException("Cannot add objects without a factory."); } final PooledObject<T> p = create(); addIdleObject(p); } private void addIdleObject(final PooledObject<T> p) throws Exception { if (p != null) { factory.passivateObject(p); if (getLifo()) { idleObjects.addFirst(p); } else { idleObjects.addLast(p); } } } ... }
(4)ShardedJedisFactory中创建Jedis连接对象的具体方法
一个连接会封装成一个DefaultPooledObject对象。
public class ShardedJedisPool extends Pool<ShardedJedis> { ... private static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> { private List<JedisShardInfo> shards; ... //创建Jedis连接对象的具体方法 @Override public PooledObject<ShardedJedis> makeObject() throws Exception { //创建ShardedJedis对象 ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern); return new DefaultPooledObject<ShardedJedis>(jedis); } ... } ... }
(5)Redis分片对象ShardedJedis的创建
一个ShardedJedis分片可以理解成对应一个集群,一个ShardedJedis中会有各个Redis节点的连接。即一个ShardedJedis中,会连接Redis的各个节点,每个ShardedJedis分片都有多个虚拟节点。
public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, Closeable { ... public ShardedJedis(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) { super(shards, algo, keyTagPattern); } ... } public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo> implements BinaryJedisCommands { ... public BinaryShardedJedis(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) { super(shards, algo, keyTagPattern); } ... } public class Sharded<R, S extends ShardInfo<R>> { public static final int DEFAULT_WEIGHT = 1; private TreeMap<Long, S> nodes; private final Hashing algo; private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<ShardInfo<R>, R>(); ... public Sharded(List<S> shards, Hashing algo, Pattern tagPattern) { this.algo = algo; this.tagPattern = tagPattern; initialize(shards); } private void initialize(List<S> shards) { nodes = new TreeMap<Long, S>(); //每个分片都有多个虚拟节点,存放在nodes中 for (int i = 0; i != shards.size(); ++i) { final S shardInfo = shards.get(i); int N = 160 * shardInfo.getWeight(); if (shardInfo.getName() == null) { for (int n = 0; n < N; n++) { nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo); } } else { for (int n = 0; n < N; n++) { nodes.put(this.algo.hash(shardInfo.getName() + "*" + n), shardInfo); } } resources.put(shardInfo, shardInfo.createResource()); } } ... } public class JedisShardInfo extends ShardInfo<Jedis> { ... @Override public Jedis createResource() { //封装一个Jedis对象 return new Jedis(this); } ... }
(6)ShardedJedisPool的初始化
public class ShardedJedisPool extends Pool<ShardedJedis> { ... public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) { super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern)); } ... } public abstract class Pool<T> implements Closeable { protected GenericObjectPool<T> internalPool; ... public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) { initPool(poolConfig, factory); } public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) { if (this.internalPool != null) { try { closeInternalPool(); } catch (Exception e) { } } this.internalPool = new GenericObjectPool<>(factory, poolConfig); } ... } public class GenericObjectPool<T> extends BaseGenericObjectPool<T> implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> { //存放空闲对象的队列 private final LinkedBlockingDeque<PooledObject<T>> idleObjects; ... public GenericObjectPool(final PooledObjectFactory<T> factory, final GenericObjectPoolConfig<T> config) { super(config, ONAME_BASE, config.getJmxNamePrefix()); if (factory == null) { jmxUnregister();//tidy up throw new IllegalArgumentException("factory may not be null"); } this.factory = factory; //空闲对象 idleObjects = new LinkedBlockingDeque<>(config.getFairness()); setConfig(config); } public void setConfig(final GenericObjectPoolConfig<T> conf) { super.setConfig(conf); setMaxIdle(conf.getMaxIdle());//设置最大空闲数 setMinIdle(conf.getMinIdle());//设置最小空闲数 setMaxTotal(conf.getMaxTotal()); } ... }
(7)ShardedJedisPool的一致性Hash算法
下面使用ShardedJedis的set()方法为例进行说明,这里的Redis集群在客户端使用一致性Hash进行分片,与Redis Cluster这种服务端集群是不同的。
//假设使用ShardedJedisPool去连接多个Redis节点组成的集群,去通过getResource()方法获取连接对象 public class ShardedJedisPool extends Pool<ShardedJedis> { ... @Override public ShardedJedis getResource() { ShardedJedis jedis = super.getResource(); jedis.setDataSource(this); return jedis; } ... } public class ShardedJedisPool extends Pool<ShardedJedis> { ... @Override public ShardedJedis getResource() { ShardedJedis jedis = super.getResource(); jedis.setDataSource(this); return jedis; } ... } public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, Closeable { ... protected ShardedJedisPool dataSource = null; @Override public String set(final String key, final String value) { Jedis j = getShard(key);//根据key获取分片 return j.set(key, value); } ... } public class Sharded<R, S extends ShardInfo<R>> { private TreeMap<Long, S> nodes; private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<ShardInfo<R>, R>(); ... //根据key获取连接对象 public R getShard(String key) { return resources.get(getShardInfo(key)); } public S getShardInfo(String key) { return getShardInfo(SafeEncoder.encode(getKeyTag(key))); } public S getShardInfo(byte[] key) { SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key)); if (tail.isEmpty()) { return nodes.get(nodes.firstKey()); } return tail.get(tail.firstKey()); } ... }
3.商品卖家系统高并发场景与缓存预热逻辑
(1)缓存预热的必要性
(2)缓存预热方式之单机和分布式的区别
(3)缓存预热的逻辑
(1)缓存预热的必要性
查询卖家信息的接口会被C端⽤户频繁请求。因为C端⽤户在使⽤商品系统时,会产⽣⼤量的卖家数据请求。如果这些请求直接查询卖家数据库,则会对卖家数据库产⽣⾮常⼤的压⼒。所以查询卖家信息的接口,需要针对卖家数据进行缓存。同时由于请求量⽐较⼤,所以不能通过⽤户发起请求来进行惰性缓存,因此需要进行卖家数据的缓存预热。
因为卖家数据是⼀组相对固定、变化⼩的数据,所以在对外提供服务前,可先批量从DB获取数据,再设置到Redis缓存中。
查询卖家信息的接口如下:
@DubboService(version = "1.0.0", interfaceClass = SellerAbilityApi.class, retries = 0) public class SellerAbilityApiImpl implements SellerAbilityApi { @Autowired private SellerInfoService sellerInfoService; ... //提供给商品C端调用,根据卖家ID和卖家类型获取卖家信息 @Override public JsonResult<List<SellerInfoResponse>> getSellerInfo(SellerInfoRequest sellerInfoRequest) { try { List<SellerInfoResponse> sellerInfoResponseList = sellerInfoService.querySellerInfoForRPC(sellerInfoRequest); return JsonResult.buildSuccess(sellerInfoResponseList); } catch (ProductBizException e) { log.error("biz error: request={}", JSON.toJSONString(sellerInfoRequest), e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error: request={}", JSON.toJSONString(sellerInfoRequest), e); return JsonResult.buildError(e.getMessage()); } } ... } @Service public class SellerInfoServiceImpl implements SellerInfoService { @Autowired private SellerInfoCache sellerInfoCache; ... //不同系统间调用RPC接口,查询卖家 //卖家系统提供给外部系统调用的卖家查询接口,只允许通过sellerIdList或者sellerType两个参数进行查询 @Override public List<SellerInfoResponse> querySellerInfoForRPC(SellerInfoRequest request) { //参数校验,RPC接口根据sellerIdList和sellerType查询 checkQuerySellerInfoRequestByRPC(request); //如果未传入sellerIdList if (CollectionUtils.isEmpty(request.getSellerIdList())) { //根据sellerType获取该类型下的sellerId集合 Optional<List<Long>> sellerIdListOps = getSellerIdListBySellerType(request); //如果类型下没有sellerId数据,直接返回空卖家集合 if (!sellerIdListOps.isPresent()) { return Collections.emptyList(); } //将根据sellerType查询到的sellerIdList数据set到request中 request.setSellerIdList(sellerIdListOps.get()); } //根据sellerIdList从缓存或者数据库中查询卖家信息 Optional<List<SellerInfoResponse>> sellerInfoListOps = sellerInfoCache.listRedisStringDataByCache( request.getSellerIdList(), SellerInfoResponse.class, sellerId -> SellerRedisKeyConstants.SELLER_INFO_LIST + sellerId, //根据sellerId从DB中获取SellerInfo数据 sellerId -> sellerRepository.querySellerInfoBySellerId(sellerId) ); if (!sellerInfoListOps.isPresent()) { return Collections.emptyList(); } //过滤类型不一致的卖家信息 return filterAccordantSellerInfo(sellerInfoListOps.get(), request); } ... }
(2)缓存预热方式之单机和分布式的区别
卖家系统提供了两个接口可手动触发单机的缓存预热 + 分布式的缓存预热。无论是单机还是分布式的缓存预热,都会查出全量的卖家数据来进行预热。
如果已预热过,可通过设置force=true来强制再进行全量卖家数据的预热。由于预热完成后,卖家系统可能还会对卖家数据进行变更,所以每隔5分钟执行一个定时任务查询DB里的数据,diff缓存数据并刷新。
第一次预热可使用单机的缓存预热接口、也可使用分布式的缓存预热接口。
一.单机的意思,就是由卖家系统的一台机器进行全量卖家数据的预热操作。
二.分布式的意思,就是收到分布式预热请求的一台机器,会先去DB查询出一批批的卖家数据,然后发送到MQ。接着由卖家系统的多台机器去消费这些卖家数据,从而实现分布式预热。
(3)缓存预热的逻辑
预热逻辑,在业务上并不复杂。主要是在尚未对外提供服务前,把所有的卖家数据,设置到Redis缓存。在对外提供服务后,能做到绝大部分的卖家信息请求,都能直接通过缓存来提供数据服务,避免对MySQL造成⽐较⼤的压⼒。
4.商品卖家系统缓存预热架构设计
(1)Redis缓存结构设计
(2)单机的缓存预热方案设计
(3)分布式的缓存预热⽅案设计
(1)Redis缓存结构设计
(2)单机的缓存预热方案设计
步骤一:执⾏预热操作前,先添加分布式锁,保证执行的原⼦性。
步骤二:判断forceFlush是否强制刷新参数是否为1。如果为1,则代表强制刷新。如果为0,则代表不强制刷新缓存。
步骤三:forceFlush如果为0,则从Redis中获取缓存的预热状态。如果预热状态为成功,则不预热。
步骤四:通过sellerType卖家类型参数判断指定需要预热缓存的卖家类型。如果传⼊为0,代表预热所有卖家数据,否则代表预热的是sellerType卖家。根据sellerType,一页一页去DB查询出卖家数据,直到查出的数据为空。
步骤五:将每一页查出的100条卖家数据,构建成写入缓存的数据Map。接着将数据Map通过mset写入缓存,以任务的形式,提交给线程池处理。其中的线程池会通过Semaphore进行阻塞最多提交20个写入缓存的任务。
步骤六:数据处理完后,在Redis中设置卖家信息的缓存预热状态为成功。
步骤七:如果以上步骤出现异常,则在catch中设置缓存预热状态为失败。
步骤八:最后在finally块中释放分布式锁。
(3)分布式的缓存预热⽅案设计
一.⽣产者
步骤一:执⾏预热操作前,先添加分布式锁,保证执行的原⼦性。
步骤二:判断forceFlush是否强制刷新参数是否为1。如果为1,则代表强制刷新。如果为0,则代表不强制刷新缓存。
步骤三:forceFlush如果为0,则从Redis中获取缓存的预热状态。如果预热状态为成功,则不预热。
步骤四:通过sellerType卖家类型参数判断指定需要预热缓存的卖家类型。如果传⼊为0,代表预热所有卖家数据;否则代表预热的是sellerType卖家。然后通过分批次,查询出满⾜sellerType类型的所有卖家数据。每批查100个卖家,会将这100个卖家数据转换为JSON字符串。接着将JSON字符串添加到集合中,将最终的查询结果组装为List。
步骤五:将查询出的卖家数据集合List批量或分批次发送给MQ,对应的MQ的topic为:preheat_seller_cache_bucket_topic。
步骤六:消息发送成功后,在Redis中设置卖家信息的缓存预热状态为成功。
步骤七:如果以上步骤出现异常,则在catch中设置缓存预热状态为失败。
步骤八:最后在finally块中释放分布式锁。
二.消费者
步骤一:每个消费者每次消费⼀条消息。
步骤二:拿到消息后,解析出需要预热的缓存数据,即构造出100个元素的sellerInfoResultMap。其中key为Redis卖家信息缓存的key,value为卖家信息JSON字符串,然后通过执行mset将数据设置到Redis缓存。
步骤三:接着提取出sellerIdList集合,并根据消息中卖家信息的sellerType,构造出Redis卖家类型的⻚缓存的key,然后通过set命令设置到Redis。
5.商品卖家系统单机下的缓存预热的实现
(1)实现细节
(2)实现代码
(1)实现细节
一.一页一页地查询数据,然后构建写入缓存的数据的Map,最后生成任务提交给线程池进行处理。
二.如果在ThreadPoolExecutor中设置一个无限队列的Queue,那么可能会让该Queue不断增长从而撑满内存。如果提交的任务不需排队,而是提交一个任务就创建一个线程来执行,那么又可能会耗尽线程。
所以为了线程池安全,可以使用Semaphore信号量进行阻塞。如果超过了20个任务同时要运行,会通过Semaphore信号量阻塞提交任务,从而实现安全的线程池。
(2)实现代码
步骤一:执⾏预热操作前,先添加分布式锁,保证执行的原⼦性。
步骤二:判断forceFlush是否强制刷新参数是否为1。如果为1,则代表强制刷新。如果为0,则代表不强制刷新缓存。
步骤三:forceFlush如果为0,则从Redis中获取缓存的预热状态。如果预热状态为成功,则不预热。
步骤四:通过sellerType卖家类型参数判断指定需要预热缓存的卖家类型。如果传⼊为0,代表预热所有卖家数据;否则代表预热的是sellerType卖家。根据sellerType,一页一页去DB查询出卖家数据,直到查出的数据为空。
步骤五:将每一页查出的100条卖家数据,构建成写入缓存的数据Map。接着将数据Map通过mset写入缓存,以任务的形式,提交给线程池处理。其中的线程池会通过Semaphore进行阻塞最多提交20个写入缓存的任务。
步骤六:数据处理完后,在Redis中设置卖家信息的缓存预热状态为成功。
步骤七:如果以上步骤出现异常,则在catch中设置缓存预热状态为失败。
步骤八:最后在finally块中释放分布式锁。
@RestController @RequestMapping("/sellerCache") public class SellerCacheController { @Autowired private SellerInfoCache sellerInfoCache; @PostMapping("/preheat") public JsonResult<Boolean> preheatSellerCache(@RequestBody SellerInfoCacheRequest request) { try { Boolean result = sellerInfoCache.preheatSellerCache(request); return JsonResult.buildSuccess(result); } catch (BaseBizException baseBizException) { log.error("biz error: request={}", JSON.toJSONString(request), baseBizException); return JsonResult.buildError(baseBizException.getMessage()); } catch (Exception e) { log.error("system error: request={}", JSON.toJSONString(request), e); return JsonResult.buildError(e.getMessage()); } } ... } @Service("sellerInfoCache") public class SellerInfoCache { @Resource private SellerRepository sellerRepository; @Resource private RedisCache redisCache; @Resource private RedisLock redisLock; //缓存预热的线程池 @Autowired @Qualifier("preheatCacheThreadPool") private PreheatCacheThreadPool preheatCacheThreadPool; ... //根据卖家类型预热卖家缓存 public Boolean preheatSellerCache(SellerInfoCacheRequest cacheRequest) { try { //1.先获取一把分布式锁 redisLock.lock(SELLER_INFO_CACHE_PREHEAT_LOCK); //2.判断是否需要强制刷新缓存,如果不需要强制刷新,则判断是否预热过 //如果需要强制刷新缓存,则不用判断是否预热过 if (!NumberUtils.INTEGER_ONE.equals(cacheRequest.getForceFlush())) { //如果已经预热过,则直接返回true if (isPreheated()) { return true; } } //3.根据卖家类型刷新缓存,如果没有选择类型,则默认两种都进行预热 if (NumberUtils.INTEGER_ZERO.equals(cacheRequest.getSellerType())) { //预热卖家信息 preheatSellerInfoToCache(SellerTypeEnum.SELF.getCode()); //预热卖家id列表信息 preheatSellerIdCache(SellerTypeEnum.SELF.getCode()); preheatSellerInfoToCache(SellerTypeEnum.POP.getCode()); preheatSellerIdCache(SellerTypeEnum.POP.getCode()); } else { //4.如果选择了类型,就按照类型来预热 preheatSellerInfoToCache(cacheRequest.getSellerType()); preheatSellerIdCache(cacheRequest.getSellerType()); } redisCache.set(SELLER_INFO_CACHE_PREHEAT_SUCCESS, SellerCacheStatusEnum.SUCCESS.getCode().toString(), -1); return true; } catch (Exception e) { redisCache.set(SELLER_INFO_CACHE_PREHEAT_SUCCESS, SellerCacheStatusEnum.FAIL.getCode().toString(), -1); throw new BaseBizException("缓存预热失败,error:{}", JSON.toJSONString(e)); } finally { //释放锁 redisLock.unlock(SELLER_INFO_CACHE_PREHEAT_LOCK); } } //判断缓存是否预热成功 private Boolean isPreheated() { //如果预热key不存在,则说明没有预热过 Boolean isExist = redisCache.hasKey(SELLER_INFO_CACHE_PREHEAT_SUCCESS); if (!isExist) { return false; } //如果预热key存在,且值为1,则代表预热成功 Integer success = Integer.parseInt(redisCache.get(SELLER_INFO_CACHE_PREHEAT_SUCCESS)); if (!SellerCacheStatusEnum.SUCCESS.getCode().equals(success)) { return false; } return true; } //根据用户类型预热缓存 public Boolean preheatSellerInfoToCache(Integer type) { SellerInfoRequest request = new SellerInfoRequest(); request.setSellerType(type); Integer pageNum = request.getPageNo(); //设置每页数据量 request.setPageSize(CollectionSize.DEFAULT); //一页一页地去查询数据并交给线程池进行处理 while (true) { Optional<List<SellerInfoResponse>> sellerInfoResponses = sellerRepository.querySellerInfo(request); if (!sellerInfoResponses.isPresent()) { break; } List<SellerInfoResponse> sellerInfoResponseList = sellerInfoResponses.get(); //查到的数据为空时跳出循环 if (CollectionUtils.isEmpty(sellerInfoResponseList)) { break; } Map<String, String> result = new HashMap<>(sellerInfoResponseList.size()); //构建写入缓存的数据的map for (SellerInfoResponse sellerInfoResponse : sellerInfoResponseList) { Long sellerId = sellerInfoResponse.getSellerId(); result.put(SellerRedisKeyConstants.SELLER_INFO_LIST + sellerId, JSON.toJSONString(sellerInfoConverter.responseToDTO(sellerInfoResponse))); } log.info("本批次缓存map:{}", JSON.toJSONString(result)); //把本批次的缓存预热任务提交给线程池处理 preheatCacheThreadPool.execute(() -> redisCache.mset(result)); //继续取下一页的用户数据 request.setPageNo(++pageNum); log.info("第" + pageNum + "页数据预热完成"); } return true; } //根据类型预热用户id缓存 Boolean preheatSellerIdCache(Integer type) { //根据类型全量查卖家ID SellerInfoRequest request = new SellerInfoRequest(); Integer pageNo = request.getPageNo(); request.setSellerType(type); request.setPageSize(CollectionSize.DEFAULT); String key = SellerRedisKeyConstants.SELF_TYPE_LIST; if (SellerTypeEnum.POP.getCode().equals(type)) { key = SellerRedisKeyConstants.POP_TYPE_LIST; } while (true) { Optional<List<Long>> optionalList = sellerRepository.pageSellerIdListByType(request); if (!optionalList.isPresent()) { break; } List<Long> sellerIdList = optionalList.get(); //查到的数据为空时跳出循环 if (CollectionUtils.isEmpty(sellerIdList)) { break; } log.info("本批次缓存list:{}", JSON.toJSONString(sellerIdList)); //存Redis String finalKey = key + pageNo; preheatCacheThreadPool.execute(() -> redisCache.set(finalKey, JSON.toJSONString(sellerIdList), -1)); //继续取下一页的用户数据 request.setPageNo(++pageNo); } return true; } ... } @Repository public class SellerRepository { ... //根据条件分页查询出卖家列表 public Optional<List<SellerInfoResponse>> querySellerInfo(SellerInfoRequest request) { LambdaQueryWrapper<SellerInfoDO> queryWrapper = Wrappers.lambdaQuery(); //类型 queryWrapper.eq(Objects.nonNull(request.getSellerType()), SellerInfoDO::getSellerType, request.getSellerType()); //卖家位置(层级) queryWrapper.eq(Objects.nonNull(request.getSellerPosition()), SellerInfoDO::getSellerPosition, request.getSellerPosition()); //状态 queryWrapper.eq(Objects.nonNull(request.getSellerStatus()), SellerInfoDO::getSellerStatus, request.getSellerStatus()); //卖家ID queryWrapper.eq(Objects.nonNull(request.getSellerId()), SellerInfoDO::getSellerId, request.getSellerId()); //卖家编码 queryWrapper.eq(StringUtils.isNotEmpty(request.getSellerCode()), SellerInfoDO::getSellerCode, request.getSellerCode()); //卖家名称 queryWrapper.like(StringUtils.isNotEmpty(request.getSellerName()), SellerInfoDO::getSellerName, request.getSellerName()); //父卖家ID queryWrapper.eq(Objects.nonNull(request.getParentId()), SellerInfoDO::getParentId, request.getParentId()); //卖家ID集合 queryWrapper.in(CollectionUtils.isNotEmpty(request.getSellerIdList()), SellerInfoDO::getSellerId, request.getSellerIdList()); Page<SellerInfoDO> page = new Page<>(request.getPageNo(), request.getPageSize()); Page<SellerInfoDO> pageResult = sellerInfoMapper.selectPage(page, queryWrapper); if (Objects.isNull(pageResult)) { return Optional.of(Collections.emptyList()); } return Optional.of(sellerInfoConverter.listEntityToResponse(pageResult.getRecords())); } ... } public class PreheatCacheThreadPool { private final Semaphore semaphore; private final ThreadPoolExecutor threadPoolExecutor; public PreheatCacheThreadPool(int permits) { //如果超过了20个任务同时要运行,会通过Semaphore信号量阻塞,从而实现安全的线程池 semaphore = new Semaphore(permits); //这里设置核心线程数为0,是因为缓存预热的任务是一次性执行 //所以没有比要保留核心线程,等线程工作完毕,线程就全部退出即可 //如果不使用Semaphore,而在ThreadPoolExecutor中设置一个无限队列的Queue, //那么可能会让该Queue不断增长从而撑满内存; //如果提交的任务不需排队,而是提交一个任务就创建一个线程来执行, //那么又可能会耗尽线程; threadPoolExecutor = new ThreadPoolExecutor( 0, permits * 2,//乘以2是因为semaphore的释放要比线程的完成早一点点 60, TimeUnit.SECONDS, new SynchronousQueue<>() ); } public void execute(Runnable task) { //超过了20个同步任务就阻塞住,不让它执行太多,从而实现安全的线程池 semaphore.acquireUninterruptibly(); threadPoolExecutor.submit(() -> { try { task.run(); } finally { //semaphore的释放要比当前线程任务的结束要早一点点 //导致Semaphore放行很多任务进来了,但是线程池的线程还没释放 semaphore.release(); } }); } }
6.商品卖家系统分布式下的缓存预热的实现
(1)发送消息
(2)消费消息
(1)发送消息
步骤一:执⾏预热操作前,先添加分布式锁,保证执行的原⼦性。
步骤二:判断forceFlush是否强制刷新参数是否为1。如果为1,则代表强制刷新;如果为0,则代表不强制刷新缓存。
步骤三:forceFlush如果为0,则从Redis中获取缓存的预热状态。如果预热状态为成功,则不预热。
步骤四:通过sellerType卖家类型参数判断指定需要预热缓存的卖家类型。如果传⼊为0,代表预热所有卖家数据,否则代表预热的是sellerType卖家。然后通过分批次,查询出满⾜sellerType类型的所有卖家数据。每批查100个卖家,会将这100个卖家数据转换为JSON字符串。接着将JSON字符串添加到集合中,将最终的查询结果组装为List。
步骤五:将查询出的卖家数据集合List批量或分批次发送给MQ,对应的MQ的topic为:preheat_seller_cache_bucket_topic。
步骤六:消息发送成功后,在Redis中设置卖家信息的缓存预热状态为成功。
步骤七:如果以上步骤出现异常,则在catch中设置缓存预热状态为失败。
步骤八:最后在finally块中释放分布式锁。
@RestController @RequestMapping("/sellerCache") public class SellerCacheController { @Autowired private SellerInfoCache sellerInfoCache; @PostMapping("shardingPreheat") public JsonResult<Boolean> shardingPreheatSellerCache(@RequestBody SellerInfoCacheRequest request) { try { Boolean result = sellerInfoCache.shardingPreheatSellerCache(request); return JsonResult.buildSuccess(result); } catch (BaseBizException baseBizException) { log.error("biz error: request={}", JSON.toJSONString(request), baseBizException); return JsonResult.buildError(baseBizException.getMessage()); } catch (Exception e) { log.error("system error: request={}", JSON.toJSONString(request), e); return JsonResult.buildError(e.getMessage()); } } } @Service("sellerInfoCache") public class SellerInfoCache { @Resource private SellerRepository sellerRepository; @Resource private RedisCache redisCache; @Resource private RedisLock redisLock; @Autowired private DefaultProducer defaultProducer; ... //卖家缓存预热,分布式 public Boolean shardingPreheatSellerCache(SellerInfoCacheRequest request) { try { //检查参数 checkPreheatParam(request); //获取一把分布式锁 redisLock.lock(SELLER_INFO_CACHE_PREHEAT_LOCK); //如果需要强制刷新缓存,则不用判断是否预热过 if (!NumberUtils.INTEGER_ONE.equals(request.getForceFlush())) { //如果已经预热过,则直接返回true if (isPreheated()) { return true; } } //分批次查询出卖家数据 //sellerInfoList中的每个String都是100个卖家数据组册的一个batch对应的json数组 List<String> sellerInfoList = sellerRepository.querySellerInfoByPage(request.getSellerType()); //发送到MQ //这些json数组对应的一条String消息会被defaultProducer.sendMessages()方法批量发送 //TODO 将sellerInfoList拆分成每10个为一批次进行分批发送 defaultProducer.sendMessages(PREHEAT_SELLER_CACHE_BUCKET_TOPIC, sellerInfoList, "卖家缓存预热消息"); redisCache.set(SELLER_INFO_CACHE_PREHEAT_SUCCESS, SellerCacheStatusEnum.SUCCESS.getCode().toString(), -1); return true; } catch (Exception e) { redisCache.set(SELLER_INFO_CACHE_PREHEAT_SUCCESS, SellerCacheStatusEnum.FAIL.getCode().toString(), -1); throw new BaseBizException("缓存预热失败,error:{}", JSON.toJSONString(e)); } finally { //释放锁 redisLock.unlock(SELLER_INFO_CACHE_PREHEAT_LOCK); return false; } } ... } @Component public class DefaultProducer { private final TransactionMQProducer producer; ... //批量发送消息 public void sendMessages(String topic, List<String> messages, String type) { sendMessages(topic, messages, -1, type); } //批量发送消息 public void sendMessages(String topic, List<String> messages, Integer delayTimeLevel, String type) { List<Message> list = new ArrayList<>(); for (String message : messages) { Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8)); if (delayTimeLevel > 0) { msg.setDelayTimeLevel(delayTimeLevel); } list.add(msg); } try { SendResult send = producer.send(list); if (SendStatus.SEND_OK == send.getSendStatus()) { log.info("发送MQ消息成功, type:{}", type); } else { throw new BaseBizException(send.getSendStatus().toString()); } } catch (Exception e) { log.error("发送MQ消息失败:", e); throw new BaseBizException("消息发送失败"); } } ... } @Repository public class SellerRepository { ... //分批次查询出卖家数据 //List中的每一个元素,是每一页的卖家数据集合JSON字符串 public List<String> querySellerInfoByPage(Integer sellerType) { LambdaQueryWrapper<SellerInfoDO> queryWrapper = Wrappers.lambdaQuery(); queryWrapper.eq(SellerInfoDO::getDelFlag, YesOrNoEnum.YES.getCode()); queryWrapper.eq(SellerInfoDO::getSellerStatus, SellerInfoStatusEnum.OPEN_STATUS.getCode()); //如果指定预热全部卖家,则查询所有卖家数据 if (sellerType.equals(SellerTypeEnum.ALL.getCode())) { return queryTotalSellerInfoByPage(); } //查询匹配类型的卖家 return queryBatchSellerInfoByPage(sellerType); } //分批次查询出所有类型的卖家信息 private List<String> queryTotalSellerInfoByPage() { List<String> selfSellerInfoList = queryBatchSellerInfoByPage(SellerTypeEnum.SELF.getCode()); List<String> popSellerInfoList = queryBatchSellerInfoByPage(SellerTypeEnum.POP.getCode()); List<String> resultList = new ArrayList<>(selfSellerInfoList); resultList.addAll(popSellerInfoList); return resultList; } //分批次查询出指定类型的卖家信息 private List<String> queryBatchSellerInfoByPage(Integer sellerType) { List<String> batchList = new ArrayList<>(); SellerInfoRequest request = new SellerInfoRequest(); request.setSellerType(sellerType); Integer pageNum = request.getPageNo(); //设置每页数据量 request.setPageSize(SkuSellerRelationConstants.QUERY_MAX_PAGE_SIZE); while (true) { //根据条件分页查询出卖家列表 Optional<List<SellerInfoResponse>> sellerInfoResponses = querySellerInfo(request); if (!sellerInfoResponses.isPresent()) { break; } List<SellerInfoResponse> sellerInfoResponseList = sellerInfoResponses.get(); if (CollectionUtils.isEmpty(sellerInfoResponseList)) { break; } String batchResult = getBatchResult(sellerInfoResponseList, pageNum); batchList.add(batchResult); //继续取下一页的用户数据 request.setPageNo(++pageNum); } return batchList; } //根据条件分页查询出卖家列表 public Optional<List<SellerInfoResponse>> querySellerInfo(SellerInfoRequest request) { LambdaQueryWrapper<SellerInfoDO> queryWrapper = Wrappers.lambdaQuery(); //类型 queryWrapper.eq(Objects.nonNull(request.getSellerType()), SellerInfoDO::getSellerType, request.getSellerType()); //卖家位置(层级) queryWrapper.eq(Objects.nonNull(request.getSellerPosition()), SellerInfoDO::getSellerPosition, request.getSellerPosition()); //状态 queryWrapper.eq(Objects.nonNull(request.getSellerStatus()), SellerInfoDO::getSellerStatus, request.getSellerStatus()); //卖家ID queryWrapper.eq(Objects.nonNull(request.getSellerId()), SellerInfoDO::getSellerId, request.getSellerId()); //卖家编码 queryWrapper.eq(StringUtils.isNotEmpty(request.getSellerCode()), SellerInfoDO::getSellerCode, request.getSellerCode()); //卖家名称 queryWrapper.like(StringUtils.isNotEmpty(request.getSellerName()), SellerInfoDO::getSellerName, request.getSellerName()); //父卖家ID queryWrapper.eq(Objects.nonNull(request.getParentId()), SellerInfoDO::getParentId, request.getParentId()); //卖家ID集合 queryWrapper.in(CollectionUtils.isNotEmpty(request.getSellerIdList()), SellerInfoDO::getSellerId, request.getSellerIdList()); Page<SellerInfoDO> page = new Page<>(request.getPageNo(), request.getPageSize()); Page<SellerInfoDO> pageResult = sellerInfoMapper.selectPage(page, queryWrapper); if (Objects.isNull(pageResult)) { return Optional.of(Collections.emptyList()); } return Optional.of(sellerInfoConverter.listEntityToResponse(pageResult.getRecords())); } //将每页的卖家数据转换成JSON private String getBatchResult(List<SellerInfoResponse> sellerInfoResponseList, Integer pageNum) { List<String> batchResult = new ArrayList<>(sellerInfoResponseList.size()); for (SellerInfoResponse sellerInfoResponse : sellerInfoResponseList) { PreheatSellerMessage message = new PreheatSellerMessage(); message.setCachePageNo(pageNum); message.setSellerInfo(sellerInfoResponse); batchResult.add(JsonUtil.object2Json(message)); } return JsonUtil.object2Json(batchResult); } ... }
(2)消费消息
通过多机器分布式消费MQ来实现分布式预热缓存。
步骤一:每个消费者每次消费⼀条消息。
步骤二:拿到消息后,解析出需要预热的缓存数据,即构造出100个元素的sellerInfoResultMap,其中key为Redis卖家信息缓存的key,value为卖家信息JSON字符串。然后通过执行mset将数据设置到Redis缓存。
步骤三:接着提取出sellerIdList集合,并根据消息中卖家信息的sellerType,构造出Redis卖家类型的⻚缓存的key,然后通过执行set设置到Redis缓存。
//缓存预热消费者 @Component public class SellerPreheatCacheListener implements MessageListenerConcurrently { @Autowired private RedisCache redisCache; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //每个消费者每次消费分批次的多页数据,也就是List<String>,每个String对应了一页的数据 //每个String包含页码和该页下的100个SellerInfo集合 try { for (MessageExt messageExt : list) { //获取到分批次的多页数据List String msg = new String(messageExt.getBody()); List messageList = JsonUtil.json2Object(msg, List.class); if (CollectionUtils.isEmpty(messageList)) { throw new BaseBizException(ProductErrorCodeEnum.PARAM_CHECK_ERROR, ProductErrorCodeEnum.PARAM_CHECK_ERROR.getErrorCode()); } Map<String, String> sellerInfoResultMap = new HashMap<>(messageList.size()); List<Long> sellerIdList = new ArrayList<>(messageList.size()); //List<String>中的每个String,都包含了100个卖家数据,这些卖家类型都是一样的 for (Object message : messageList) { PreheatSellerMessage preheatSellerMessage = JsonUtil.json2Object(message.toString(), PreheatSellerMessage.class); if (Objects.isNull(preheatSellerMessage)) { throw new BaseBizException(ProductErrorCodeEnum.PARAM_CHECK_ERROR, ProductErrorCodeEnum.PARAM_CHECK_ERROR.getErrorCode()); } //卖家信息 SellerInfoResponse sellerInfo = preheatSellerMessage.getSellerInfo(); //卖家信息的Key String sellerInfoKey = SellerRedisKeyConstants.SELLER_INFO_LIST + sellerInfo.getSellerId(); //卖家信息集合 sellerInfoResultMap.put(sellerInfoKey, JsonUtil.object2Json(sellerInfo)); //卖家ID集合 sellerIdList.add(sellerInfo.getSellerId()); } PreheatSellerMessage preheatSellerMessage = JsonUtil.json2Object(messageList.get(0).toString(), PreheatSellerMessage.class); if (Objects.isNull(preheatSellerMessage)) { throw new BaseBizException(ProductErrorCodeEnum.PARAM_CHECK_ERROR, ProductErrorCodeEnum.PARAM_CHECK_ERROR.getErrorCode()); } //卖家类型 Integer sellerType = preheatSellerMessage.getSellerInfo().getSellerType(); //卖家类型的key String sellerTypeKey = ""; if (sellerType.equals(SellerTypeEnum.SELF.getCode())) { sellerTypeKey = SellerRedisKeyConstants.SELF_TYPE_LIST + preheatSellerMessage.getCachePageNo(); } else { sellerTypeKey = SellerRedisKeyConstants.POP_TYPE_LIST + preheatSellerMessage.getCachePageNo(); } //将卖家信息set到缓存 redisCache.mset(sellerInfoResultMap); //将卖家类型ID集合set到缓存 redisCache.set(sellerTypeKey, JsonUtil.object2Json(sellerIdList), -1); } } catch (Exception e) { log.error("consume error, 缓存预热消息消费失败", e); //本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
7.商品卖家系统定时查询DB最新数据更新缓存
//卖家缓存数据同步 @Component public class SellerCacheSyncSchedule { @Autowired private RedisCache redisCache; @Autowired private SellerRepository sellerRepository; //卖家缓存数据同步定时任务 //5分钟执行一次,每次查询出数据库最近5分钟的更新和新增数据,然后同步差异数据到缓存 @XxlJob("cacheSync") public void cacheSync() { //计算出前五分钟的时间 Date beforeTIme = getBeforeTime(); //从数据库中查询出最近5分钟内的数据 List<SellerInfoDO> sellerInfoList = sellerRepository.querySellerInfoFiveMinute(beforeTIme); //过滤出数据库和缓存中的差异数据,并将差异数据保存到缓存 saveDiffSellerInfo(sellerInfoList); } //比较数据库和缓存中的差异数据,过滤出差异数据,并将差异数据保存到缓存 private void saveDiffSellerInfo(List<SellerInfoDO> sellerInfoList) { sellerInfoList.stream().peek(sellerInfo -> { Long sellerId = sellerInfo.getSellerId(); String redisKey = SellerRedisKeyConstants.SELLER_INFO_LIST + sellerId; //不存在说明是差异数据 if (!redisCache.hasKey(redisKey)) { redisCache.set(redisKey, JsonUtil.object2Json(sellerInfo), -1); } //TODO 存在比较差异 }).count(); } //获取5分钟之前的时间 private Date getBeforeTime() { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, -5); return calendar.getTime(); } } @Repository public class SellerRepository { ... //查询出前5分钟之内的数据 public List<SellerInfoDO> querySellerInfoFiveMinute(Date beforeTIme) { List<SellerInfoDO> batchList = new ArrayList<>(); int pageNum = 1; //设置每页数据量 int pageSize = ProductConstants.QUERY_ITEM_MAX_COUNT; while (true) { LambdaQueryWrapper<SellerInfoDO> queryWrapper = Wrappers.lambdaQuery(); queryWrapper.ge(SellerInfoDO::getCreateTime, beforeTIme); queryWrapper.ge(SellerInfoDO::getUpdateTime, beforeTIme); Page<SellerInfoDO> page = new Page<>(pageNum, pageSize); Page<SellerInfoDO> pageResult = sellerInfoMapper.selectPage(page, queryWrapper); if (Objects.isNull(pageResult) || pageResult.getRecords().size() <= 0) { break; } batchList.addAll(pageResult.getRecords()); //进入下一页 pageNum++; } return batchList; } ... }
8.商品中心高并发架构总结
一.高并发C端系统(限流 + 过滤 + 实时缓存同步)
二.高并发卖家系统(缓存预热 + 定时缓存同步)
三.高并发库存系统(缓存分桶)
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等