一、ModifyRulesCommandHandler 规则处理
在sentinel dashboard 上面给对应的接口配置规则后,sentinel 服务端会将规则信息发送给客户端,客户端在获取到 ModifyRulesCommandHandler 调用 handler( )进行处理规则,下面从该方法进行分析。
1、 handle();
public CommandResponse<String> handle(CommandRequest request) {
// XXX from 1.7.2, force to fail when fastjson is older than 1.2.12
// We may need a better solution on this.
if (VersionUtil.fromVersionString(JSON.VERSION) < FASTJSON_MINIMAL_VER) {
// fastjson too old
return CommandResponse.ofFailure(new RuntimeException("The \"fastjson-" + JSON.VERSION
+ "\" introduced in application is too old, you need fastjson-1.2.12 at least."));
}
//获取规则的类型: flow、authority、degrade、system
String type = request.getParam("type");
// rule data in get parameter
//获取规则数据
String data = request.getParam("data");
if (StringUtil.isNotEmpty(data)) {
try {
data = URLDecoder.decode(data, "utf-8");
} catch (Exception e) {
RecordLog.info("Decode rule data error", e);
return CommandResponse.ofFailure(e, "decode rule data error");
}
}
RecordLog.info("Receiving rule change (type: {}): {}", type, data);
String result = "success";
//根据类型,对规则进行解析,保存到对应的缓存中,如果配置的有数据库,则会将规则进行持久化
if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
//流量控制 json 转对象
List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
//FlowRuleManager 解析流控规则
FlowRuleManager.loadRules(flowRules);
//将规则写入数据库
if (!writeToDataSource(getFlowDataSource(), flowRules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);
//AuthorityRuleManager 解析授权规则
AuthorityRuleManager.loadRules(rules);
if (!writeToDataSource(getAuthorityDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);
//DegradeRuleManager 解析熔断规则
DegradeRuleManager.loadRules(rules);
if (!writeToDataSource(getDegradeDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);
//SystemRuleManager解析熔断规则
SystemRuleManager.loadRules(rules);
if (!writeToDataSource(getSystemSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
}
return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
}
二、flow 流量控制规则
1、 FlowRuleManager.loadRules(flowRules);
public static void loadRules(List<FlowRule> rules) {
currentProperty.updateValue(rules);
}
DynamicSentinelProperty.java
public boolean updateValue(T newValue) {
//校验规则是否变化了
if (isEqual(value, newValue)) {
return false;
}
RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);
value = newValue;
for (PropertyListener<T> listener : listeners) {
//FlowRuleManager.FlowPropertyListener
listener.configUpdate(newValue);
}
return true;
}
2、 listener.configUpdate(newValue);
在FlowRuleManager 的 static 初始化块中注册了 FlowPropertyListener
FlowRuleManager.FlowPropertyListener.java
public void configUpdate(List<FlowRule> value) {
//构建流控规则
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
flowRules.clear();
//放入缓存
flowRules.putAll(rules);
}
RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
}
3、 FlowRuleUtil.buildFlowRuleMap(value);
public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction,
Predicate<FlowRule> filter, boolean shouldSort) {
Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return newRuleMap;
}
Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();
for (FlowRule rule : list) {
//校验规则
if (!isValidRule(rule)) {
RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
continue;
}
if (filter != null && !filter.test(rule)) {
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
//创建 TrafficShapingController
TrafficShapingController rater = generateRater(rule);
rule.setRater(rater);
//获取资源名称
K key = groupFunction.apply(rule);
if (key == null) {
continue;
}
Set<FlowRule> flowRules = tmpMap.get(key);
if (flowRules == null) {
// Use hash set here to remove duplicate rules.
flowRules = new HashSet<>();
tmpMap.put(key, flowRules);
}
flowRules.add(rule);
}
Comparator<FlowRule> comparator = new FlowRuleComparator();
for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {
List<FlowRule> rules = new ArrayList<>(entries.getValue());
if (shouldSort) {
// Sort the rules.
//使用 FlowRuleComparator 对规则排序
Collections.sort(rules, comparator);
}
newRuleMap.put(entries.getKey(), rules);
}
return newRuleMap;
}
4、 generateRater(rule);
rule里面的 grade 表示限流类型:
FLOW_GRADE_QPS: QPS限流
FLOW_GRADE_THREAD: 访问资源线程数限流
DEGRADE_GRADE_RT:响应时间限流
TrafficShapingController 表示流量控制器,目前有四个实现类:
DefaultController,默认控制器
WarmUpController,预热桶限流
RateLimiterController,匀速排队限流
WarmUpRateLimiterController,匀速排队和预热相结合
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
//创建出对应的流量控制器
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
switch (rule.getControlBehavior()) {
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
return new DefaultController(rule.getCount(), rule.getGrade());
}
三、authority 授权规则
1、 AuthorityRuleManager.loadRules(rules);
public static void loadRules(List<AuthorityRule> rules) {
currentProperty.updateValue(rules);
}
DynamicSentinelProperty.java
public boolean updateValue(T newValue) {
if (isEqual(value, newValue)) {
return false;
}
RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);
value = newValue;
for (PropertyListener<T> listener : listeners) {
//AuthorityRuleManager.RulePropertyListener
listener.configUpdate(newValue);
}
return true;
}
2、 RulePropertyListener.configUpdate();
public void configUpdate(List<AuthorityRule> conf) {
Map<String, Set<AuthorityRule>> rules = loadAuthorityConf(conf);
authorityRules.clear();
if (rules != null) {
//将规则放入缓存
authorityRules.putAll(rules);
}
RecordLog.info("[AuthorityRuleManager] Authority rules received: " + authorityRules);
}
private Map<String, Set<AuthorityRule>> loadAuthorityConf(List<AuthorityRule> list) {
Map<String, Set<AuthorityRule>> newRuleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return newRuleMap;
}
for (AuthorityRule rule : list) {
if (!isValidRule(rule)) {
RecordLog.warn("[AuthorityRuleManager] Ignoring invalid authority rule when loading new rules: " + rule);
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
String identity = rule.getResource();
Set<AuthorityRule> ruleSet = newRuleMap.get(identity);
// putIfAbsent
if (ruleSet == null) {
ruleSet = new HashSet<>();
ruleSet.add(rule);
newRuleMap.put(identity, ruleSet);
} else {
// One resource should only have at most one authority rule, so just ignore redundant rules.
RecordLog.warn("[AuthorityRuleManager] Ignoring redundant rule: " + rule.toString());
}
}
return newRuleMap;
}
四、degrade 熔断规则
1、 DegradeRuleManager.loadRules(rules);
public static void loadRules(List<DegradeRule> rules) {
try {
currentProperty.updateValue(rules);
} catch (Throwable e) {
RecordLog.error("[DegradeRuleManager] Unexpected error when loading degrade rules", e);
}
}
跟上面使用同样的方式,使用DynamicSentinelProperty,使用DegradeRuleManager.RulePropertyListener来监听配置的变化
2、 configUpdate();
@Override
public void configUpdate(List<DegradeRule> conf) {
//解析熔断规则
reloadFrom(conf);
RecordLog.info("[DegradeRuleManager] Degrade rules has been updated to: " + ruleMap);
}
3、 reloadFrom();
private synchronized void reloadFrom(List<DegradeRule> list) {
//构建熔断规则
Map<String, List<CircuitBreaker>> cbs = buildCircuitBreakers(list);
Map<String, Set<DegradeRule>> rm = new HashMap<>(cbs.size());
//使用 HashSet 对规则去重
for (Map.Entry<String, List<CircuitBreaker>> e : cbs.entrySet()) {
assert e.getValue() != null && !e.getValue().isEmpty();
Set<DegradeRule> rules = new HashSet<>(e.getValue().size());
for (CircuitBreaker cb : e.getValue()) {
rules.add(cb.getRule());
}
rm.put(e.getKey(), rules);
}
//缓存
DegradeRuleManager.circuitBreakers = cbs;
DegradeRuleManager.ruleMap = rm;
}
4、 buildCircuitBreakers();
private Map<String, List<CircuitBreaker>> buildCircuitBreakers(List<DegradeRule> list) {
Map<String, List<CircuitBreaker>> cbMap = new HashMap<>(8);
if (list == null || list.isEmpty()) {
return cbMap;
}
for (DegradeRule rule : list) {
//校验
if (!isValidRule(rule)) {
RecordLog.warn("[DegradeRuleManager] Ignoring invalid rule when loading new rules: " + rule);
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
//获取已经存在的熔断规则,或者创建一个新的
CircuitBreaker cb = getExistingSameCbOrNew(rule);
if (cb == null) {
RecordLog.warn("[DegradeRuleManager] Unknown circuit breaking strategy, ignoring: " + rule);
continue;
}
String resourceName = rule.getResource();
List<CircuitBreaker> cbList = cbMap.get(resourceName);
if (cbList == null) {
cbList = new ArrayList<>();
cbMap.put(resourceName, cbList);
}
cbList.add(cb);
}
return cbMap;
}
}
5、 getExistingSameCbOrNew();
private static CircuitBreaker getExistingSameCbOrNew(/*@Valid*/ DegradeRule rule) {
//获取缓存中该资源的熔断规则
List<CircuitBreaker> cbs = getCircuitBreakers(rule.getResource());
if (cbs == null || cbs.isEmpty()) {
//不存在时创建新的
return newCircuitBreakerFrom(rule);
}
for (CircuitBreaker cb : cbs) {
if (rule.equals(cb.getRule())) {
// Reuse the circuit breaker if the rule remains unchanged.
//返回相同的规则
return cb;
}
}
//不存在相同的时创建新的
return newCircuitBreakerFrom(rule);
}
6、 newCircuitBreakerFrom();
根据不同的熔断级别来创建熔断规则
private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
switch (rule.getGrade()) {
//慢比例
case RuleConstant.DEGRADE_GRADE_RT:
return new ResponseTimeCircuitBreaker(rule);
//异常比例
case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
//异常数
case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
return new ExceptionCircuitBreaker(rule);
default:
return null;
}
}
五、system 系统规则
1、 SystemRuleManager.loadRules(rules);
public static void loadRules(List<SystemRule> rules) {
currentProperty.updateValue(rules);
}
跟上面使用同样的方式,使用DynamicSentinelProperty,使用SystemPropertyListener来监听配置的变化
2、 SystemPropertyListener.configUpdate();
public synchronized void configUpdate(List<SystemRule> rules) {
//重置参数
restoreSetting();
// systemRules = rules;
if (rules != null && rules.size() >= 1) {
for (SystemRule rule : rules) {
//加载新的配置
loadSystemConf(rule);
}
} else {
checkSystemStatus.set(false);
}
RecordLog.info(String.format("[SystemRuleManager] Current system check status: %s, "
+ "highestSystemLoad: %e, "
+ "highestCpuUsage: %e, "
+ "maxRt: %d, "
+ "maxThread: %d, "
+ "maxQps: %e",
checkSystemStatus.get(),
highestSystemLoad,
highestCpuUsage,
maxRt,
maxThread,
qps));
}
3、 restoreSetting();
重置为默认值
protected void restoreSetting() {
checkSystemStatus.set(false);
// should restore changes
highestSystemLoad = Double.MAX_VALUE;
highestCpuUsage = Double.MAX_VALUE;
maxRt = Long.MAX_VALUE;
maxThread = Long.MAX_VALUE;
qps = Double.MAX_VALUE;
highestSystemLoadIsSet = false;
highestCpuUsageIsSet = false;
maxRtIsSet = false;
maxThreadIsSet = false;
qpsIsSet = false;
}
}
4、 loadSystemConf();
加载新的系统规则参数
public static void loadSystemConf(SystemRule rule) {
boolean checkStatus = false;
// Check if it's valid.
//系统负载
if (rule.getHighestSystemLoad() >= 0) {
highestSystemLoad = Math.min(highestSystemLoad, rule.getHighestSystemLoad());
highestSystemLoadIsSet = true;
checkStatus = true;
}
//cpu使用率
if (rule.getHighestCpuUsage() >= 0) {
if (rule.getHighestCpuUsage() > 1) {
RecordLog.warn(String.format("[SystemRuleManager] Ignoring invalid SystemRule: "
+ "highestCpuUsage %.3f > 1", rule.getHighestCpuUsage()));
} else {
highestCpuUsage = Math.min(highestCpuUsage, rule.getHighestCpuUsage());
highestCpuUsageIsSet = true;
checkStatus = true;
}
}
//平均响应时间
if (rule.getAvgRt() >= 0) {
maxRt = Math.min(maxRt, rule.getAvgRt());
maxRtIsSet = true;
checkStatus = true;
}
//最大线程数
if (rule.getMaxThread() >= 0) {
maxThread = Math.min(maxThread, rule.getMaxThread());
maxThreadIsSet = true;
checkStatus = true;
}
//qps
if (rule.getQps() >= 0) {
qps = Math.min(qps, rule.getQps());
qpsIsSet = true;
checkStatus = true;
}
checkSystemStatus.set(checkStatus);
}