Nacos服务注册
概述
从源码的角度对以下几个过程进行分析:
- 客户端如何发起服务注册110.Nacos服务注册
- 服务端处理客户端服务注册请求
- 服务状态健康检查
- 高并发支撑海量服务注册
服务端
客户端注册流程
相关信息
本文基于Spring Cloud服务注册来进行说明。
客户端向Nacos服务端进行服务注册是通过监听WebServerInitializedEvent
事件进行触发,相关核心类结构如下图所示:
NacosAutoServiceRegistration
:该Bean会注册到上下文中,监听WebServerInitializedEvent事件
,最终调用NacosServiceRegistry#register
方法完成向Nacos注册。
WebServerInitializedEvent
是Spring Boot中的一个事件类,它在Web服务器(如Tomcat、Jetty等)初始化完成后触发。该事件是Spring Boot的一个应用程序事件,可以用于在应用程序启动后执行一些特定的操作。
接下来看下NacosServiceRegistry#register
执行流程,核心入口代码如下:
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// 实例分为临时实例和永久实例,如果是临时实例才进行健康检查
// 默认都是临时实例
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
// 定时任务进行心跳健康检查,每隔5秒执行一次
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
// 调用nacos接口进行注册服务
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
nacos实例分为两种:临时实例和永久实例。
对于临时实例,客户端会定时的通过 RPC 连接或http向 Nacos 注册中心发送心跳,保持连接的存活。如果客户端和注册中心的连接断开,那么注册中心会主动剔除该 client 所注册的服务,达到下线的效果。同时 Nacos 注册中心还会在注册中心启动时,注册⼀个过期客户端清除的定时任务, 用于删除那些健康状态超过⼀段时间的客户端。
对于永久实例的的健康检查,Nacos 采用的是注册中心探测机制,注册中心会在永久服务初始化时 根据客户端选择的协议类型注册探活的定时任务。Nacos 现在内置提供了三种探测的协议,即 Http、TCP 以及 MySQL,并且将无法探测成功的实例标记为不健康。
整体流程如下所示:
服务端处理服务注册请求流程
整体流程如下图所示:
- com.alibaba.nacos.naming.controllers.InstanceController#register方法接收用户请求,调用registerInstance方法
- 添加服务实例,发布同步客户端事件,异步订阅将客户端信息推送给nacos集群
- 发布ClientRegisterServiceEvent事件,订阅端将服务改变事件推送给该服务的订阅者列表。
- 发布InstanceMetadataEvent事件,异步订阅推送给订阅端。
注
服务端服务注册流程只分析了临时实例的注册流程,持久实例注册的相关逻辑在com.alibaba.nacos.naming.core.v2.service.impl.PersistentClientOperationServiceImp,本文不再详细说明。
服务下线流程
当客户端下线时,将会调用AbstractAutoServiceRegistration#destroy
执行下线逻辑,方法如下:
@PreDestroy
public void destroy() {
stop();
}
@PreDestroy
是Java中的一个注解,它标记的方法会在对象被销毁之前调用
stop方法主要由以下两个逻辑:
- 移除心跳任务
- 调用服务端的客户端下线接口
服务端客户端下线接口在InstanceRequestHandler#deregisterInstance
,通过 grpc调用。
@Override
public void deregisterInstance(Service service, Instance instance, String clientId) {
if (!ServiceManager.getInstance().containSingleton(service)) {
Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", service);
return;
}
Service singleton = ServiceManager.getInstance().getSingleton(service);
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);
client.setLastUpdatedTime();
client.recalculateRevision();
if (null != removedInstance) {
NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(
new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));
}
}
主要做了以下操作:
- 移除服务实例相关信息,包括元数据、实例剔除、实例推送相关信息
- 如果是集群,同步到其他节点
一致性协议实现
Nacos支持AP和CP两种方式,CP基于Raft协议实现,AP使用自研Distro协议实现。
Distro协议
Distro 协议的主要设计思想如下:
- Nacos 每个节点是平等的都可以处理写请求,同时把新数据同步到其他节点。
- 每个节点只负责部分数据,定时发送自己负责数据的校验值到其他节点来保持数据一致性。
- 每个节点独立处理读请求,及时从本地发出响应。
实现逻辑
在类DistroProtocol
初始化时,会启动两个定时任务进行检查,分别是startVerifyTask
和startLoadTask
。
startVerifyTask流程
startVerifyTask
方法任务每隔5秒执行一次DistroVerifyTimedTask
//每隔5S执行一次
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,
distroTaskEngineHolder.getExecuteWorkersManager()),
DistroConfig.getInstance().getVerifyIntervalMillis());
执行逻辑如下:
@Override
public void run() {
try {
//获取其他节点
List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("server list is: {}", targetServer);
}
//根据类型进行验证,类型为协议类型
for (String each : distroComponentHolder.getDataStorageTypes()) {
verifyForDataStorage(each, targetServer);
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
}
}
private void verifyForDataStorage(String type, List<Member> targetServer) {
//获取存储类
DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
if (!dataStorage.isFinishInitial()) {
Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
dataStorage.getClass().getSimpleName());
return;
}
//获取存储数据
List<DistroData> verifyData = dataStorage.getVerifyData();
if (null == verifyData || verifyData.isEmpty()) {
return;
}
for (Member member : targetServer) {
DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
if (null == agent) {
continue;
}
//通过执行器执行
executeTaskExecuteEngine.addTask(member.getAddress() + type,
new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
}
}
getVerifyData
代码如下:
public List<DistroData> getVerifyData() {
List<DistroData> result = null;
for (String each : clientManager.allClientId()) {
// 对每个本机所管理的注册客户端进行处理
Client client = clientManager.getClient(each);
if (null == client || !client.isEphemeral()) {
// 空的或者是非临时性的节点,不处理
continue;
}
// 如果是自己管理的客户端
if (clientManager.isResponsibleClient(client)) {
// 需要验证的数据就是每个节点的clientId和revision
DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(),
client.getRevision());
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
DistroData data = new DistroData(distroKey,
ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
data.setType(DataOperation.VERIFY);
if (result == null) {
result = new LinkedList<>();
}
result.add(data);
}
}
return result;
}
由DistroVerifyExecuteTask
处理待验证的数据,将自己负责的客户端信息通过RPC请求同步到其他节点。
public boolean syncVerifyData(DistroData verifyData, String targetServer) {
if (isNoExistTarget(targetServer)) {
// 本地节点的服务列表不包含目标服务,直接返回
return true;
}
// 将目标服务器替换为自身服务器,以便可以进行回调。
verifyData.getDistroKey().setTargetServer(memberManager.getSelf().getAddress());
// 创建DistroDataRequest
DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO
.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy, key: {}", targetServer,
verifyData.getDistroKey());
return false;
}
try {
Response response = clusterRpcClientProxy.sendRequest(member, request);
return checkResponse(response);
} catch (NacosException e) {
Loggers.DISTRO.error("[DISTRO-FAILED] Verify distro data failed! key: {} ", verifyData.getDistroKey(), e);
}
return false;
}
startLoadTask
@Override
public void run() {
try {
load();
if (!checkCompleted()) {
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
}
} catch (Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
}
}
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
//获取所有协议类型,这里是GRPC
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
resourceType, transportAgent, dataProcessor);
return false;
}
for (Member each : memberManager.allMembersWithoutSelf()) {
long startTime = System.currentTimeMillis();
try {
// 获取其他节点的快照数据,处理
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
Loggers.DISTRO.info("[DISTRO-INIT] it took {} ms to load snapshot {} from {} and snapshot size is {}.",
System.currentTimeMillis() - startTime, resourceType, each.getAddress(),
getDistroDataLength(distroData));
boolean result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO
.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
result);
if (result) {
distroComponentHolder.findDataStorage(resourceType).finishInitial();
return true;
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
}
}
return false;
}
startLoadTask
在启动的时候会同步其他节点的数据DistroVerifyExecuteTask
将自己所管理的客户端信息同步到其他节点
思考
Nacos服务注册如何支撑高并发?
Nacos服务注册采用内存队列进行注册,通过内存队列、线程池进行异步处理,提高并发量。接收到注册请求后将任务放进队列异步处理,同时将处理逻辑拆分异步处理。
Nacos如何防止多节点并发读写冲突?
- 2.x版本采用ConcurrentMap
- 每个节点负责一部分客户端的写操作,减少读写冲突