前言
FlowSlot 通过流控模式、流控效果等实现对请求的限流。
一、FlowRule
流控规则FlowRule里的一些属性
private int grade = RuleConstant.FLOW_GRADE_QPS;
//阈值
private double count;
//流控模式:直接、关联、链路
private int strategy = RuleConstant.STRATEGY_DIRECT;
//关联资源或入口资源
private String refResource;
//流控效果:快速失败、预热、排队等待
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
//预热时间
private int warmUpPeriodSec = 10;
//排队等待的超时时间
private int maxQueueingTimeMs = 500;
//是否是集群限流模式
private boolean clusterMode;
//集群限流配置
private ClusterFlowConfig clusterConfig;
//限流控制器
private TrafficShapingController controller;
界面的配置:
二、检验流程
1、 com.alibaba.csp.sentinel.slots.block.flow.FlowSlot的entry()方法;
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
//流控校验
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
//FlowRuleChecker.checkFlow()
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
2、 FlowRuleChecker.checkFlow();
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
//从 FlowRuleManager 获取对应资源的流控规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
//校验规则,不通过时抛出异常
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
if (rule.isClusterMode()) {
//集群模式
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
//单机模式
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
//根据流控模式获取对应的node
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
//根据流控效果对应的控制器 TrafficShapingController 校验能够通过规则
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
3、 selectNodeByRequesterAndStrategy();
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
//规则针对的调用方
String limitApp = rule.getLimitApp();
//流控模式
int strategy = rule.getStrategy();
//请求的调用方,上层链路
String origin = context.getOrigin();
//规则针对的调用方时本次请求的调用方,并且不是default、other
if (limitApp.equals(origin) && filterOrigin(origin)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// 直接模式,返回调用方node
return context.getOriginNode();
}
//关联或链路模式
return selectReferenceNode(rule, context, node);
//流控规则针对的调用方是default
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// 直接模式,返回ClusterNode
return node.getClusterNode();
}
//关联或链路模式
return selectReferenceNode(rule, context, node);
//其他模式,并且没有对该资源配置其他的流控规则
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// 直接模式,返回调用方node
return context.getOriginNode();
}
//关联或链路模式
return selectReferenceNode(rule, context, node);
}
return null;
}
4、 selectReferenceNode();
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
String refResource = rule.getRefResource();
int strategy = rule.getStrategy();
if (StringUtil.isEmpty(refResource)) {
return null;
}
//关联模式,从集群环境中获取对应关联资源Node
if (strategy == RuleConstant.STRATEGY_RELATE) {
return ClusterBuilderSlot.getClusterNode(refResource);
}
//链路模式,判断入口资源与上下文名称是否一致,一致时返回入口资源node
if (strategy == RuleConstant.STRATEGY_CHAIN) {
if (!refResource.equals(context.getName())) {
return null;
}
return node;
}
// No node.
return null;
}
5、 调用TrafficShapingController的canPass();
DefaultController,默认控制器
WarmUpController,预热限流
RateLimiterController,匀速排队限流
WarmUpRateLimiterController,匀速排队和预热相结合限流
三、DefaultController
快速失败限流
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//获取当前资源的请求数
int curCount = avgUsedTokens(node);
//校验配置的阈值
if (curCount + acquireCount > count) {
//设置优先,并且是QPS控制
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
//获取下次能请求资源的时间
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
//等待时间小于设置的超时时间
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
//休眠等待
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
//抛出PriorityWaitException异常
throw new PriorityWaitException(waitInMs);
}
}
//大于阈值,返回失败
return false;
}
//小于阈值,返回成功
return true;
}
四、RateLimiterController
排队等待限流
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 校验需要获取的令牌数
if (acquireCount <= 0) {
return true;
}
// 校验阈值
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// 产生需要的令牌数花费的时间
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// 期望时间
long expectedTime = costTime + latestPassedTime.get();
//当前时间已经过了期望的时间,说明有足够的时间生成需要的令牌,则成功获取到令牌
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
// 没有到期望时间,计算需要等待的时间
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
//等待时间大于最大等待超时时间时,获取令牌失败
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
//更新缓存获取令牌时间
long oldTime = latestPassedTime.addAndGet(costTime);
try {
//再次校验等待时间
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// 休眠等待一会,然后成功获取令牌
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}