Sentinel源码—7.参数限流和注解的实现
大纲
1.参数限流的原理和源码
1.参数限流的原理和源码
参数限流规则ParamFlowRule的配置Demo
(2)ParamFlowSlot根据参数限流规则验证请求
(1)参数限流规则ParamFlowRule的配置Demo
一.参数限流的应用场景
二.参数限流规则的属性
三.参数限流规则的配置Demo
一.参数限流的应用场景
传统的流量控制,一般是通过资源维度来限制某接口或方法的调用频率。但有时需要更细粒度地控制不同参数条件下的访问速率,即参数限流。参数限流允许根据不同的参数条件设置不同的流量控制规则,这种方式非常适合处理特定条件下的请求,因为能更加精细地管理流量。
假设有一个在线电影订票系统,某个接口允许用户查询电影的放映时间。但只希望每个用户每10秒只能查询接口1次,以避免过多的查询请求。这时如果直接将接口的QPS限制为5是不能满足要求的,因为需求是每个用户每5分钟只能查询1次,而不是每秒一共只能查询5次,因此参数限流就能派上用场了。
可以设置一个规则,根据用户ID来限制每个用户的查询频率,将限流的维度从资源维度细化到参数维度,从而实现每个用户每10秒只能查询接口1次。比如希望影院工作人员可以每秒查询10次,老板可以每秒查询100次,而购票者则只能每10秒查询一次,其中工作人员的userId值为100和200,老板的userId值为9999,那么可以如下配置:需要注意限流阈值是以秒为单位的,所以需要乘以统计窗口时长10。
二.参数限流规则的属性
public class ParamFlowRule extends AbstractRule { ... //The threshold type of flow control (0: thread count, 1: QPS). //流量控制的阈值类型(0表示线程数,1表示QPS) private int grade = RuleConstant.FLOW_GRADE_QPS; //Parameter index. //参数下标 private Integer paramIdx; //The threshold count. //阈值 private double count; //Original exclusion items of parameters. //针对特定参数的流量控制规则列表 private List<ParamFlowItem> paramFlowItemList = new ArrayList<ParamFlowItem>(); //Indicating whether the rule is for cluster mode. //是否集群 private boolean clusterMode = false; ... } //针对特定参数的流量控制规则 public class ParamFlowItem { private String object; private Integer count; private String classType; ... }
三.参数限流规则的配置Demo
//This demo demonstrates flow control by frequent ("hot spot") parameters. public class ParamFlowQpsDemo { private static final int PARAM_A = 1; private static final int PARAM_B = 2; private static final int PARAM_C = 3; private static final int PARAM_D = 4; //Here we prepare different parameters to validate flow control by parameters. private static final Integer[] PARAMS = new Integer[] {PARAM_A, PARAM_B, PARAM_C, PARAM_D}; private static final String RESOURCE_KEY = "resA"; public static void main(String[] args) throws Exception { initParamFlowRules(); final int threadCount = 20; ParamFlowQpsRunner<Integer> runner = new ParamFlowQpsRunner<>(PARAMS, RESOURCE_KEY, threadCount, 120); runner.tick(); Thread.sleep(1000); runner.simulateTraffic(); } private static void initParamFlowRules() { //QPS mode, threshold is 5 for every frequent "hot spot" parameter in index 0 (the first arg). ParamFlowRule rule = new ParamFlowRule(RESOURCE_KEY) .setParamIdx(0) .setGrade(RuleConstant.FLOW_GRADE_QPS) .setCount(5); //We can set threshold count for specific parameter value individually. //Here we add an exception item. That means: //QPS threshold of entries with parameter `PARAM_B` (type: int) in index 0 will be 10, rather than the global threshold (5). ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B)) .setClassType(int.class.getName()) .setCount(10); rule.setParamFlowItemList(Collections.singletonList(item)); //ParamFlowRuleManager类加载的一个时机是:它的静态方法被调用了 //所以下面会先初始化ParamFlowRuleManager,再执行loadRules()方法 ParamFlowRuleManager.loadRules(Collections.singletonList(rule)); } } public final class ParamFlowRuleManager { private static final Map<String, List<ParamFlowRule>> PARAM_FLOW_RULES = new ConcurrentHashMap<>(); private final static RulePropertyListener PROPERTY_LISTENER = new RulePropertyListener(); private static SentinelProperty<List<ParamFlowRule>> currentProperty = new DynamicSentinelProperty<>(); static { currentProperty.addListener(PROPERTY_LISTENER); } //Load parameter flow rules. Former rules will be replaced. public static void loadRules(List<ParamFlowRule> rules) { try { //设置规则的值为rules currentProperty.updateValue(rules); } catch (Throwable e) { RecordLog.info("[ParamFlowRuleManager] Failed to load rules", e); } } static class RulePropertyListener implements PropertyListener<List<ParamFlowRule>> { @Override public void configUpdate(List<ParamFlowRule> list) { Map<String, List<ParamFlowRule>> rules = aggregateAndPrepareParamRules(list); if (rules != null) { PARAM_FLOW_RULES.clear(); PARAM_FLOW_RULES.putAll(rules); } RecordLog.info("[ParamFlowRuleManager] Parameter flow rules received: {}", PARAM_FLOW_RULES); } @Override public void configLoad(List<ParamFlowRule> list) { Map<String, List<ParamFlowRule>> rules = aggregateAndPrepareParamRules(list); if (rules != null) { PARAM_FLOW_RULES.clear(); PARAM_FLOW_RULES.putAll(rules); } RecordLog.info("[ParamFlowRuleManager] Parameter flow rules received: {}", PARAM_FLOW_RULES); } ... } ... } public class DynamicSentinelProperty<T> implements SentinelProperty<T> { protected Set<PropertyListener<T>> listeners = new CopyOnWriteArraySet<>(); private T value = null; public DynamicSentinelProperty() { } //添加监听器到集合 @Override public void addListener(PropertyListener<T> listener) { listeners.add(listener); //回调监听器的configLoad()方法初始化规则配置 listener.configLoad(value); } //更新值 @Override public boolean updateValue(T newValue) { //如果值没变化,直接返回 if (isEqual(value, newValue)) { return false; } RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue); //如果值发生了变化,则遍历监听器,回调监听器的configUpdate()方法更新对应的值 value = newValue; for (PropertyListener<T> listener : listeners) { listener.configUpdate(newValue); } return true; } ... } //A traffic runner to simulate flow for different parameters. class ParamFlowQpsRunner<T> { private final T[] params; private final String resourceName; private int seconds; private final int threadCount; private final Map<T, AtomicLong> passCountMap = new ConcurrentHashMap<>(); private final Map<T, AtomicLong> blockCountMap = new ConcurrentHashMap<>(); private volatile boolean stop = false; public ParamFlowQpsRunner(T[] params, String resourceName, int threadCount, int seconds) { this.params = params; this.resourceName = resourceName; this.seconds = seconds; this.threadCount = threadCount; for (T param : params) { passCountMap.putIfAbsent(param, new AtomicLong()); blockCountMap.putIfAbsent(param, new AtomicLong()); } } public void tick() { Thread timer = new Thread(new TimerTask()); timer.setName("sentinel-timer-task"); timer.start(); } public void simulateTraffic() { for (int i = 0; i < threadCount; i++) { Thread t = new Thread(new RunTask()); t.setName("sentinel-simulate-traffic-task-" + i); t.start(); } } final class TimerTask implements Runnable { @Override public void run() { long start = System.currentTimeMillis(); System.out.println("Begin to run! Go go go!"); System.out.println("See corresponding metrics.log for accurate statistic data"); Map<T, Long> map = new HashMap<>(params.length); for (T param : params) { map.putIfAbsent(param, 0L); } while (!stop) { sleep(1000); //There may be a mismatch for time window of internal sliding window. //See corresponding `metrics.log` for accurate statistic log. for (T param : params) { System.out.println(String.format( "[%d][%d] Parameter flow metrics for resource %s: pass count for param <%s> is %d, block count: %d", seconds, TimeUtil.currentTimeMillis(), resourceName, param, passCountMap.get(param).getAndSet(0), blockCountMap.get(param).getAndSet(0) )); } System.out.println("============================="); if (seconds-- <= 0) { stop = true; } } long cost = System.currentTimeMillis() - start; System.out.println("Time cost: " + cost + " ms"); System.exit(0); } } final class RunTask implements Runnable { @Override public void run() { while (!stop) { Entry entry = null; T param = generateParam(); try { entry = SphU.entry(resourceName, EntryType.IN, 1, param); //Add pass for parameter. passFor(param); } catch (BlockException e) { //block.incrementAndGet(); blockFor(param); } catch (Exception ex) { //biz exception ex.printStackTrace(); } finally { //total.incrementAndGet(); if (entry != null) { entry.exit(1, param); } } sleep(ThreadLocalRandom.current().nextInt(0, 10)); } } } //Pick one of provided parameters randomly. private T generateParam() { int i = ThreadLocalRandom.current().nextInt(0, params.length); return params[i]; } private void passFor(T param) { passCountMap.get(param).incrementAndGet(); } private void blockFor(T param) { blockCountMap.get(param).incrementAndGet(); } private void sleep(int timeMs) { try { TimeUnit.MILLISECONDS.sleep(timeMs); } catch (InterruptedException e) { } } }
(2)ParamFlowSlot根据参数限流规则验证请求
一.ParamFlowSlot的entry()方法的逻辑
二.不同限流类型 + 阈值类型 + 流控效果的处理
三.流控效果为排队等待和直接拒绝的实现
四.参数限流是如何进行数据统计
五.参数限流验证请求的流程图总结
一.ParamFlowSlot的entry()方法的逻辑
ParamFlowSlot的entry()方法主要干了三件事:参数验证、获取当前资源的全部参数限流规则、循环每一个参数限流规则并判断此次请求是否被允许通过(如果不允许则直接抛出异常)。其中对每一条获取到的参数限流规则,都会通过ParamFlowChecker的passCheck()方法进行判断。
@Spi(order = -3000) public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { //1.如果没配置参数限流规则,直接触发下一个Slot if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) { fireEntry(context, resourceWrapper, node, count, prioritized, args); return; } //2.如果配置了参数限流规则,则调用ParamFlowSlot的checkFlow()方法,该方法执行完成后再触发下一个Slot checkFlow(resourceWrapper, count, args); fireEntry(context, resourceWrapper, node, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } void applyRealParamIdx(/*@NonNull*/ ParamFlowRule rule, int length) { int paramIdx = rule.getParamIdx(); if (paramIdx < 0) { if (-paramIdx <= length) { rule.setParamIdx(length + paramIdx); } else { //Illegal index, give it a illegal positive value, latter rule checking will pass. rule.setParamIdx(-paramIdx); } } } void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { //1.如果没传递参数,则直接放行,代表不做参数限流逻辑 if (args == null) { return; } //2.如果没给resourceWrapper这个资源配置参数限流规则,则直接放行 if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) { return; } //3.获取此资源的全部参数限流规则,规则可能会有很多个,所以是个List List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName()); //4.遍历获取到的参数限流规则 for (ParamFlowRule rule : rules) { //进行参数验证 applyRealParamIdx(rule, args.length); //Initialize the parameter metrics. ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule); //进行验证的核心方法:检查当前规则是否允许通过此请求,如果不允许,则抛出ParamFlowException异常 if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) { String triggeredParam = ""; if (args.length > rule.getParamIdx()) { Object value = args[rule.getParamIdx()]; //Assign actual value with the result of paramFlowKey method if (value instanceof ParamFlowArgument) { value = ((ParamFlowArgument) value).paramFlowKey(); } triggeredParam = String.valueOf(value); } throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule); } } } }
二.不同限流类型 + 阈值类型 + 流控效果的处理
在ParamFlowChecker的passCheck()方法中,参数值验证通过之后,会判断限流类型。如果是集群限流,则执行ParamFlowChecker的passClusterCheck()方法。如果是单机限流,则执行ParamFlowChecker的passLocalCheck()方法。
在ParamFlowChecker的passLocalCheck()方法中,则会根据不同的参数类型调用ParamFlowChecker的passSingleValueCheck()方法。根据该方法可以知道,参数限流支持两种阈值类型:一种是QPS,另一种是线程数。而QPS类型还支持两种流控效果,分别是排队等待和直接拒绝,但不支持Warm Up。
//Rule checker for parameter flow control. public final class ParamFlowChecker { public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count, Object... args) { if (args == null) { return true; } //1.判断参数索引是否合法,这个就是配置参数限流时设置的下标,从0开始,也就是对应args里的下标 //比如0就代表args数组里的第一个参数,如果参数不合法直接放行,相当于参数限流没生效 int paramIdx = rule.getParamIdx(); if (args.length <= paramIdx) { return true; } //2.判断参数值是不是空,如果是空直接放行 //Get parameter value. Object value = args[paramIdx]; //Assign value with the result of paramFlowKey method if (value instanceof ParamFlowArgument) { value = ((ParamFlowArgument) value).paramFlowKey(); } //If value is null, then pass if (value == null) { return true; } //3.集群限流 if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { return passClusterCheck(resourceWrapper, rule, count, value); } //4.单机限流 return passLocalCheck(resourceWrapper, rule, count, value); } private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) { try { if (Collection.class.isAssignableFrom(value.getClass())) {//基本类型 for (Object param : ((Collection)value)) { if (!passSingleValueCheck(resourceWrapper, rule, count, param)) { return false; } } } else if (value.getClass().isArray()) {//数组类型 int length = Array.getLength(value); for (int i = 0; i < length; i++) { Object param = Array.get(value, i); if (!passSingleValueCheck(resourceWrapper, rule, count, param)) { return false; } } } else {//其他类型,也就是引用类型 return passSingleValueCheck(resourceWrapper, rule, count, value); } } catch (Throwable e) { RecordLog.warn("[ParamFlowChecker] Unexpected error", e); } return true; } static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) { if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {//类型是QPS if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) { //流控效果为排队等待 return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value); } else { //流控效果为直接拒绝 return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value); } } else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {//类型是Thread Set<Object> exclusionItems = rule.getParsedHotItems().keySet(); long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value); if (exclusionItems.contains(value)) { int itemThreshold = rule.getParsedHotItems().get(value); return ++threadCount <= itemThreshold; } long threshold = (long)rule.getCount(); return ++threadCount <= threshold; } return true; } ... }
三.流控效果为排队等待和直接拒绝的实现
当设置了QPS类型的流控效果为排队等待时,会调用ParamFlowChecker的passThrottleLocalCheck()方法。该方法实现排队等待效果的原理和流控规则FlowSlot通过RateLimiterController实现排队等待效果的原理是一样的。
//Rule checker for parameter flow control. public final class ParamFlowChecker { ... static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) { ParameterMetric metric = getParameterMetric(resourceWrapper); CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule); if (timeRecorderMap == null) { return true; } //Calculate max token count (threshold) Set<Object> exclusionItems = rule.getParsedHotItems().keySet(); long tokenCount = (long)rule.getCount(); if (exclusionItems.contains(value)) { tokenCount = rule.getParsedHotItems().get(value); } if (tokenCount == 0) { return false; } long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount); while (true) { long currentTime = TimeUtil.currentTimeMillis(); AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime)); if (timeRecorder == null) { return true; } //AtomicLong timeRecorder = timeRecorderMap.get(value); long lastPassTime = timeRecorder.get(); long expectedTime = lastPassTime + costTime; if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) { AtomicLong lastPastTimeRef = timeRecorderMap.get(value); if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) { long waitTime = expectedTime - currentTime; if (waitTime > 0) { lastPastTimeRef.set(expectedTime); try { TimeUnit.MILLISECONDS.sleep(waitTime); } catch (InterruptedException e) { RecordLog.warn("passThrottleLocalCheck: wait interrupted", e); } } return true; } else { Thread.yield(); } } else { return false; } } } private static ParameterMetric getParameterMetric(ResourceWrapper resourceWrapper) { //Should not be null. return ParameterMetricStorage.getParamMetric(resourceWrapper); } }
当设置了QPS类型的流控效果为直接拒绝时,会调用ParamFlowChecker的passDefaultLocalCheck()方法。该方法采取令牌桶的方式来实现:控制每个时间窗口只生产一次token令牌,且将令牌放入桶中,每个请求都从桶中取令牌,当可以获取到令牌时,则正常放行,反之直接拒绝。
//Rule checker for parameter flow control. public final class ParamFlowChecker { ... static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) { ParameterMetric metric = getParameterMetric(resourceWrapper); CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule); CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule); if (tokenCounters == null || timeCounters == null) { return true; } //Calculate max token count (threshold) Set<Object> exclusionItems = rule.getParsedHotItems().keySet(); long tokenCount = (long)rule.getCount(); if (exclusionItems.contains(value)) { tokenCount = rule.getParsedHotItems().get(value); } if (tokenCount == 0) { return false; } long maxCount = tokenCount + rule.getBurstCount(); if (acquireCount > maxCount) { return false; } while (true) { long currentTime = TimeUtil.currentTimeMillis(); AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime)); if (lastAddTokenTime == null) { //Token never added, just replenish the tokens and consume {@code acquireCount} immediately. tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount)); return true; } //Calculate the time duration since last token was added. long passTime = currentTime - lastAddTokenTime.get(); //A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed. if (passTime > rule.getDurationInSec() * 1000) { AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount)); if (oldQps == null) { //Might not be accurate here. lastAddTokenTime.set(currentTime); return true; } else { long restQps = oldQps.get(); long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000); long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount) : (restQps + toAddCount - acquireCount); if (newQps < 0) { return false; } if (oldQps.compareAndSet(restQps, newQps)) { lastAddTokenTime.set(currentTime); return true; } Thread.yield(); } } else { AtomicLong oldQps = tokenCounters.get(value); if (oldQps != null) { long oldQpsValue = oldQps.get(); if (oldQpsValue - acquireCount >= 0) { if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) { return true; } } else { return false; } } Thread.yield(); } } } }
四.参数限流是如何进行数据统计
由于参数限流的数据统计需要细化到参数值的维度,所以使用参数限流时需要注意OOM问题。比如根据用户ID进行限流,且用户数量有几千万,那么CacheMap将会包含几千万个不会被移除的键值对,而且会随着进程运行时间的增长而不断增加,最后可能会导致OOM。
public final class ParameterMetricStorage { private static final Map<String, ParameterMetric> metricsMap = new ConcurrentHashMap<>(); //Lock for a specific resource. private static final Object LOCK = new Object(); //Init the parameter metric and index map for given resource. //该方法在ParamFlowSlot的checkFlow()方法中被调用 public static void initParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule) { if (resourceWrapper == null || resourceWrapper.getName() == null) { return; } String resourceName = resourceWrapper.getName(); ParameterMetric metric; //Assume that the resource is valid. if ((metric = metricsMap.get(resourceName)) == null) { synchronized (LOCK) { if ((metric = metricsMap.get(resourceName)) == null) { metric = new ParameterMetric(); metricsMap.put(resourceWrapper.getName(), metric); RecordLog.info("[ParameterMetricStorage] Creating parameter metric for: {}", resourceWrapper.getName()); } } } metric.initialize(rule); } //该方法在ParamFlowChecker的passThrottleLocalCheck()和passDefaultLocalCheck()方法执行getParameterMetric()方法时被调用 public static ParameterMetric getParamMetric(ResourceWrapper resourceWrapper) { if (resourceWrapper == null || resourceWrapper.getName() == null) { return null; } return metricsMap.get(resourceWrapper.getName()); } ... } //Metrics for frequent ("hot spot") parameters. public class ParameterMetric { private static final int THREAD_COUNT_MAX_CAPACITY = 4000; private static final int BASE_PARAM_MAX_CAPACITY = 4000; private static final int TOTAL_MAX_CAPACITY = 20_0000; private final Object lock = new Object(); //Format: (rule, (value, timeRecorder)) private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>(); //Format: (rule, (value, tokenCounter)) private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>(); private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>(); public void initialize(ParamFlowRule rule) { if (!ruleTimeCounters.containsKey(rule)) { synchronized (lock) { if (ruleTimeCounters.get(rule) == null) { long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY); ruleTimeCounters.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size)); } } } if (!ruleTokenCounter.containsKey(rule)) { synchronized (lock) { if (ruleTokenCounter.get(rule) == null) { long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY); ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size)); } } } if (!threadCountMap.containsKey(rule.getParamIdx())) { synchronized (lock) { if (threadCountMap.get(rule.getParamIdx()) == null) { threadCountMap.put(rule.getParamIdx(), new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(THREAD_COUNT_MAX_CAPACITY)); } } } } //Get the token counter for given parameter rule. //@param rule valid parameter rule //@return the associated token counter public CacheMap<Object, AtomicLong> getRuleTokenCounter(ParamFlowRule rule) { return ruleTokenCounter.get(rule); } //Get the time record counter for given parameter rule. //@param rule valid parameter rule //@return the associated time counter public CacheMap<Object, AtomicLong> getRuleTimeCounter(ParamFlowRule rule) { return ruleTimeCounters.get(rule); } public long getThreadCount(int index, Object value) { CacheMap<Object, AtomicInteger> cacheMap = threadCountMap.get(index); if (cacheMap == null) { return 0; } AtomicInteger count = cacheMap.get(value); return count == null ? 0L : count.get(); } ... }
五.参数限流验证请求的流程图总结
@SentinelResource注解的使用
一.引入Sentinel Spring Boot Starter依赖
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-sentinel</artifactId> <version>2.2.1.RELEASE</version> </dependency>
二.为方法添加@SentinelResource注解
下面的代码为sayHello()方法添加了@SentinelResource注解,并指定了资源名称为sayHello以及熔断降级时的回调方法fallback()。这样在请求sayHello()方法后,就可以在Sentinel Dashboard上看到此资源,然后就可以针对此资源进行一系列的规则配置了。
@Service public class MyService { @SentinelResource(value = "sayHello", fallback = "fallback") public String sayHello(String name) { return "Hello, " + name; } public String fallback(String name, Throwable throwable) { return "Fallback: " + name + ", reason: " + throwable.getMessage(); } }
(2)@SentinelResource注解和实现
利用Spring AOP拦截@SentinelResource注解,最后调用SphU.entry()方法来进行处理。
//Aspect for methods with {@link SentinelResource} annotation. @Aspect public class SentinelResourceAspect extends AbstractSentinelAspectSupport { //SentinelResource注解 @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { } @Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { //获取方法 Method originMethod = resolveMethod(pjp); //获取方法上的SentinelResource注解,有了这个注解,就可以获取到注解的各种属性值了 SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null) { //Should not go through here. throw new IllegalStateException("Wrong state for SentinelResource annotation"); } //获取资源名称 String resourceName = getResourceName(annotation.value(), originMethod); //获取资源类型 EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); //创建一个Entry对象,通过SphU.entry(resourceName)将当前方法纳入Sentinel的保护体系 //如果当前资源的调用未触发任何Sentinel规则,则正常执行被拦截的方法,否则将执行对应的限流、熔断降级等处理逻辑 Entry entry = null; try { entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); return pjp.proceed(); } catch (BlockException ex) { //发生异常时,通过反射执行在注解中设置的降级方法 return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) { Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); //The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); return handleFallback(pjp, annotation, ex); } //No fallback function can handle the exception, so throw it out. throw ex; } finally { if (entry != null) { entry.exit(1, pjp.getArgs()); } } } } //Some common functions for Sentinel annotation aspect. public abstract class AbstractSentinelAspectSupport { ... protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex) throws Throwable { //Execute block handler if configured. Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(), annotation.blockHandlerClass()); if (blockHandlerMethod != null) { Object[] originArgs = pjp.getArgs(); //Construct args. Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1); args[args.length - 1] = ex; return invoke(pjp, blockHandlerMethod, args); } //If no block handler is present, then go to fallback. return handleFallback(pjp, annotation, ex); } private Object invoke(ProceedingJoinPoint pjp, Method method, Object[] args) throws Throwable { try { if (!method.isAccessible()) { makeAccessible(method); } if (isStatic(method)) { return method.invoke(null, args); } return method.invoke(pjp.getTarget(), args); } catch (InvocationTargetException e) { //throw the actual exception throw e.getTargetException(); } } ... }
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等