商品中心—13.商品卖家系统的高并发文档

大纲

1.阿里云Tair接入与Jedis连接池使用

2.Redis集群连接池与通用对象连接池源码

3.商品卖家系统高并发场景与缓存预热逻辑

4.商品卖家系统缓存预热架构设计

5.商品卖家系统单机下的缓存预热的实现

6.商品卖家系统分布式下的缓存预热的实现

7.商品卖家系统定时查询DB最新数据更新缓存

8.商品中心高并发架构总结

1.阿里云Tair接入与Jedis连接池使用

(1)引入依赖

(2)添加配置文件

(3)配置Tair相关Bean

(4)使用Tair相关命令

(1)引入依赖

Tair是多线程的,Redis是单线程的。TairJedis是阿⾥云基于Jedis开发的Redis企业版专⽤客户端。TairJedis除了Jedis原有功能,还⽀持Redis企业版包含的命令。

(1)引入依赖

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
<dependency>
    <groupId>com.aliyun.tair</groupId>
    <artifactId>alibabacloud-tairjedis-sdk</artifactId>
    <version>2.1.0</version>
</dependency>

(2)添加配置文件

spring:
    tair:
        host: r-xxxxxxxxxxxxxxx.redis.rds.aliyuncs.com
        port: 6379
        password: password
        timeout: 3000
        maxIdle: 10000
        maxTotal: 10000

(3)配置Tair相关Bean

@Data
@Configuration
public class TairConfig {
    @Value("${spring.tair.maxIdle:200}")
    private int maxIdle;
    @Value("${spring.tair.maxTotal:300}")
    private int maxTotal;
    @Value("${spring.tair.host}")
    private String host;
    @Value("${spring.tair.port:6379}")
    private int port;
    @Value("${spring.tair.password}")
    private String password;
    @Value("${spring.tair.timeout:3000}")
    private int timeout;

    @Bean
    @ConditionalOnClass(JedisPool.class)
    public JedisPool jedisPool() {
        JedisPoolConfig config = new JedisPoolConfig();
        //最大空闲连接数,需自行评估,不超过Redis实例的最大连接数
        config.setMaxIdle(maxIdle);
        //最大连接数,需自行评估,不超过Redis实例的最大连接数
        config.setMaxTotal(maxTotal);
        config.setTestOnBorrow(false);  
        config.setTestOnReturn(false);
        return new JedisPool(config, host, port, timeout, password);
    }

    @Bean
    @ConditionalOnClass(TairCache.class)
    public TairCache tairCache(JedisPool jedisPool) {
        return new TairCache(jedisPool);
    }
}

(4)使用Tair相关命令

@Component
public class TairCache {
    private JedisPool jedisPool;

    public TairCache(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    public Jedis getJedis() {
        return jedisPool.getResource();
    }

    public TairString createTairString(Jedis jedis) {
        return new TairString(jedis);
    }

    private TairHash createTairHash(Jedis jedis) {
        return new TairHash(jedis);
    }

    //缓存存储
    public void set(String key, String value, int seconds) {
        log.info("tairString set key:{}, value:{}, seconds:{}", key, value, seconds);
        try (Jedis jedis = getJedis()) {
            TairString tairString = createTairString(jedis);
            String result;
            if (seconds > 0) {
                result = tairString.exset(key, value, new ExsetParams().ex(seconds));
            } else {
                result = tairString.exset(key, value);
            }
            log.info("tairString set key:{}, value:{}, seconds:{}", key, value, seconds);
        }
    }

    //tairString存储
    public boolean exset(String key, String value, int seconds) {
        try (Jedis jedis = getJedis()) {
            TairString tairString = createTairString(jedis);
            String result;
            if (seconds > 0) {
                result = tairString.exset(key, value, new ExsetParams().ex(seconds));
            } else {
                result = tairString.exset(key, value);
            }
            return "OK".equals(result);
        }
    }

    //缓存获取
    public String get(String key) {
        try (Jedis jedis = getJedis()) {
            ExgetResult<String> exget = createTairString(jedis).exget(key);
            if (exget == null) {
                return null;
            }
            return exget.getValue();
        } catch (Exception e) {
            log.error("tairString get error,key{}", key, e);
        }
        return null;
    }

    //缓存自增
    public Integer incr(String key) {
        return this.exincrby(key, 1);
    }

    //缓存自增
    public Integer incr(String key, Integer incrNum) {
        return this.exincrby(key, incrNum);
    }

    //缓存自减
    public Integer decr(String key, Integer decrNum) {
        return this.exincrby(key, -Math.abs(decrNum));
    }

    //删除缓存
    public Integer delete(String key) {
        try (Jedis jedis = getJedis()) {
            return jedis.del(key).intValue();
        }
    }

    //删除缓存
    public Integer mdelete(List<String> keyList) {
        try (Jedis jedis = getJedis()) {
            return jedis.del(keyList.toArray(new String[keyList.size()])).intValue();
        }
    }

    //缓存批量获取
    public List mget(List<String> keyList) {
        try (Jedis jedis = getJedis()) {
            TairString tairString = createTairString(jedis);
            return keyList.stream().map(key -> {
                ExgetResult<String> exget = tairString.exget(key);
                if (exget != null) {
                    return exget.getValue();
                }
                return null;
            }).collect(Collectors.toList());
        }
    }

    //对TairString的value进行自增自减操作
    public Integer exincrby(String key, Integer incrNum) {
        try (Jedis jedis = getJedis()) {
            int value = createTairString(jedis).exincrBy(key, incrNum, ExincrbyParams.ExincrbyParams().min(0)).intValue();
            return value;
        } catch (Exception e) {
            //出现自增或者自减数据溢出等异常,直接返回操作失败
            return -1;
        }
    }

    //存储hash对象
    public Integer exhset(String key, String value) {
        try (Jedis jedis = getJedis()) {
            Map<String, Object> innerMap = JSON.parseObject(value).getInnerMap();
            Map<String, String> map = Maps.transformEntries(innerMap, (k, v) -> String.valueOf(v));
            String exhmset = createTairHash(jedis).exhmset(key, map);
            //成功返回 OK
            return 0;
        }
    }

    //存储hash对象
    public Integer exhset(String key, String field, String value) {
        try (Jedis jedis = getJedis()) {
            return createTairHash(jedis).exhset(key, field, value, ExhsetParams.ExhsetParams().nx()).intValue();
        }
    }

    //获取hash对象
    public String exhget(String key, String field) {
        try (Jedis jedis = getJedis()) {
            String exhget = createTairHash(jedis).exhget(key, field);
            log.info("exhget key:{}, field:{}, value:{}", key, field, exhget);
            return exhget;
        }
    }
}

2.Redis集群连接池与通用对象连接池源码

(1)Spring的RedisConnectionFactory和实现JedisConnectionFactory

(2)Jedis的Pool对象是基于Apache Commons的通用对象池的

(3)Apache Commons的通用对象池的borrowObject()方法

(4)ShardedJedisFactory中创建Jedis连接对象的具体方法

(5)Redis分片对象ShardedJedis的创建

(6)ShardedJedisPool的初始化

这里的Redis集群在客户端使用一致性Hash进行分片,与Redis Cluster这种服务端集群是不同的。

(1)Spring的RedisConnectionFactory和实现JedisConnectionFactory

@Data
@Configuration
@ConditionalOnClass(RedisConnectionFactory.class)
public class RedisConfig {
    ...
    @Bean
    @ConditionalOnClass(RedisConnectionFactory.class)
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
    RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setDefaultSerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
    ...
}

public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
    //Provides a suitable connection for interacting with Redis.
    RedisConnection getConnection();

    //Provides a suitable connection for interacting with Redis Cluster.
    RedisClusterConnection getClusterConnection();

    //Provides a suitable connection for interacting with Redis Sentinel.
    RedisSentinelConnection getSentinelConnection();
}

public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
    private @Nullable Pool<Jedis> pool;
    ...

    public RedisConnection getConnection() {
        if (isRedisClusterAware()) {
            return getClusterConnection();
        }

        Jedis jedis = fetchJedisConnector();
        JedisConnection connection = (getUsePool() ? 
            new JedisConnection(jedis, pool, getDatabase(), getClientName())
            : new JedisConnection(jedis, null, getDatabase(), getClientName()));
        connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
        return postProcessConnection(connection);
    }

    protected Jedis fetchJedisConnector() {
        try {
            if (getUsePool() && pool != null) {
                return pool.getResource();
            }
            Jedis jedis = createJedis();
            jedis.connect();
            potentiallySetClientName(jedis);
            return jedis;
        } catch (Exception ex) {
            throw new RedisConnectionFailureException("Cannot get Jedis connection", ex);
        }
    }
    ...
}

(2)Jedis的Pool对象是基于Apache Commons的通用对象池的

public abstract class Pool<T> implements Closeable {
    //internalPool就是Apache Commons的通用对象池
    protected GenericObjectPool<T> internalPool;

    public T getResource() {
        try {
            return internalPool.borrowObject();
        } catch (NoSuchElementException nse) {
            ...
        }
    }
}

//假设使用ShardedJedisPool去连接多个Redis节点组成的集群,去通过getResource()方法获取连接对象
public class ShardedJedisPool extends Pool<ShardedJedis> {
    ...
    @Override
    public ShardedJedis getResource() {
        ShardedJedis jedis = super.getResource();
        jedis.setDataSource(this);
        return jedis;
    }

    //激活方法为空
    @Override
    public void activateObject(PooledObject<ShardedJedis> p) throws Exception {
    
    }
    ...
}

(3)Apache Commons的通用对象池的borrowObject()方法

public class GenericObjectPool<T> extends BaseGenericObjectPool<T> implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> {
    //存放空闲对象的队列
    private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
    ...

    @Override
    public T borrowObject() throws Exception {
        return borrowObject(getMaxWaitMillis());
    }

    public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
        assertOpen();
        final AbandonedConfig ac = this.abandonedConfig;
        if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) && (getNumActive() > getMaxTotal() - 3) ) {
            removeAbandoned(ac);
        }
        PooledObject<T> p = null;

        //如果连接耗尽是否需要阻塞
        final boolean blockWhenExhausted = getBlockWhenExhausted();

        boolean create;
        final long waitTime = System.currentTimeMillis();

        while (p == null) {
            create = false;
            //从队列中获取对象
            p = idleObjects.pollFirst();
            if (p == null) {
                //创建对象
                p = create();
                if (p != null) {
                    create = true;
                }
            }
            ...
        }
        updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
        return p.getObject();
    }

    //创建对象
    private PooledObject<T> create() throws Exception {
        ...
        final PooledObject<T> p;
        try {
            //调用Factory的makeObject()方法创建对象
            p = factory.makeObject();
            if (getTestOnCreate() && !factory.validateObject(p)) {
                createCount.decrementAndGet();
                return null;
            }
        } catch (final Throwable e) {
            createCount.decrementAndGet();
            throw e;
        } finally {
            synchronized (makeObjectCountLock) {
                makeObjectCount--;
                makeObjectCountLock.notifyAll();
            }
        }
        ...
        return p;
    }

    @Override
    public void addObject() throws Exception {
        assertOpen();
        if (factory == null) {
            throw new IllegalStateException("Cannot add objects without a factory.");
        }
        final PooledObject<T> p = create();
        addIdleObject(p);
    }

    private void addIdleObject(final PooledObject<T> p) throws Exception {
        if (p != null) {
            factory.passivateObject(p);
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
        }
    }
    ...
}

(4)ShardedJedisFactory中创建Jedis连接对象的具体方法

一个连接会封装成一个DefaultPooledObject对象。

public class ShardedJedisPool extends Pool<ShardedJedis> {
    ...
    private static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {
        private List<JedisShardInfo> shards;
        ...

        //创建Jedis连接对象的具体方法
        @Override
        public PooledObject<ShardedJedis> makeObject() throws Exception {
            //创建ShardedJedis对象
            ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
            return new DefaultPooledObject<ShardedJedis>(jedis);
        }
        ...
    }
    ...
}

(5)Redis分片对象ShardedJedis的创建

一个ShardedJedis分片可以理解成对应一个集群,一个ShardedJedis中会有各个Redis节点的连接。即一个ShardedJedis中,会连接Redis的各个节点,每个ShardedJedis分片都有多个虚拟节点。

public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, Closeable {
    ...
    public ShardedJedis(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
        super(shards, algo, keyTagPattern);
    }
    ...
}

public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo> implements BinaryJedisCommands {
    ...
    public BinaryShardedJedis(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
        super(shards, algo, keyTagPattern);
    }
    ...
}

public class Sharded<R, S extends ShardInfo<R>> {
    public static final int DEFAULT_WEIGHT = 1;
    private TreeMap<Long, S> nodes;
    private final Hashing algo;
    private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<ShardInfo<R>, R>();
    ...

    public Sharded(List<S> shards, Hashing algo, Pattern tagPattern) {
        this.algo = algo;
        this.tagPattern = tagPattern;
        initialize(shards);
    }

    private void initialize(List<S> shards) {
        nodes = new TreeMap<Long, S>();
        //每个分片都有多个虚拟节点,存放在nodes中
        for (int i = 0; i != shards.size(); ++i) {
            final S shardInfo = shards.get(i);
            int N =  160 * shardInfo.getWeight();
            if (shardInfo.getName() == null) {
                for (int n = 0; n < N; n++) {
                    nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
                }
            } else {
                for (int n = 0; n < N; n++) {
                    nodes.put(this.algo.hash(shardInfo.getName() + "*" + n), shardInfo);
                }
            }
            resources.put(shardInfo, shardInfo.createResource());
        }
    }
    ...
}

public class JedisShardInfo extends ShardInfo<Jedis> {
    ...
    @Override
    public Jedis createResource() {
        //封装一个Jedis对象
        return new Jedis(this);
    }
    ...
}

(6)ShardedJedisPool的初始化

public class ShardedJedisPool extends Pool<ShardedJedis> {
    ...
    public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
        super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern));
    }
    ...
}

public abstract class Pool<T> implements Closeable {
    protected GenericObjectPool<T> internalPool;
    ...

    public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
        initPool(poolConfig, factory);
    }

    public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
        if (this.internalPool != null) {
            try {
                closeInternalPool();
            } catch (Exception e) {
            }
        }
        this.internalPool = new GenericObjectPool<>(factory, poolConfig);
    }
    ...
}

public class GenericObjectPool<T> extends BaseGenericObjectPool<T> implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> {
    //存放空闲对象的队列
    private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
    ...

    public GenericObjectPool(final PooledObjectFactory<T> factory, final GenericObjectPoolConfig<T> config) {
        super(config, ONAME_BASE, config.getJmxNamePrefix());
        if (factory == null) {
            jmxUnregister();//tidy up
            throw new IllegalArgumentException("factory may not be null");
        }
        this.factory = factory;
        //空闲对象
        idleObjects = new LinkedBlockingDeque<>(config.getFairness());
        setConfig(config);
    }

    public void setConfig(final GenericObjectPoolConfig<T> conf) {
        super.setConfig(conf);
        setMaxIdle(conf.getMaxIdle());//设置最大空闲数
        setMinIdle(conf.getMinIdle());//设置最小空闲数
        setMaxTotal(conf.getMaxTotal());
    }
    ...
}

(7)ShardedJedisPool的一致性Hash算法

下面使用ShardedJedis的set()方法为例进行说明,这里的Redis集群在客户端使用一致性Hash进行分片,与Redis Cluster这种服务端集群是不同的。

//假设使用ShardedJedisPool去连接多个Redis节点组成的集群,去通过getResource()方法获取连接对象
public class ShardedJedisPool extends Pool<ShardedJedis> {
    ...
    @Override
    public ShardedJedis getResource() {
        ShardedJedis jedis = super.getResource();
        jedis.setDataSource(this);
        return jedis;
    }
    ...
}

public class ShardedJedisPool extends Pool<ShardedJedis> {
    ...
    @Override
    public ShardedJedis getResource() {
        ShardedJedis jedis = super.getResource();
        jedis.setDataSource(this);
        return jedis;
    }
    ...
}

public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, Closeable {
    ...
    protected ShardedJedisPool dataSource = null;

    @Override
    public String set(final String key, final String value) {
        Jedis j = getShard(key);//根据key获取分片
        return j.set(key, value);
    }
    ...
}

public class Sharded<R, S extends ShardInfo<R>> {
    private TreeMap<Long, S> nodes;
    private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<ShardInfo<R>, R>();
    ...

    //根据key获取连接对象
    public R getShard(String key) {
        return resources.get(getShardInfo(key));
    }

    public S getShardInfo(String key) {
        return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
    }

    public S getShardInfo(byte[] key) {
        SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
        if (tail.isEmpty()) {
            return nodes.get(nodes.firstKey());
        }
        return tail.get(tail.firstKey());
    }
    ...
}

3.商品卖家系统高并发场景与缓存预热逻辑

(1)缓存预热的必要性

(2)缓存预热方式之单机和分布式的区别

(3)缓存预热的逻辑

(1)缓存预热的必要性

查询卖家信息的接口会被C端⽤户频繁请求。因为C端⽤户在使⽤商品系统时,会产⽣⼤量的卖家数据请求。如果这些请求直接查询卖家数据库,则会对卖家数据库产⽣⾮常⼤的压⼒。所以查询卖家信息的接口,需要针对卖家数据进行缓存。同时由于请求量⽐较⼤,所以不能通过⽤户发起请求来进行惰性缓存,因此需要进行卖家数据的缓存预热。

因为卖家数据是⼀组相对固定、变化⼩的数据,所以在对外提供服务前,可先批量从DB获取数据,再设置到Redis缓存中。

查询卖家信息的接口如下:

@DubboService(version = "1.0.0", interfaceClass = SellerAbilityApi.class, retries = 0)
public class SellerAbilityApiImpl implements SellerAbilityApi {
    @Autowired
    private SellerInfoService sellerInfoService;
    ...

    //提供给商品C端调用,根据卖家ID和卖家类型获取卖家信息
    @Override
    public JsonResult<List<SellerInfoResponse>> getSellerInfo(SellerInfoRequest sellerInfoRequest) {
        try {
            List<SellerInfoResponse> sellerInfoResponseList = sellerInfoService.querySellerInfoForRPC(sellerInfoRequest);
            return JsonResult.buildSuccess(sellerInfoResponseList);
        } catch (ProductBizException e) {
            log.error("biz error: request={}", JSON.toJSONString(sellerInfoRequest), e);
            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
        } catch (Exception e) {
            log.error("system error: request={}", JSON.toJSONString(sellerInfoRequest), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
    ...
}

@Service
public class SellerInfoServiceImpl implements SellerInfoService {
    @Autowired
    private SellerInfoCache sellerInfoCache;
    ...

    //不同系统间调用RPC接口,查询卖家
    //卖家系统提供给外部系统调用的卖家查询接口,只允许通过sellerIdList或者sellerType两个参数进行查询
    @Override
    public List<SellerInfoResponse> querySellerInfoForRPC(SellerInfoRequest request) {
        //参数校验,RPC接口根据sellerIdList和sellerType查询
        checkQuerySellerInfoRequestByRPC(request);
        //如果未传入sellerIdList
        if (CollectionUtils.isEmpty(request.getSellerIdList())) {
            //根据sellerType获取该类型下的sellerId集合
            Optional<List<Long>> sellerIdListOps = getSellerIdListBySellerType(request);
            //如果类型下没有sellerId数据,直接返回空卖家集合
            if (!sellerIdListOps.isPresent()) {
                return Collections.emptyList();
            }
            //将根据sellerType查询到的sellerIdList数据set到request中
            request.setSellerIdList(sellerIdListOps.get());
        }

        //根据sellerIdList从缓存或者数据库中查询卖家信息
        Optional<List<SellerInfoResponse>> sellerInfoListOps = 
            sellerInfoCache.listRedisStringDataByCache(
                request.getSellerIdList(),
                SellerInfoResponse.class,
                sellerId -> SellerRedisKeyConstants.SELLER_INFO_LIST + sellerId,
                //根据sellerId从DB中获取SellerInfo数据
                sellerId -> sellerRepository.querySellerInfoBySellerId(sellerId)
            );
        if (!sellerInfoListOps.isPresent()) {
            return Collections.emptyList();
        }
        //过滤类型不一致的卖家信息
        return filterAccordantSellerInfo(sellerInfoListOps.get(), request);
    }
    ...
}

(2)缓存预热方式之单机和分布式的区别

卖家系统提供了两个接口可手动触发单机的缓存预热 + 分布式的缓存预热。无论是单机还是分布式的缓存预热,都会查出全量的卖家数据来进行预热。

如果已预热过,可通过设置force=true来强制再进行全量卖家数据的预热。由于预热完成后,卖家系统可能还会对卖家数据进行变更,所以每隔5分钟执行一个定时任务查询DB里的数据,diff缓存数据并刷新。

第一次预热可使用单机的缓存预热接口、也可使用分布式的缓存预热接口。

一.单机的意思,就是由卖家系统的一台机器进行全量卖家数据的预热操作。

二.分布式的意思,就是收到分布式预热请求的一台机器,会先去DB查询出一批批的卖家数据,然后发送到MQ。接着由卖家系统的多台机器去消费这些卖家数据,从而实现分布式预热。

(3)缓存预热的逻辑

预热逻辑,在业务上并不复杂。主要是在尚未对外提供服务前,把所有的卖家数据,设置到Redis缓存。在对外提供服务后,能做到绝大部分的卖家信息请求,都能直接通过缓存来提供数据服务,避免对MySQL造成⽐较⼤的压⼒。

4.商品卖家系统缓存预热架构设计

(1)Redis缓存结构设计

(2)单机的缓存预热方案设计

(3)分布式的缓存预热⽅案设计

(1)Redis缓存结构设计

(2)单机的缓存预热方案设计

步骤一:执⾏预热操作前,先添加分布式锁,保证执行的原⼦性。

步骤二:判断forceFlush是否强制刷新参数是否为1。如果为1,则代表强制刷新。如果为0,则代表不强制刷新缓存。

步骤三:forceFlush如果为0,则从Redis中获取缓存的预热状态。如果预热状态为成功,则不预热。

步骤四:通过sellerType卖家类型参数判断指定需要预热缓存的卖家类型。如果传⼊为0,代表预热所有卖家数据,否则代表预热的是sellerType卖家。根据sellerType,一页一页去DB查询出卖家数据,直到查出的数据为空。

步骤五:将每一页查出的100条卖家数据,构建成写入缓存的数据Map。接着将数据Map通过mset写入缓存,以任务的形式,提交给线程池处理。其中的线程池会通过Semaphore进行阻塞最多提交20个写入缓存的任务。

步骤六:数据处理完后,在Redis中设置卖家信息的缓存预热状态为成功。

步骤七:如果以上步骤出现异常,则在catch中设置缓存预热状态为失败。

步骤八:最后在finally块中释放分布式锁。

(3)分布式的缓存预热⽅案设计

一.⽣产者

步骤一:执⾏预热操作前,先添加分布式锁,保证执行的原⼦性。

步骤二:判断forceFlush是否强制刷新参数是否为1。如果为1,则代表强制刷新。如果为0,则代表不强制刷新缓存。

步骤三:forceFlush如果为0,则从Redis中获取缓存的预热状态。如果预热状态为成功,则不预热。

步骤四:通过sellerType卖家类型参数判断指定需要预热缓存的卖家类型。如果传⼊为0,代表预热所有卖家数据;否则代表预热的是sellerType卖家。然后通过分批次,查询出满⾜sellerType类型的所有卖家数据。每批查100个卖家,会将这100个卖家数据转换为JSON字符串。接着将JSON字符串添加到集合中,将最终的查询结果组装为List。

步骤五:将查询出的卖家数据集合List批量或分批次发送给MQ,对应的MQ的topic为:preheat_seller_cache_bucket_topic。

步骤六:消息发送成功后,在Redis中设置卖家信息的缓存预热状态为成功。

步骤七:如果以上步骤出现异常,则在catch中设置缓存预热状态为失败。

步骤八:最后在finally块中释放分布式锁。

二.消费者

步骤一:每个消费者每次消费⼀条消息。

步骤二:拿到消息后,解析出需要预热的缓存数据,即构造出100个元素的sellerInfoResultMap。其中key为Redis卖家信息缓存的key,value为卖家信息JSON字符串,然后通过执行mset将数据设置到Redis缓存。

步骤三:接着提取出sellerIdList集合,并根据消息中卖家信息的sellerType,构造出Redis卖家类型的⻚缓存的key,然后通过set命令设置到Redis。

5.商品卖家系统单机下的缓存预热的实现

(1)实现细节

(2)实现代码

(1)实现细节

一.一页一页地查询数据,然后构建写入缓存的数据的Map,最后生成任务提交给线程池进行处理。

二.如果在ThreadPoolExecutor中设置一个无限队列的Queue,那么可能会让该Queue不断增长从而撑满内存。如果提交的任务不需排队,而是提交一个任务就创建一个线程来执行,那么又可能会耗尽线程。

所以为了线程池安全,可以使用Semaphore信号量进行阻塞。如果超过了20个任务同时要运行,会通过Semaphore信号量阻塞提交任务,从而实现安全的线程池。

(2)实现代码

步骤一:执⾏预热操作前,先添加分布式锁,保证执行的原⼦性。

步骤二:判断forceFlush是否强制刷新参数是否为1。如果为1,则代表强制刷新。如果为0,则代表不强制刷新缓存。

步骤三:forceFlush如果为0,则从Redis中获取缓存的预热状态。如果预热状态为成功,则不预热。

步骤四:通过sellerType卖家类型参数判断指定需要预热缓存的卖家类型。如果传⼊为0,代表预热所有卖家数据;否则代表预热的是sellerType卖家。根据sellerType,一页一页去DB查询出卖家数据,直到查出的数据为空。

步骤五:将每一页查出的100条卖家数据,构建成写入缓存的数据Map。接着将数据Map通过mset写入缓存,以任务的形式,提交给线程池处理。其中的线程池会通过Semaphore进行阻塞最多提交20个写入缓存的任务。

步骤六:数据处理完后,在Redis中设置卖家信息的缓存预热状态为成功。

步骤七:如果以上步骤出现异常,则在catch中设置缓存预热状态为失败。

步骤八:最后在finally块中释放分布式锁。

@RestController
@RequestMapping("/sellerCache")
public class SellerCacheController {
    @Autowired
    private SellerInfoCache sellerInfoCache;

    @PostMapping("/preheat")
    public JsonResult<Boolean> preheatSellerCache(@RequestBody SellerInfoCacheRequest request) {
        try {
            Boolean result = sellerInfoCache.preheatSellerCache(request);
            return JsonResult.buildSuccess(result);
        } catch (BaseBizException baseBizException) {
            log.error("biz error: request={}", JSON.toJSONString(request), baseBizException);
            return JsonResult.buildError(baseBizException.getMessage());
        } catch (Exception e) {
            log.error("system error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
    ...
}

@Service("sellerInfoCache")
public class SellerInfoCache {
    @Resource
    private SellerRepository sellerRepository;

    @Resource
    private RedisCache redisCache;

    @Resource
    private RedisLock redisLock;

    //缓存预热的线程池
    @Autowired
    @Qualifier("preheatCacheThreadPool")
    private PreheatCacheThreadPool preheatCacheThreadPool;
    ...

    //根据卖家类型预热卖家缓存
    public Boolean preheatSellerCache(SellerInfoCacheRequest cacheRequest) {
        try {
            //1.先获取一把分布式锁
            redisLock.lock(SELLER_INFO_CACHE_PREHEAT_LOCK);

            //2.判断是否需要强制刷新缓存,如果不需要强制刷新,则判断是否预热过
            //如果需要强制刷新缓存,则不用判断是否预热过
            if (!NumberUtils.INTEGER_ONE.equals(cacheRequest.getForceFlush())) {
                //如果已经预热过,则直接返回true
                if (isPreheated()) {
                    return true;
                }
            }
            //3.根据卖家类型刷新缓存,如果没有选择类型,则默认两种都进行预热
            if (NumberUtils.INTEGER_ZERO.equals(cacheRequest.getSellerType())) {
                //预热卖家信息
                preheatSellerInfoToCache(SellerTypeEnum.SELF.getCode());
                //预热卖家id列表信息
                preheatSellerIdCache(SellerTypeEnum.SELF.getCode());
                preheatSellerInfoToCache(SellerTypeEnum.POP.getCode());
                preheatSellerIdCache(SellerTypeEnum.POP.getCode());
            } else {
                //4.如果选择了类型,就按照类型来预热
                preheatSellerInfoToCache(cacheRequest.getSellerType());
                preheatSellerIdCache(cacheRequest.getSellerType());
            }
            redisCache.set(SELLER_INFO_CACHE_PREHEAT_SUCCESS, SellerCacheStatusEnum.SUCCESS.getCode().toString(), -1);
            return true;
        } catch (Exception e) {
            redisCache.set(SELLER_INFO_CACHE_PREHEAT_SUCCESS, SellerCacheStatusEnum.FAIL.getCode().toString(), -1);
            throw new BaseBizException("缓存预热失败,error:{}", JSON.toJSONString(e));
        } finally {
            //释放锁
            redisLock.unlock(SELLER_INFO_CACHE_PREHEAT_LOCK);
        }
    }

    //判断缓存是否预热成功
    private Boolean isPreheated() {
        //如果预热key不存在,则说明没有预热过
        Boolean isExist = redisCache.hasKey(SELLER_INFO_CACHE_PREHEAT_SUCCESS);
        if (!isExist) {
            return false;
        }
        //如果预热key存在,且值为1,则代表预热成功
        Integer success = Integer.parseInt(redisCache.get(SELLER_INFO_CACHE_PREHEAT_SUCCESS));
        if (!SellerCacheStatusEnum.SUCCESS.getCode().equals(success)) {
            return false;
        }
        return true;
    }

    //根据用户类型预热缓存
    public Boolean preheatSellerInfoToCache(Integer type) {
        SellerInfoRequest request = new SellerInfoRequest();
        request.setSellerType(type);
        Integer pageNum = request.getPageNo();
        //设置每页数据量
        request.setPageSize(CollectionSize.DEFAULT);

        //一页一页地去查询数据并交给线程池进行处理
        while (true) {
            Optional<List<SellerInfoResponse>> sellerInfoResponses = sellerRepository.querySellerInfo(request);
            if (!sellerInfoResponses.isPresent()) {
                break;
            }
            List<SellerInfoResponse> sellerInfoResponseList = sellerInfoResponses.get();
            //查到的数据为空时跳出循环
            if (CollectionUtils.isEmpty(sellerInfoResponseList)) {
                break;
            }

            Map<String, String> result = new HashMap<>(sellerInfoResponseList.size());
            //构建写入缓存的数据的map
            for (SellerInfoResponse sellerInfoResponse : sellerInfoResponseList) {
                Long sellerId = sellerInfoResponse.getSellerId();
                result.put(SellerRedisKeyConstants.SELLER_INFO_LIST + sellerId, JSON.toJSONString(sellerInfoConverter.responseToDTO(sellerInfoResponse)));
            }
            log.info("本批次缓存map:{}", JSON.toJSONString(result));
            //把本批次的缓存预热任务提交给线程池处理
            preheatCacheThreadPool.execute(() -> redisCache.mset(result));

            //继续取下一页的用户数据
            request.setPageNo(++pageNum);
            log.info("第" + pageNum + "页数据预热完成");
        }
        return true;
    }

    //根据类型预热用户id缓存
    Boolean preheatSellerIdCache(Integer type) {
        //根据类型全量查卖家ID
        SellerInfoRequest request = new SellerInfoRequest();
        Integer pageNo = request.getPageNo();
        request.setSellerType(type);
        request.setPageSize(CollectionSize.DEFAULT);
        String key = SellerRedisKeyConstants.SELF_TYPE_LIST;
        if (SellerTypeEnum.POP.getCode().equals(type)) {
            key = SellerRedisKeyConstants.POP_TYPE_LIST;
        }
        while (true) {
            Optional<List<Long>> optionalList = sellerRepository.pageSellerIdListByType(request);
            if (!optionalList.isPresent()) {
                break;
            }
            List<Long> sellerIdList = optionalList.get();
            //查到的数据为空时跳出循环
            if (CollectionUtils.isEmpty(sellerIdList)) {
                break;
            }
            log.info("本批次缓存list:{}", JSON.toJSONString(sellerIdList));
            //存Redis
            String finalKey = key + pageNo;
            preheatCacheThreadPool.execute(() -> redisCache.set(finalKey, JSON.toJSONString(sellerIdList), -1));
            //继续取下一页的用户数据
            request.setPageNo(++pageNo);
        }
        return true;
    }
    ...
}

@Repository
public class SellerRepository {
    ...
    //根据条件分页查询出卖家列表
    public Optional<List<SellerInfoResponse>> querySellerInfo(SellerInfoRequest request) {
        LambdaQueryWrapper<SellerInfoDO> queryWrapper = Wrappers.lambdaQuery();
        //类型
        queryWrapper.eq(Objects.nonNull(request.getSellerType()), SellerInfoDO::getSellerType, request.getSellerType());
        //卖家位置(层级)
        queryWrapper.eq(Objects.nonNull(request.getSellerPosition()), SellerInfoDO::getSellerPosition, request.getSellerPosition());
        //状态
        queryWrapper.eq(Objects.nonNull(request.getSellerStatus()), SellerInfoDO::getSellerStatus, request.getSellerStatus());
        //卖家ID
        queryWrapper.eq(Objects.nonNull(request.getSellerId()), SellerInfoDO::getSellerId, request.getSellerId());
        //卖家编码
        queryWrapper.eq(StringUtils.isNotEmpty(request.getSellerCode()), SellerInfoDO::getSellerCode, request.getSellerCode());
        //卖家名称
        queryWrapper.like(StringUtils.isNotEmpty(request.getSellerName()), SellerInfoDO::getSellerName, request.getSellerName());
        //父卖家ID
        queryWrapper.eq(Objects.nonNull(request.getParentId()), SellerInfoDO::getParentId, request.getParentId());
        //卖家ID集合
        queryWrapper.in(CollectionUtils.isNotEmpty(request.getSellerIdList()), SellerInfoDO::getSellerId, request.getSellerIdList());

        Page<SellerInfoDO> page = new Page<>(request.getPageNo(), request.getPageSize());
        Page<SellerInfoDO> pageResult = sellerInfoMapper.selectPage(page, queryWrapper);
        if (Objects.isNull(pageResult)) {
            return Optional.of(Collections.emptyList());
        }
        return Optional.of(sellerInfoConverter.listEntityToResponse(pageResult.getRecords()));
    }
    ...
}

public class PreheatCacheThreadPool {
    private final Semaphore semaphore;
    private final ThreadPoolExecutor threadPoolExecutor;

    public PreheatCacheThreadPool(int permits) {
        //如果超过了20个任务同时要运行,会通过Semaphore信号量阻塞,从而实现安全的线程池
        semaphore = new Semaphore(permits);

        //这里设置核心线程数为0,是因为缓存预热的任务是一次性执行
        //所以没有比要保留核心线程,等线程工作完毕,线程就全部退出即可

        //如果不使用Semaphore,而在ThreadPoolExecutor中设置一个无限队列的Queue,
        //那么可能会让该Queue不断增长从而撑满内存;
        //如果提交的任务不需排队,而是提交一个任务就创建一个线程来执行,
        //那么又可能会耗尽线程;
        threadPoolExecutor = new ThreadPoolExecutor(
            0,
            permits * 2,//乘以2是因为semaphore的释放要比线程的完成早一点点
            60,
            TimeUnit.SECONDS,
            new SynchronousQueue<>()
        );
    }

    public void execute(Runnable task) {
        //超过了20个同步任务就阻塞住,不让它执行太多,从而实现安全的线程池
        semaphore.acquireUninterruptibly();

        threadPoolExecutor.submit(() -> {
            try {
                task.run();
            } finally {
                //semaphore的释放要比当前线程任务的结束要早一点点
                //导致Semaphore放行很多任务进来了,但是线程池的线程还没释放
                semaphore.release();
            }
        });
    }
}

6.商品卖家系统分布式下的缓存预热的实现

(1)发送消息

(2)消费消息

(1)发送消息

步骤一:执⾏预热操作前,先添加分布式锁,保证执行的原⼦性。

步骤二:判断forceFlush是否强制刷新参数是否为1。如果为1,则代表强制刷新;如果为0,则代表不强制刷新缓存。

步骤三:forceFlush如果为0,则从Redis中获取缓存的预热状态。如果预热状态为成功,则不预热。

步骤四:通过sellerType卖家类型参数判断指定需要预热缓存的卖家类型。如果传⼊为0,代表预热所有卖家数据,否则代表预热的是sellerType卖家。然后通过分批次,查询出满⾜sellerType类型的所有卖家数据。每批查100个卖家,会将这100个卖家数据转换为JSON字符串。接着将JSON字符串添加到集合中,将最终的查询结果组装为List。

步骤五:将查询出的卖家数据集合List批量或分批次发送给MQ,对应的MQ的topic为:preheat_seller_cache_bucket_topic。

步骤六:消息发送成功后,在Redis中设置卖家信息的缓存预热状态为成功。

步骤七:如果以上步骤出现异常,则在catch中设置缓存预热状态为失败。

步骤八:最后在finally块中释放分布式锁。

@RestController
@RequestMapping("/sellerCache")
public class SellerCacheController {
    @Autowired
    private SellerInfoCache sellerInfoCache;
    
    @PostMapping("shardingPreheat")
    public JsonResult<Boolean> shardingPreheatSellerCache(@RequestBody SellerInfoCacheRequest request) {
        try {
            Boolean result = sellerInfoCache.shardingPreheatSellerCache(request);
            return JsonResult.buildSuccess(result);
        } catch (BaseBizException baseBizException) {
            log.error("biz error: request={}", JSON.toJSONString(request), baseBizException);
            return JsonResult.buildError(baseBizException.getMessage());
        } catch (Exception e) {
            log.error("system error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
}

@Service("sellerInfoCache")
public class SellerInfoCache {
    @Resource
    private SellerRepository sellerRepository;

    @Resource
    private RedisCache redisCache;

    @Resource
    private RedisLock redisLock;

    @Autowired
    private DefaultProducer defaultProducer;
    ...

    //卖家缓存预热,分布式
    public Boolean shardingPreheatSellerCache(SellerInfoCacheRequest request) {
        try {
            //检查参数
            checkPreheatParam(request);
            //获取一把分布式锁
            redisLock.lock(SELLER_INFO_CACHE_PREHEAT_LOCK);
            //如果需要强制刷新缓存,则不用判断是否预热过
            if (!NumberUtils.INTEGER_ONE.equals(request.getForceFlush())) {
                //如果已经预热过,则直接返回true
                if (isPreheated()) {
                    return true;
                }
            }
            //分批次查询出卖家数据
            //sellerInfoList中的每个String都是100个卖家数据组册的一个batch对应的json数组
            List<String> sellerInfoList = sellerRepository.querySellerInfoByPage(request.getSellerType());
            //发送到MQ
            //这些json数组对应的一条String消息会被defaultProducer.sendMessages()方法批量发送
            //TODO 将sellerInfoList拆分成每10个为一批次进行分批发送
            defaultProducer.sendMessages(PREHEAT_SELLER_CACHE_BUCKET_TOPIC, sellerInfoList, "卖家缓存预热消息");
            redisCache.set(SELLER_INFO_CACHE_PREHEAT_SUCCESS, SellerCacheStatusEnum.SUCCESS.getCode().toString(), -1);
            return true;
        } catch (Exception e) {
            redisCache.set(SELLER_INFO_CACHE_PREHEAT_SUCCESS, SellerCacheStatusEnum.FAIL.getCode().toString(), -1);
            throw new BaseBizException("缓存预热失败,error:{}", JSON.toJSONString(e));
        } finally {
            //释放锁
            redisLock.unlock(SELLER_INFO_CACHE_PREHEAT_LOCK);
            return false;
        }
    }
    ...
}

@Component
public class DefaultProducer {
    private final TransactionMQProducer producer;
    ...
    //批量发送消息
    public void sendMessages(String topic, List<String> messages, String type) {
        sendMessages(topic, messages, -1, type);
    }

    //批量发送消息
    public void sendMessages(String topic, List<String> messages, Integer delayTimeLevel, String type) {
        List<Message> list = new ArrayList<>();
        for (String message : messages) {
            Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));
            if (delayTimeLevel > 0) {
                msg.setDelayTimeLevel(delayTimeLevel);
            }
            list.add(msg);
        }
        try {
            SendResult send = producer.send(list);
            if (SendStatus.SEND_OK == send.getSendStatus()) {
                log.info("发送MQ消息成功, type:{}", type);
            } else {
                throw new BaseBizException(send.getSendStatus().toString());
            }
        } catch (Exception e) {
            log.error("发送MQ消息失败:", e);
            throw new BaseBizException("消息发送失败");
        }
    }
    ...
}

@Repository
public class SellerRepository {
    ...
    //分批次查询出卖家数据
    //List中的每一个元素,是每一页的卖家数据集合JSON字符串
    public List<String> querySellerInfoByPage(Integer sellerType) {
        LambdaQueryWrapper<SellerInfoDO> queryWrapper = Wrappers.lambdaQuery();
        queryWrapper.eq(SellerInfoDO::getDelFlag, YesOrNoEnum.YES.getCode());
        queryWrapper.eq(SellerInfoDO::getSellerStatus, SellerInfoStatusEnum.OPEN_STATUS.getCode());
        //如果指定预热全部卖家,则查询所有卖家数据
        if (sellerType.equals(SellerTypeEnum.ALL.getCode())) {
            return queryTotalSellerInfoByPage();
        }
        //查询匹配类型的卖家
        return queryBatchSellerInfoByPage(sellerType);
    }

    //分批次查询出所有类型的卖家信息
    private List<String> queryTotalSellerInfoByPage() {
        List<String> selfSellerInfoList = queryBatchSellerInfoByPage(SellerTypeEnum.SELF.getCode());
        List<String> popSellerInfoList = queryBatchSellerInfoByPage(SellerTypeEnum.POP.getCode());
        List<String> resultList = new ArrayList<>(selfSellerInfoList);
        resultList.addAll(popSellerInfoList);
        return resultList;
    }

    //分批次查询出指定类型的卖家信息
    private List<String> queryBatchSellerInfoByPage(Integer sellerType) {
        List<String> batchList = new ArrayList<>();
        SellerInfoRequest request = new SellerInfoRequest();
        request.setSellerType(sellerType);
        Integer pageNum = request.getPageNo();
        //设置每页数据量
        request.setPageSize(SkuSellerRelationConstants.QUERY_MAX_PAGE_SIZE);
        while (true) {
            //根据条件分页查询出卖家列表
            Optional<List<SellerInfoResponse>> sellerInfoResponses = querySellerInfo(request);
            if (!sellerInfoResponses.isPresent()) {
                break;
            }
            List<SellerInfoResponse> sellerInfoResponseList = sellerInfoResponses.get();
            if (CollectionUtils.isEmpty(sellerInfoResponseList)) {
                break;
            }
            String batchResult = getBatchResult(sellerInfoResponseList, pageNum);
            batchList.add(batchResult);
            //继续取下一页的用户数据
            request.setPageNo(++pageNum);
        }
        return batchList;
    }

    //根据条件分页查询出卖家列表
    public Optional<List<SellerInfoResponse>> querySellerInfo(SellerInfoRequest request) {
        LambdaQueryWrapper<SellerInfoDO> queryWrapper = Wrappers.lambdaQuery();
        //类型
        queryWrapper.eq(Objects.nonNull(request.getSellerType()), SellerInfoDO::getSellerType, request.getSellerType());
        //卖家位置(层级)
        queryWrapper.eq(Objects.nonNull(request.getSellerPosition()), SellerInfoDO::getSellerPosition, request.getSellerPosition());
        //状态
        queryWrapper.eq(Objects.nonNull(request.getSellerStatus()), SellerInfoDO::getSellerStatus, request.getSellerStatus());
        //卖家ID
        queryWrapper.eq(Objects.nonNull(request.getSellerId()), SellerInfoDO::getSellerId, request.getSellerId());
        //卖家编码
        queryWrapper.eq(StringUtils.isNotEmpty(request.getSellerCode()), SellerInfoDO::getSellerCode, request.getSellerCode());
        //卖家名称
        queryWrapper.like(StringUtils.isNotEmpty(request.getSellerName()), SellerInfoDO::getSellerName, request.getSellerName());
        //父卖家ID
        queryWrapper.eq(Objects.nonNull(request.getParentId()), SellerInfoDO::getParentId, request.getParentId());
        //卖家ID集合
        queryWrapper.in(CollectionUtils.isNotEmpty(request.getSellerIdList()), SellerInfoDO::getSellerId, request.getSellerIdList());

        Page<SellerInfoDO> page = new Page<>(request.getPageNo(), request.getPageSize());
        Page<SellerInfoDO> pageResult = sellerInfoMapper.selectPage(page, queryWrapper);
        if (Objects.isNull(pageResult)) {
            return Optional.of(Collections.emptyList());
        }

        return Optional.of(sellerInfoConverter.listEntityToResponse(pageResult.getRecords()));
    }

    //将每页的卖家数据转换成JSON
    private String getBatchResult(List<SellerInfoResponse> sellerInfoResponseList, Integer pageNum) {
        List<String> batchResult = new ArrayList<>(sellerInfoResponseList.size());
        for (SellerInfoResponse sellerInfoResponse : sellerInfoResponseList) {
            PreheatSellerMessage message = new PreheatSellerMessage();
            message.setCachePageNo(pageNum);
            message.setSellerInfo(sellerInfoResponse);
            batchResult.add(JsonUtil.object2Json(message));
        }
        return JsonUtil.object2Json(batchResult);
    }
    ...
}

(2)消费消息

通过多机器分布式消费MQ来实现分布式预热缓存。

步骤一:每个消费者每次消费⼀条消息。

步骤二:拿到消息后,解析出需要预热的缓存数据,即构造出100个元素的sellerInfoResultMap,其中key为Redis卖家信息缓存的key,value为卖家信息JSON字符串。然后通过执行mset将数据设置到Redis缓存。

步骤三:接着提取出sellerIdList集合,并根据消息中卖家信息的sellerType,构造出Redis卖家类型的⻚缓存的key,然后通过执行set设置到Redis缓存。

//缓存预热消费者
@Component
public class SellerPreheatCacheListener implements MessageListenerConcurrently {
    @Autowired
    private RedisCache redisCache;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        //每个消费者每次消费分批次的多页数据,也就是List<String>,每个String对应了一页的数据
        //每个String包含页码和该页下的100个SellerInfo集合
        try {
            for (MessageExt messageExt : list) {
                //获取到分批次的多页数据List
                String msg = new String(messageExt.getBody());
                List messageList = JsonUtil.json2Object(msg, List.class);
                if (CollectionUtils.isEmpty(messageList)) {
                    throw new BaseBizException(ProductErrorCodeEnum.PARAM_CHECK_ERROR, ProductErrorCodeEnum.PARAM_CHECK_ERROR.getErrorCode());
                }

                Map<String, String> sellerInfoResultMap = new HashMap<>(messageList.size());
                List<Long> sellerIdList = new ArrayList<>(messageList.size());
                //List<String>中的每个String,都包含了100个卖家数据,这些卖家类型都是一样的
                for (Object message : messageList) {
                    PreheatSellerMessage preheatSellerMessage = JsonUtil.json2Object(message.toString(), PreheatSellerMessage.class);
                    if (Objects.isNull(preheatSellerMessage)) {
                        throw new BaseBizException(ProductErrorCodeEnum.PARAM_CHECK_ERROR, ProductErrorCodeEnum.PARAM_CHECK_ERROR.getErrorCode());
                    }
                    //卖家信息
                    SellerInfoResponse sellerInfo = preheatSellerMessage.getSellerInfo();
                    //卖家信息的Key
                    String sellerInfoKey = SellerRedisKeyConstants.SELLER_INFO_LIST + sellerInfo.getSellerId();
                    //卖家信息集合
                    sellerInfoResultMap.put(sellerInfoKey, JsonUtil.object2Json(sellerInfo));
                    //卖家ID集合
                    sellerIdList.add(sellerInfo.getSellerId());
                }

                PreheatSellerMessage preheatSellerMessage = JsonUtil.json2Object(messageList.get(0).toString(), PreheatSellerMessage.class);
                if (Objects.isNull(preheatSellerMessage)) {
                    throw new BaseBizException(ProductErrorCodeEnum.PARAM_CHECK_ERROR, ProductErrorCodeEnum.PARAM_CHECK_ERROR.getErrorCode());
                }

                //卖家类型
                Integer sellerType = preheatSellerMessage.getSellerInfo().getSellerType();
                //卖家类型的key
                String sellerTypeKey = "";
                if (sellerType.equals(SellerTypeEnum.SELF.getCode())) {
                    sellerTypeKey = SellerRedisKeyConstants.SELF_TYPE_LIST + preheatSellerMessage.getCachePageNo();
                } else {
                    sellerTypeKey = SellerRedisKeyConstants.POP_TYPE_LIST + preheatSellerMessage.getCachePageNo();
                }

                //将卖家信息set到缓存
                redisCache.mset(sellerInfoResultMap);
                //将卖家类型ID集合set到缓存
                redisCache.set(sellerTypeKey, JsonUtil.object2Json(sellerIdList), -1);
            }
        } catch (Exception e) {
            log.error("consume error, 缓存预热消息消费失败", e);
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

7.商品卖家系统定时查询DB最新数据更新缓存

//卖家缓存数据同步
@Component
public class SellerCacheSyncSchedule {
    @Autowired
    private RedisCache redisCache;

    @Autowired
    private SellerRepository sellerRepository;

    //卖家缓存数据同步定时任务
    //5分钟执行一次,每次查询出数据库最近5分钟的更新和新增数据,然后同步差异数据到缓存
    @XxlJob("cacheSync")
    public void cacheSync() {
        //计算出前五分钟的时间
        Date beforeTIme = getBeforeTime();
        //从数据库中查询出最近5分钟内的数据
        List<SellerInfoDO> sellerInfoList = sellerRepository.querySellerInfoFiveMinute(beforeTIme);
        //过滤出数据库和缓存中的差异数据,并将差异数据保存到缓存
        saveDiffSellerInfo(sellerInfoList);
    }

    //比较数据库和缓存中的差异数据,过滤出差异数据,并将差异数据保存到缓存
    private void saveDiffSellerInfo(List<SellerInfoDO> sellerInfoList) {
        sellerInfoList.stream().peek(sellerInfo -> {
            Long sellerId = sellerInfo.getSellerId();
            String redisKey = SellerRedisKeyConstants.SELLER_INFO_LIST + sellerId;
            //不存在说明是差异数据
            if (!redisCache.hasKey(redisKey)) {
                redisCache.set(redisKey, JsonUtil.object2Json(sellerInfo), -1);
            }
            //TODO 存在比较差异
        }).count();
    }

    //获取5分钟之前的时间
    private Date getBeforeTime() {
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, -5);
        return calendar.getTime();
    }
}

@Repository
public class SellerRepository {
    ...
    //查询出前5分钟之内的数据
    public List<SellerInfoDO> querySellerInfoFiveMinute(Date beforeTIme) {
        List<SellerInfoDO> batchList = new ArrayList<>();
        int pageNum = 1;
        //设置每页数据量
        int pageSize = ProductConstants.QUERY_ITEM_MAX_COUNT;
        while (true) {
            LambdaQueryWrapper<SellerInfoDO> queryWrapper = Wrappers.lambdaQuery();
            queryWrapper.ge(SellerInfoDO::getCreateTime, beforeTIme);
            queryWrapper.ge(SellerInfoDO::getUpdateTime, beforeTIme);

            Page<SellerInfoDO> page = new Page<>(pageNum, pageSize);
            Page<SellerInfoDO> pageResult = sellerInfoMapper.selectPage(page, queryWrapper);
            if (Objects.isNull(pageResult) || pageResult.getRecords().size() <= 0) {
                break;
            }

            batchList.addAll(pageResult.getRecords());
            //进入下一页
            pageNum++;
        }
        return batchList;
    }
    ...
}

8.商品中心高并发架构总结

一.高并发C端系统(限流 + 过滤 + 实时缓存同步)

二.高并发卖家系统(缓存预热 + 定时缓存同步)

三.高并发库存系统(缓存分桶)

后端技术栈的基础修养 文章被收录于专栏

详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等

全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务