Eureka-Server
eureka自动感知故障的任务,是在initEurekaServerContext()方法中调用的
protected void initEurekaServerContext() throws Exception {
// 省略......
// 启用自动感知故障调度任务
registry.openForTraffic(applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
进入openForTraffic(applicationInfoManager, registryCount)方法
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
//count 一般是不是为0的
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
// 省略......
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// 服务实例摘除任务
super.postInit();
}
count不为0的解释如下
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
如果count==0,线程会被阻塞30s,最多会重试5次,eurekaClient.getApplications()获取的是本地服务,eureka服务会每隔30s主动抓取注册表,然后放到本地服务容器中。
进入postInit()方法
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
每隔60s触发一次服务实例下线的任务
进入类EvictionTask
class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
getCompensationTimeMs()是计算一个补偿时间,进入long getCompensationTimeMs()方法
long getCompensationTimeMs() {
long currNanos = getCurrentTimeNano();
long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
if (lastNanos == 0l) {
return 0l;
}
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
return compensationTime <= 0l ? 0l : compensationTime;
}
进入evict(long additionalLeaseMs)方法
public void evict(long additionalLeaseMs) {
// 省略......
// 筛选出过期的续约服务
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
// 计算最多摘除的服务实例
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// 公平洗牌算法
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
进入isExpired(long additionalLeaseMs)方法
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
可以看到lastUpdateTimestamp + duration,duration=90s,而lastUpdateTimestamp是在续约时候设置的
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
所以真正超时的时间是180s。
通过定时任务自动感知故障摘除服务实例不会在集群间同步,它应该通过抓取注册表来达到集群间的一致。
备注:eureka-client下线也可以通过调用DiscoveryClient的shutdown()方法,手动触发。
版权声明:「DDKK.COM 弟弟快看,程序员编程资料站」本站文章,版权归原作者所有
来源:https://blog.csdn.net/manongxx/category_10951792.html