跳至主要內容

Nacos服务注册

xw大约 7 分钟NacosNacos

概述

从源码的角度对以下几个过程进行分析:

  • 客户端如何发起服务注册110.Nacos服务注册
  • 服务端处理客户端服务注册请求
  • 服务状态健康检查
  • 高并发支撑海量服务注册

服务端

客户端注册流程

相关信息

本文基于Spring Cloud服务注册来进行说明。

客户端向Nacos服务端进行服务注册是通过监听WebServerInitializedEvent事件进行触发,相关核心类结构如下图所示:

  • NacosAutoServiceRegistration:该Bean会注册到上下文中,监听WebServerInitializedEvent事件,最终调用NacosServiceRegistry#register方法完成向Nacos注册。

WebServerInitializedEvent是Spring Boot中的一个事件类,它在Web服务器(如Tomcat、Jetty等)初始化完成后触发。该事件是Spring Boot的一个应用程序事件,可以用于在应用程序启动后执行一些特定的操作。

接下来看下NacosServiceRegistry#register执行流程,核心入口代码如下:

@Override  
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {  
    // 实例分为临时实例和永久实例,如果是临时实例才进行健康检查
    // 默认都是临时实例
    if (instance.isEphemeral()) {  
        BeatInfo beatInfo = new BeatInfo();  
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));  
        beatInfo.setIp(instance.getIp());  
        beatInfo.setPort(instance.getPort());  
        beatInfo.setCluster(instance.getClusterName());  
        beatInfo.setWeight(instance.getWeight());  
        beatInfo.setMetadata(instance.getMetadata());  
        beatInfo.setScheduled(false);  
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());  
  

// 定时任务进行心跳健康检查,每隔5秒执行一次
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);  
    }  
  
// 调用nacos接口进行注册服务   
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);  
}

nacos实例分为两种:临时实例和永久实例。

对于临时实例,客户端会定时的通过 RPC 连接或http向 Nacos 注册中心发送心跳,保持连接的存活。如果客户端和注册中心的连接断开,那么注册中心会主动剔除该 client 所注册的服务,达到下线的效果。同时 Nacos 注册中心还会在注册中心启动时,注册⼀个过期客户端清除的定时任务, 用于删除那些健康状态超过⼀段时间的客户端。

对于永久实例的的健康检查,Nacos 采用的是注册中心探测机制,注册中心会在永久服务初始化时 根据客户端选择的协议类型注册探活的定时任务。Nacos 现在内置提供了三种探测的协议,即 Http、TCP 以及 MySQL,并且将无法探测成功的实例标记为不健康

整体流程如下所示:

服务端处理服务注册请求流程

整体流程如下图所示:

  • com.alibaba.nacos.naming.controllers.InstanceController#register方法接收用户请求,调用registerInstance方法
  • 添加服务实例,发布同步客户端事件,异步订阅将客户端信息推送给nacos集群
  • 发布ClientRegisterServiceEvent事件,订阅端将服务改变事件推送给该服务的订阅者列表。

  • 发布InstanceMetadataEvent事件,异步订阅推送给订阅端。

服务端服务注册流程只分析了临时实例的注册流程,持久实例注册的相关逻辑在com.alibaba.nacos.naming.core.v2.service.impl.PersistentClientOperationServiceImp,本文不再详细说明。

服务下线流程

当客户端下线时,将会调用AbstractAutoServiceRegistration#destroy执行下线逻辑,方法如下:

@PreDestroy  
public void destroy() {  
    stop();  
}

@PreDestroy 是Java中的一个注解,它标记的方法会在对象被销毁之前调用

stop方法主要由以下两个逻辑:

  • 移除心跳任务
  • 调用服务端的客户端下线接口

服务端客户端下线接口在InstanceRequestHandler#deregisterInstance,通过 grpc调用。

@Override
public void deregisterInstance(Service service, Instance instance, String clientId) {
    if (!ServiceManager.getInstance().containSingleton(service)) {
        Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", service);
        return;
    }
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);
    client.setLastUpdatedTime();
    client.recalculateRevision();
    if (null != removedInstance) {
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
        NotifyCenter.publishEvent(
                new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));
    }
}

主要做了以下操作:

  • 移除服务实例相关信息,包括元数据、实例剔除、实例推送相关信息
  • 如果是集群,同步到其他节点

一致性协议实现

Nacos支持AP和CP两种方式,CP基于Raft协议实现,AP使用自研Distro协议实现。

Distro协议

Distro 协议的主要设计思想如下:

  • Nacos 每个节点是平等的都可以处理写请求,同时把新数据同步到其他节点。
  • 每个节点只负责部分数据,定时发送自己负责数据的校验值到其他节点来保持数据一致性。
  • 每个节点独立处理读请求,及时从本地发出响应。

实现逻辑

在类DistroProtocol初始化时,会启动两个定时任务进行检查,分别是startVerifyTaskstartLoadTask

startVerifyTask流程

startVerifyTask方法任务每隔5秒执行一次DistroVerifyTimedTask

//每隔5S执行一次 
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,
                        distroTaskEngineHolder.getExecuteWorkersManager()),
                DistroConfig.getInstance().getVerifyIntervalMillis());

执行逻辑如下:

    @Override
    public void run() {
        try {
            //获取其他节点
            List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("server list is: {}", targetServer);
            }
            //根据类型进行验证,类型为协议类型
            for (String each : distroComponentHolder.getDataStorageTypes()) {
                verifyForDataStorage(each, targetServer);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
        }
    }
    
    private void verifyForDataStorage(String type, List<Member> targetServer) {
        //获取存储类
        DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
        if (!dataStorage.isFinishInitial()) {
            Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
                    dataStorage.getClass().getSimpleName());
            return;
        }
        //获取存储数据
        List<DistroData> verifyData = dataStorage.getVerifyData();
        if (null == verifyData || verifyData.isEmpty()) {
            return;
        }
        for (Member member : targetServer) {
            DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
            if (null == agent) {
                continue;
            }
            //通过执行器执行
            executeTaskExecuteEngine.addTask(member.getAddress() + type,
                    new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
        }
    }

getVerifyData代码如下:

public List<DistroData> getVerifyData() {
    List<DistroData> result = null;
    for (String each : clientManager.allClientId()) {
        // 对每个本机所管理的注册客户端进行处理
        Client client = clientManager.getClient(each);
        if (null == client || !client.isEphemeral()) {
            // 空的或者是非临时性的节点,不处理
            continue;
        }
        // 如果是自己管理的客户端
        if (clientManager.isResponsibleClient(client)) {
            // 需要验证的数据就是每个节点的clientId和revision
            DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(),
                                                                           client.getRevision());
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            DistroData data = new DistroData(distroKey,
                                             ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
            data.setType(DataOperation.VERIFY);
            if (result == null) {
                result = new LinkedList<>();
            }
            result.add(data);
        }
    }
    return result;
}

DistroVerifyExecuteTask处理待验证的数据,将自己负责的客户端信息通过RPC请求同步到其他节点。

public boolean syncVerifyData(DistroData verifyData, String targetServer) {
    if (isNoExistTarget(targetServer)) {
        // 本地节点的服务列表不包含目标服务,直接返回
        return true;
    }
    // 将目标服务器替换为自身服务器,以便可以进行回调。
    verifyData.getDistroKey().setTargetServer(memberManager.getSelf().getAddress());
    // 创建DistroDataRequest
    DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
    Member member = memberManager.find(targetServer);
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO
            .warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy, key: {}", targetServer,
                  verifyData.getDistroKey());
        return false;
    }
    try {
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Verify distro data failed! key: {} ", verifyData.getDistroKey(), e);
    }
    return false;
}

startLoadTask

@Override
public void run() {
    try {
        load();
        if (!checkCompleted()) {
            GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
        } else {
            loadCallback.onSuccess();
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
        }
    } catch (Exception e) {
        loadCallback.onFailed(e);
        Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
    }
}

private void load() throws Exception {
    while (memberManager.allMembersWithoutSelf().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
        TimeUnit.SECONDS.sleep(1);
    }
    while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
        TimeUnit.SECONDS.sleep(1);
    }
    //获取所有协议类型,这里是GRPC
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
            loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
        }
    }
}

private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == transportAgent || null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
                resourceType, transportAgent, dataProcessor);
        return false;
    }
    for (Member each : memberManager.allMembersWithoutSelf()) {
        long startTime = System.currentTimeMillis();
        try {
            // 获取其他节点的快照数据,处理
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
            DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
            Loggers.DISTRO.info("[DISTRO-INIT] it took {} ms to load snapshot {} from {} and snapshot size is {}.",
                    System.currentTimeMillis() - startTime, resourceType, each.getAddress(),
                    getDistroDataLength(distroData));
            boolean result = dataProcessor.processSnapshot(distroData);
            Loggers.DISTRO
                    .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
                            result);
            if (result) {
                distroComponentHolder.findDataStorage(resourceType).finishInitial();
                return true;
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
        }
    }
    return false;
}
  • startLoadTask在启动的时候会同步其他节点的数据
  • DistroVerifyExecuteTask将自己所管理的客户端信息同步到其他节点

思考

Nacos服务注册如何支撑高并发?
Nacos服务注册采用内存队列进行注册,通过内存队列、线程池进行异步处理,提高并发量。接收到注册请求后将任务放进队列异步处理,同时将处理逻辑拆分异步处理。

Nacos如何防止多节点并发读写冲突?

  • 2.x版本采用ConcurrentMap
  • 每个节点负责一部分客户端的写操作,减少读写冲突