EurekaServer集群机制源码解析
1、集群机制流程图
1、集群机制源码解析
1.1 集群信息初始化
eureka core的BootStrap里面,有一块代码,是PeerEurekaNodes的代码,其实是在处理eureka server集群信息的初始化,会执行PeerEurekaNodes.start()方法
解析配置文件中的其他eureka server的url地址,基于url地址构造一个一个的PeerEurekaNode,一个PeerEurekaNode就代表了一个eureka server。启动一个后台的线程,默认是每隔10分钟,会运行一个任务,就是基于配置文件中的url来刷新eureka server列表。
1.1.1 com.netflix.eureka.cluster.PeerEurekaNodes#start
public void start() {
// 1、初始化线程池
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 2、更新 eureka 节点信息
/**
* 2.1 解析 urls
* 2.2 然后根据 url 去构造 PeerEurekaNode(其实就是一个 Eureka Server)
*/
updatePeerEurekaNodes(resolvePeerUrls());
// 3、构建 eureka 节点信息的定时任务
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
// 间隔为 10 分钟
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: " + node.getServiceUrl());
}
}
1.1.2 com.netflix.eureka.cluster.PeerEurekaNodes#updatePeerEurekaNodes
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
if (toShutdown.isEmpty() && toAdd.isEmpty()) {
// No change
return;
}
// 1、移除不可用的
// Remove peers no long available
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// 2、添加新的节点
// Add new peers
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
// 2.1 这里可以看到是创建新的 PeerEurekaNode
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
// 1、创建一个 HttpReplicationClient
HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
String targetHost = hostFromUrl(peerEurekaNodeUrl);
if (targetHost == null) {
targetHost = "host";
}
// 2、基于 registry、targetHost、peerEurekaNodeUrl、replicationClient、serverConfig 来实例化 PeerEurekaNode。
return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
// 看一下 PeerEurekaNode 的构造器
PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
HttpReplicationClient replicationClient, EurekaServerConfig config,
int batchSize, long maxBatchingDelayMs,
long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
// 1、基本的赋值
/** registry -> 注册表、
* targetHost -> 目标主机、
* replicationClient -> jerserclient用于处理请求
* serviceUrl -> 服务的 url
* config -> 服务端的配置
* maxProcessingDelayMs -> 复制延迟时间
* batcherName
*/
this.registry = registry;
this.targetHost = targetHost;
this.replicationClient = replicationClient;
this.serviceUrl = serviceUrl;
this.config = config;
this.maxProcessingDelayMs = config.getMaxTimeForReplication();
String batcherName = getBatcherName();
// 2、这个就是 replication 复制的具体的任务执行者,通过他去调度任务
ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
// 3、创建 batchingDispatcher、nonBatchingDispatcher(批量处理、非批量处理的任务分发组件)
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
config.getMaxElementsInPeerReplicationPool(),
batchSize,
config.getMaxThreadsForPeerReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
targetHost,
config.getMaxElementsInStatusReplicationPool(),
config.getMaxThreadsForStatusReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
}
1.2 集群信息拉取
registry.syncUp()
就是说,当前这个eureka server会从任何一个其他的eureka server拉取注册表过来放在自己本地,作为初始的注册表。将自己作为一个eureka client,找任意一个eureka server来拉取注册表,将拉取到的注册表放到自己本地去。
eurekaClient.getApplications();
eureka server自己本身本来就是个eureka client,在初始化的时候,就会去找任意的一个eureka server拉取注册表到自己本地来,把这个注册表放到自己身上来,作为自己这个eureka server的注册表
1.2.1 com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp
/**
* 从对等的 Eureka 节点填充注册表信息。 如果通信失败,此操作将故障转移到其他节点,直到列表用完为止
*/
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
// 1、默认配置是 5(getRegistrySyncRetries)
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
// 2、不使用 Amazon 都返回 true.(Everything non-amazon is registrable.)
if (isRegisterable(instance)) {
// 3、然后就是调用 com.netflix.eureka.registry.AbstractInstanceRegistry#register 这个之前已经分析过了就不说了。
// 只不过这里 isReplication 参数为 true.就是标识此次是复制
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
1.3 注册、下线、故障、心跳
1、 如何从一台eureka server同步到另外一台eureka server上去的
2、ApplicationResource的addInstance()方法,负责注册,先在自己本地完成一个注册,接着会replicateToPeers()方法,这个方法就会将这次注册请求,同步到其他所有的eureka server上去.
3、如果是某台eureka client来找eureka server进行注册,isReplication是false,此时会给其他所有的你配置的eureka server都同步这个注册请求,此时一定会基于jersey,调用其他所有的eureka server的restful接口,去执行这个服务实例的注册的请求
4、eureka-core-jersey2的工程,ReplicationHttpClient,此时同步注册请求给其他eureka server的时候,一定会将isReplication设置为true,这个东西可以确保说什么呢,其他eureka server接到这个同步的请求,仅仅在自己本地执行,不会再次向其他的eureka server去进行注册
2、集群运行项目
2.1 EurekaServer 项目
2.1.1 pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bamboo</groupId>
<artifactId>eureka-server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>eureka-server</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.13.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Edgware.SR3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
2.1.2 java 代码
@SpringBootApplication
@EnableEurekaServer
public class EurekaServer {
public static void main(String[] args) {
SpringApplication.run(EurekaServer.class, args);
}
}
2.1.3 配置文件
- peer1 配置
server:
port: 8761
eureka:
instance:
hostname: peer1
client:
serviceUrl:
defaultZone: http://peer2:8762/eureka/
- peer2 配置:
server:
port: 8762
eureka:
instance:
hostname: peer2
client:
serviceUrl:
defaultZone: http://peer1:8761/eureka/
启动的时候可以在 idea 中 copy 一个运行实例,然后在 vm options 中输入运行参数,默认会覆盖 yaml 中的配置
启动之后的状态如下图所示:
peer1:(8761 节点)
peer2:(8762 节点)
所以 2 者的同步是没有问题的,跟我们分析的一致。
版权声明:「DDKK.COM 弟弟快看,程序员编程资料站」本站文章,版权归原作者所有