前言
热点参数流控会统计传入参数中的热点参数,对包含热点参数的资源调用进行流量控制,使用漏桶和令牌桶的算法进行流控。
一、热点参数流程
1、 ProcessorSlot链上的执行方法entry();
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
//检查是否存在热点参数流控规则
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
//校验规则
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
2、 checkFlow();
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
if (args == null) {
return;
}
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
//获取流控规则
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
for (ParamFlowRule rule : rules) {
//计算流控参数的索引
applyRealParamIdx(rule, args.length);
// Initialize the parameter metrics.
//初始化参数统计
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
//校验规则
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
//获取热点参数
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
3、 applyRealParamIdx();
处理参数索引为负的情况
void applyRealParamIdx(/*@NonNull*/ ParamFlowRule rule, int length) {
int paramIdx = rule.getParamIdx();
if (paramIdx < 0) {
if (-paramIdx <= length) {
rule.setParamIdx(length + paramIdx);
} else {
// Illegal index, give it a illegal positive value, latter rule checking will pass.
rule.setParamIdx(-paramIdx);
}
}
}
4、 initParamMetricsFor();
初始化了三个用于参数统计的属性:
ruleTimeCounters,用于记录令牌桶最后添加时间
ruleTokenCounter,用于记录令牌数量
threadCountMap,用于记录线程数量
public static void initParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule) {
if (resourceWrapper == null || resourceWrapper.getName() == null) {
return;
}
String resourceName = resourceWrapper.getName();
ParameterMetric metric;
// Assume that the resource is valid.
if ((metric = metricsMap.get(resourceName)) == null) {
synchronized (LOCK) {
if ((metric = metricsMap.get(resourceName)) == null) {
metric = new ParameterMetric();
metricsMap.put(resourceWrapper.getName(), metric);
RecordLog.info("[ParameterMetricStorage] Creating parameter metric for: " + resourceWrapper.getName());
}
}
}
metric.initialize(rule);
}
public void initialize(ParamFlowRule rule) {
if (!ruleTimeCounters.containsKey(rule)) {
synchronized (lock) {
if (ruleTimeCounters.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTimeCounters.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
}
}
}
if (!ruleTokenCounter.containsKey(rule)) {
synchronized (lock) {
if (ruleTokenCounter.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
}
}
}
if (!threadCountMap.containsKey(rule.getParamIdx())) {
synchronized (lock) {
if (threadCountMap.get(rule.getParamIdx()) == null) {
threadCountMap.put(rule.getParamIdx(),
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(THREAD_COUNT_MAX_CAPACITY));
}
}
}
}
5、 ParamFlowChecker;
1、passCheck( )
public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args) {
if (args == null) {
return true;
}
//校验参数索引,没有该索引的参数,返回true
int paramIdx = rule.getParamIdx();
if (args.length <= paramIdx) {
return true;
}
// Get parameter value.
//获取参数
Object value = args[paramIdx];
// Assign value with the result of paramFlowKey method
if (value instanceof ParamFlowArgument) {
value = ((ParamFlowArgument) value).paramFlowKey();
}
// If value is null, then pass
if (value == null) {
return true;
}
if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
//集群规则检测
return passClusterCheck(resourceWrapper, rule, count, value);
}
//本地检测
return passLocalCheck(resourceWrapper, rule, count, value);
}
2、passLocalCheck( )
参数是Collection 或 Array 时,里面的每一个参数都作为热点参数进行校验
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
try {
//参数是Collection类型
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
//校验集合中的每一个参数
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else if (value.getClass().isArray()) {
//参数是Array类型
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
//校验数组中的每一个参数
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else {
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
RecordLog.warn("[ParamFlowChecker] Unexpected error", e);
}
return true;
}
3、passSingleValueCheck( )
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
//qps限流
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
//
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
//速率限制,漏桶
return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
} else {
//默认限制,令牌桶
return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
}
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
//线程限流
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
if (exclusionItems.contains(value)) {
int itemThreshold = rule.getParsedHotItems().get(value);
//比较线程数和规则里参数阈值
return ++threadCount <= itemThreshold;
}
long threshold = (long)rule.getCount();
//比较线程数与规则阈值
return ++threadCount <= threshold;
}
return true;
}
二、漏桶算法
上次获取令牌时间 + 当前请求令牌数生成需要花费的时间,
小于当前时间时,成功获取到令牌;
大于当前时间时,会有一个允许的等待时间,不超过等待时间,则成功获取到令牌,否则就表示没有足够的令牌。
static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
//最新生成的令牌时间计数器
CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
if (timeRecorderMap == null) {
return true;
}
// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
//获取规则中阈值
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
//获取规则热点参数中阈值
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
//当前请求的令牌,需要花费多长时间才能生成这么多令牌
long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
//初始化时间记录器
AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
if (timeRecorder == null) {
//第一次访问时没有时间记录器,认为获取到令牌,直接返回
return true;
}
//AtomicLong timeRecorder = timeRecorderMap.get(value);
//上次记录的获取令牌时间
long lastPassTime = timeRecorder.get();
//本次期望的获取令牌时间
long expectedTime = lastPassTime + costTime;
//期望时间小于当前时间;或者期望时间大于大于当前时间,但是在允许等待的时间范围内
if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
//将缓存获取令牌时间更新为当前时间
if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
long waitTime = expectedTime - currentTime;
if (waitTime > 0) {
//如果有等待时间,则将内存记录的将时间设置为期望时间
lastPastTimeRef.set(expectedTime);
try {
//休眠一会
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException e) {
RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
}
}
return true;
} else {
Thread.yield();
}
} else {
return false;
}
}
}
三、令牌桶算法
通过计算当前时间和上一次更新令牌时间的时间间隔,
时间间隔超过1s后,计算产生的令牌数 + 缓存中剩下的令牌数,作为总的令牌数供本次请求使用,取出令牌后,更新缓存中的令牌总数,并更新最新的令牌生成截止时间;
时间间隔没有超过1s,直接获取缓存剩余令牌数,更新缓存令牌数,不更新生成令牌时间,即在同1s内使用同一批固定个数的令牌。
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
//令牌计数器
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
//最新生成的令牌时间计数器
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
if (tokenCounters == null || timeCounters == null) {
return true;
}
// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
//获取规则中阈值
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
//获取规则热点参数中阈值
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
// 设置的阈值 tokenCount + 允许的突发流量 BurstCount
long maxCount = tokenCount + rule.getBurstCount();
//申请数量是否大于最大流量
if (acquireCount > maxCount) {
return false;
}
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
//热点参数的令牌更新时间
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
//首次,初始化令牌数
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
// Calculate the time duration since last token was added.
//上一次与当前时间的间隔
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
//校验是否超过1s
if (passTime > rule.getDurationInSec() * 1000) {
//超过1s了,令牌最大能获取数为 maxCount
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
//缓存里没有旧的令牌,认为获取成功,返回true
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
//缓存里有令牌了,可能其他线程设置了令牌,则需要计算本次令牌数是否能获取到
//获取剩余令牌数
long restQps = oldQps.get();
//计算时间间隔内,新产生的令牌数
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
//计算新的剩余令牌数
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
if (newQps < 0) {
//没有足够的令牌,触发限流
return false;
}
//获取令牌成功,更新最新剩余令牌数,并更新令牌时间;获取令牌失败会循环再次尝试获取
if (oldQps.compareAndSet(restQps, newQps)) {
lastAddTokenTime.set(currentTime);
return true;
}
Thread.yield();
}
} else {
//没有超过1s,获取剩余令牌数
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
//获取到令牌,放行;获取不到令牌会循环再次尝试获取
return true;
}
} else {
//没有足够的令牌,触发限流
return false;
}
}
Thread.yield();
}
}
}