1-3 Eureka Client Source Code Analysis

Published on 2022-04-08 19:19

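1. Client workflow

1.1 Initialization phase

  • 1. Read the settings for talking to the server and wrap them in EurekaClientConfig
  • 2. Read this service's own information and wrap it in EurekaInstanceConfig
  • 3. Fetch the registry from the server and cache it locally
  • 4. Register the service
  • 5. Register the scheduled tasks for sending heartbeats and refreshing the cache

1.2 Startup phase

  • 1. Send heartbeats to keep the lease alive
  • 2. Periodically fetch the registry and update the local cache
  • 3. Monitor this instance's own information and re-register the service if it changes

1.3 Shutdown phase

  • Deregister the service from the server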

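2. Configuration classes

  • The structure of the configuration classes is shown in the figure below

2.1 EurekaDiscoveryClientConfiguration

  • Purpose: helps the client read properties and configure the beans it needs

2.1.1 Properties and configuration classes it reads

  • EurekaClientConfig: wraps the settings for interacting with the server
  • ApplicationInfoManager: application info manager; manages InstanceInfo and EurekaInstanceConfig
  • InstanceInfo: the metadata sent to the server at registration
  • EurekaInstanceConfig: configuration of this service instance, used to build InstanceInfo
  • DiscoveryClient: the client interface for service discovery

2.2 DiscoveryClient

  • Note: the core interface for service discovery
  • String description(): returns a description of the implementation
  • List<ServiceInstance> getInstances(String serviceId): returns the instances registered under a service id
  • List<String> getServices(): returns the list of known service ids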
package org.springframework.cloud.client.discovery;

public interface DiscoveryClient {

	String description();

	List<ServiceInstance> getInstances(String serviceId);

	List<String> getServices();

}
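
To make the contract above concrete, here is a minimal sketch of how the three methods fit together. It deliberately avoids the Spring Cloud dependency: `SketchDiscoveryClient` and `InMemoryDiscoveryClient` are hypothetical stand-ins invented for this illustration, and an instance is reduced to a plain "host:port" string rather than a real ServiceInstance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the DiscoveryClient contract (not the Spring type).
interface SketchDiscoveryClient {
    String description();
    List<String> getInstances(String serviceId); // an instance is just "host:port" here
    List<String> getServices();
}

// In-memory implementation used only for illustration.
class InMemoryDiscoveryClient implements SketchDiscoveryClient {
    private final Map<String, List<String>> registry = new HashMap<>();

    // Records one instance under a service id.
    void add(String serviceId, String hostAndPort) {
        registry.computeIfAbsent(serviceId, k -> new ArrayList<String>()).add(hostAndPort);
    }

    @Override
    public String description() {
        return "in-memory sketch";
    }

    @Override
    public List<String> getInstances(String serviceId) {
        // Unknown services yield an empty list rather than null.
        return registry.getOrDefault(serviceId, Collections.<String>emptyList());
    }

    @Override
    public List<String> getServices() {
        return new ArrayList<>(registry.keySet());
    }
}
```

A caller would typically ask for getServices() first and then resolve each id with getInstances(serviceId); the real EurekaDiscoveryClient answers both from the locally cached registry.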
2.3 EurekaDiscoveryClient
  • Implements DiscoveryClient and delegates to a composed EurekaClient to provide the interface's functionality
package org.springframework.cloud.netflix.eureka;

public class EurekaDiscoveryClient implements DiscoveryClient {}

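3. DiscoveryClient class structure

  • The source structure is shown in the figure below

3.1 DiscoveryClient

  • Class: com.netflix.discovery.DiscoveryClient
  • Note: holds the key logic for client-server interaction

3.1.1 Responsibilities

  • 1. Register the service with the server
  • 2. Send heartbeats to renew the lease
  • 3. Cancel the lease on shutdown so the service is taken offline on the server
  • 4. Query the list of service instances registered on the server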

package com.netflix.discovery;

@Singleton
public class DiscoveryClient implements EurekaClient {}

3.1.2 Class hierarchy

LookupService---> EurekaClient ----> DiscoveryClient

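3.2 LookupService

  • Application getApplication(String appName): returns the Application (and its instances) registered under the given name
  • Applications getApplications(): returns all registered applications
  • List<InstanceInfo> getInstancesById(String id): returns the instances matching an instance id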
package com.netflix.discovery.shared;

public interface LookupService<T> {

    Application getApplication(String appName);

    Applications getApplications();

    List<InstanceInfo> getInstancesById(String id);

    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}

3.3 Application

  • All operations on InstanceInfo in Application are synchronized
  • Most operations in Applications are synchronized as well

3.3.1 All fields of Application

package com.netflix.discovery.shared;

@Serializer("com.netflix.discovery.converters.EntityBodyConverter")
@XStreamAlias("application")
@JsonRootName("application")
public class Application {
    
    private static Random shuffleRandom = new Random();
    private String name;
    
    private volatile boolean isDirty = false;
    private final Set<InstanceInfo> instances;
    private final AtomicReference<List<InstanceInfo>> shuffledInstances;
    private final Map<String, InstanceInfo> instancesMap;
}

3.3.2 Synchronized operations in Application

private void removeInstance(InstanceInfo i, boolean markAsDirty) {
        instancesMap.remove(i.getId());
        synchronized (instances) {
            instances.remove(i);
            if (markAsDirty) {
                isDirty = true;
            }
        }
    }
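
The locking pattern in removeInstance can be sketched in isolation: a concurrent map for lookups by id, a set guarded by synchronized for iteration order, and a volatile dirty flag that tells readers a cached snapshot is out of date. `InstanceHolderSketch` is a hypothetical class written for this illustration, with instances reduced to strings; it is not the Eureka Application class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical reduction of the Application locking pattern; instances are plain strings.
class InstanceHolderSketch {
    private final Set<String> instances = new LinkedHashSet<>();
    private final Map<String, String> instancesMap = new ConcurrentHashMap<>();
    private volatile boolean dirty = false;

    void addInstance(String id, String address) {
        instancesMap.put(id, address);   // the concurrent map needs no extra lock
        synchronized (instances) {       // the plain set is guarded by its own monitor
            instances.add(id);
            dirty = true;
        }
    }

    void removeInstance(String id) {
        instancesMap.remove(id);
        synchronized (instances) {
            instances.remove(id);
            dirty = true;
        }
    }

    // Returns an immutable copy and clears the dirty flag under the same lock.
    List<String> snapshot() {
        synchronized (instances) {
            dirty = false;
            return Collections.unmodifiableList(new ArrayList<>(instances));
        }
    }

    boolean isDirty() {
        return dirty;
    }
}
```

Clearing the flag inside the same synchronized block as the copy is a choice made in this sketch so that a concurrent writer cannot slip in between the copy and the reset.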

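3.4 EurekaClient

  • Characteristics: extends LookupService and is the higher-level interface that DiscoveryClient implements; it is a relatively stable interface (extension layer)

3.4.1 Purpose

  • Offers several ways to obtain InstanceInfo
  • Exposes the local client's data
  • Lets the client register and retrieve health check handlers

3.4.2 Core methods

  • Register a health check handler for the Eureka client
  • Listen for updates to the client's service instance information
    // Register a health check handler for the Eureka client
    public void registerHealthCheck(HealthCheckHandler healthCheckHandler);
    // Listen for updates to the client's service instance information
    public void registerEventListener(EurekaEventListener eventListener);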

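3.5 HealthCheckHandler

  • Checks the current status of the client; if the client's status changes, a new registration is triggered
  • This follows the observer pattern: event listeners watch for changes in the client's service instance information and run the corresponding handlers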
public interface HealthCheckHandler {

    InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus);

}
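
As an illustration of what an implementation of this interface might look like, the sketch below reports DOWN whenever a dependency probe fails. `SketchStatus`, `SketchHealthCheckHandler` and `DependencyHealthCheck` are hypothetical stand-ins for InstanceInfo.InstanceStatus and HealthCheckHandler so that the example compiles without the Eureka jars; keeping OUT_OF_SERVICE untouched is a design choice of this sketch, not something the source prescribes.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for InstanceInfo.InstanceStatus.
enum SketchStatus { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

// Hypothetical stand-in mirroring the single-method HealthCheckHandler contract.
interface SketchHealthCheckHandler {
    SketchStatus getStatus(SketchStatus currentStatus);
}

// Reports UP or DOWN depending on a caller-supplied dependency probe.
class DependencyHealthCheck implements SketchHealthCheckHandler {
    private final BooleanSupplier dependencyAlive;

    DependencyHealthCheck(BooleanSupplier dependencyAlive) {
        this.dependencyAlive = dependencyAlive;
    }

    @Override
    public SketchStatus getStatus(SketchStatus currentStatus) {
        // Assumption of this sketch: a manually set OUT_OF_SERVICE is left alone.
        if (currentStatus == SketchStatus.OUT_OF_SERVICE) {
            return currentStatus;
        }
        return dependencyAlive.getAsBoolean() ? SketchStatus.UP : SketchStatus.DOWN;
    }
}
```

When the returned status differs from the current one, the client has something new to report, which is what leads to the re-registration described above.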

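4. DiscoveryClient source walkthrough

4.1 DiscoveryClient constructor

@Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) 

4.1.1 Constructor parameters

Parameter notes

  • ApplicationInfoManager: application info manager
  • EurekaClientConfig: settings for client-server interaction
  • AbstractDiscoveryClientOptionalArgs: optional arguments to inject
  • Provider<BackupRegistry>: provides the backup registry used when the registry cannot be fetched from the server

4.1.2 Main steps of the constructor

  • Fetches the registry from the server, registers the service, and sets up the heartbeat, cache refresh and on-demand registration scheduled tasks
  • 1. Initialization: basic state, configuration and thread pools
  • 2. Build: construct EurekaTransport
  • 3. Pre-processing: fetch the registry from the server and run the pre-registration handler
  • 4. Registration: register with the server and initialize the scheduled heartbeat task (the scheduler pool has 2 threads)
  • Heartbeat URL (PUT): localhost:8761/eureka/apps/{AppName}/{instanceInfoId}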
    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
    //1. Parameters
    //ApplicationInfoManager: application info manager
    //EurekaClientConfig: settings for client-server interaction
    //AbstractDiscoveryClientOptionalArgs: optional arguments
    //Provider<BackupRegistry>: backup registry provider
    
    
    //2. Initialize basic state
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
        
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();

        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }

        this.backupRegistryProvider = backupRegistryProvider;

        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());

        fetchRegistryGeneration = new AtomicLong(0);

        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));


    //3. Read and initialize configuration
    //shouldFetchRegistry maps to eureka.client.fetch-registry=true/false (whether to fetch the registry from the server)
    //shouldRegisterWithEureka maps to eureka.client.register-with-eureka=true/false (whether to register this instance with the server)
       
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", 
            new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", 
            new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());

            return;  // no need to setup up an network tasks and we are done
        }

    //4. Thread pools
    //scheduler: scheduled pool with 2 threads
    //heartbeatExecutor: worker pool that sends heartbeats
    //cacheRefreshExecutor: worker pool that refreshes the cache

        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

    //5. Build EurekaTransport
    //EurekaTransport is an inner class of DiscoveryClient that wraps the Jersey clients used for HTTP calls between client and server

            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);

            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

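    //6. Fetch the registry: pull the registry from the server first and cache it locally to reduce round trips to the server
    //If shouldFetchRegistry() is true and fetchRegistry(false) fails, fall back to the backup registry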
    
    
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

    //7. Pre-registration: run the pre-registration handler before registering
        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }

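    //8. Registration
    //register() is called here only when shouldEnforceRegistrationAtInit() is also true
    //initScheduledTasks() then initializes the scheduled tasks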
    
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        initScheduledTasks();

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }
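
The executor layout from step 4 of the constructor can be tried out on its own. The following is a minimal sketch, assuming plain JDK thread factories instead of Guava's ThreadFactoryBuilder; `ExecutorLayoutSketch`, its methods and the thread name prefixes are hypothetical names for this example. It shows a 2-thread scheduler handing work to a worker pool backed by a SynchronousQueue (direct handoff, no queuing).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorLayoutSketch {

    // Daemon threads with a readable name, similar in spirit to the "DiscoveryClient-%d" format.
    static ThreadFactory daemonFactory(final String prefix) {
        final AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    // Worker pool with direct handoff: a task is either taken by a thread at once or rejected.
    static ThreadPoolExecutor newWorker(String prefix, int maxThreads) {
        return new ThreadPoolExecutor(1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), daemonFactory(prefix));
    }

    // Schedules one supervised run and returns the name of the worker thread that executed it.
    static String runOnce() {
        ScheduledExecutorService scheduler =
                Executors.newScheduledThreadPool(2, daemonFactory("Sketch-Scheduler-"));
        final ThreadPoolExecutor heartbeat = newWorker("Sketch-Heartbeat-", 2);
        try {
            final Callable<String> work = () -> Thread.currentThread().getName();
            // The scheduler thread only supervises; the work itself runs on the worker pool.
            Callable<String> supervisor = () -> heartbeat.submit(work).get(1, TimeUnit.SECONDS);
            ScheduledFuture<String> result = scheduler.schedule(supervisor, 10, TimeUnit.MILLISECONDS);
            return result.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("sketch run failed", e);
        } finally {
            heartbeat.shutdownNow();
            scheduler.shutdownNow();
        }
    }
}
```

Separating the scheduler from the workers means a slow heartbeat or registry fetch blocks only its own worker, while the supervising task can enforce a timeout on it.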

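4.2 DiscoveryClient: fetching the registry

4.2.1 Fetching the registry

  • Method: boolean fetchRegistry(boolean forceFullRegistryFetch);
  • 1. Choose between a full and a delta fetch: a full fetch is used when delta is disabled or the local Applications is empty (typically the first fetch)
  • 2. Fetch the data
  • 3. Compute the reconcile hash code of the application set
  • 4. Update this instance's remote status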
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            Applications applications = getApplications();

    //1. Choose the fetch mode
    //If delta is disabled or the local Applications is empty, do a full fetch
    //getAndStoreFullRegistry: full fetch
    //getAndUpdateDelta: delta fetch
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            
    //2. Compute the reconcile hash code of the application set
    
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

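    //3. Update the instance's remote status
    //onCacheRefreshed: publishes the cache-refreshed event
    //updateInstanceRemoteStatus: updates this instance's remote status from the refreshed cache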
    
        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }
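
The branch at the top of fetchRegistry boils down to one boolean expression. Here is a small sketch of that decision as a pure function; `FetchModeSketch` and its parameter names are hypothetical, and a missing local cache is represented by a count of 0 rather than a null Applications object.

```java
// Hypothetical helper that mirrors the full-vs-delta decision as a pure function.
class FetchModeSketch {

    static boolean needsFullFetch(boolean deltaDisabled,
                                  String singleVipAddress,
                                  boolean forceFullFetch,
                                  int localAppCount,
                                  long localVersion) {
        return deltaDisabled                                               // delta switched off in config
                || (singleVipAddress != null && !singleVipAddress.isEmpty()) // single-VIP refresh is always full
                || forceFullFetch                                          // caller insists on a full fetch
                || localAppCount == 0                                      // nothing cached yet (first fetch)
                || localVersion == -1;                                     // cached data has no delta-capable version
    }
}
```

For example, needsFullFetch(false, null, false, 0, 1L) is true because nothing is cached yet, while the same call with a count of 3 falls through to the delta path.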

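4.2.2 Full fetch

  • getAndStoreFullRegistry()
  • The data fetched from the server is wrapped in an Applications object which, after filtering and shuffling, replaces the local registry cache
  • The full fetch may be invoked by several threads at once, which could let a stale response overwrite newer data; the compare-and-set on fetchRegistryGeneration guards against this
Fetch operation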
    private void getAndStoreFullRegistry() throws Throwable {
    
    //1. Read the registry generation to avoid storing stale data (caused by concurrent threads)
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
                
    //2. The fetch succeeded
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

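    //3. Validate the response
    //compareAndSet on fetchRegistryGeneration: if the generation is unchanged, no other thread updated the cache in the meantime and this response is the latest
    //On update: filter the instances (optionally keeping only UP ones) and shuffle their order so that the same instances do not receive all the traffic at startup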
    
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }
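
The generation check used above is a small optimistic-concurrency pattern that is easy to isolate. A minimal sketch follows, with a string standing in for the Applications payload; `GenerationGuardSketch` and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical reduction of the fetchRegistryGeneration guard.
class GenerationGuardSketch {
    private final AtomicLong generation = new AtomicLong(0);
    private final AtomicReference<String> cache = new AtomicReference<>("empty");

    // Called before the (slow) remote call: remember which generation we started from.
    long beginFetch() {
        return generation.get();
    }

    // Called after the remote call: store only if nobody else stored in the meantime.
    boolean store(long observedGeneration, String fetched) {
        if (generation.compareAndSet(observedGeneration, observedGeneration + 1)) {
            cache.set(fetched);
            return true;
        }
        return false; // another thread won the race; drop this possibly stale response
    }

    String cached() {
        return cache.get();
    }
}
```

If two threads both call beginFetch() at generation 0, only the first store succeeds; the second sees a changed generation and discards its result, which corresponds to the "another thread is updating it already" warning in the source.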

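4.2.3 Delta fetch

  • Endpoint: http://localhost:8761/eureka/apps/delta
  • 1. Read the generation
  • 2. If the fetch fails: fall back to a full fetch
  • 3. If the fetch succeeds: update the local cache and compute the hash code; if it does not match the server's, the local data is treated as dirty and a full fetch follows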
    private void getAndUpdateDelta(Applications applications) throws Throwable {
    
    //1. Read the generation
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }

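    //2. Handle the response
    //Fetch failed: fall back to a full fetch
    //Fetch succeeded: update the local cache and compute the reconcile hash code; on a mismatch the data is treated as dirty and a full fetch follows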
    
        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
            
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                
    //Update the cached data
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }
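
The hash comparison that decides whether a delta was applied cleanly can be sketched as follows. This assumes the reconcile hash is a string built from per-status instance counts in sorted order, such as "DOWN_1_UP_2_"; treat that exact format as an assumption of this example rather than a statement about the library. `ReconcileHashSketch` is a hypothetical helper.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; the "STATUS_count_" format is an assumption of this sketch.
class ReconcileHashSketch {

    // Builds a summary string from the status of every locally cached instance.
    static String hashOf(List<String> instanceStatuses) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys give a stable order
        for (String status : instanceStatuses) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            sb.append(e.getKey()).append('_').append(e.getValue()).append('_');
        }
        return sb.toString();
    }

    // A mismatch means the local view drifted from the server and needs a full fetch.
    static boolean needsFullReconcile(List<String> localStatuses, String serverHash) {
        return !hashOf(localStatuses).equals(serverHash);
    }
}
```

The summary is cheap to compute and compare, so a client can detect a missed or misapplied delta without transferring the whole registry on every refresh.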

4.2.4 Updating the cached data

4.2.4.1 Action type enum
  • ADDED: instance added
  • MODIFIED: instance changed
  • DELETED: instance deleted
    
    public enum ActionType {
        ADDED, // Added in the discovery server
        MODIFIED, // Changed in the discovery server
        DELETED// Deleted from the discovery server
    }

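4.2.4.2 Updating the cache
  • Iterates over the delta: added and modified instances are put into the local registry, deleted instances are removed from it
    //Cache update: iterate over the delta, put added and modified instances into the local registry, remove deleted ones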
    private void updateDelta(Applications delta) {
        int deltaCount = 0;
        for (Application app : delta.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                Applications applications = getApplications();
                String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
                if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                    Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                    if (null == remoteApps) {
                        remoteApps = new Applications();
                        remoteRegionVsApps.put(instanceRegion, remoteApps);
                    }
                    applications = remoteApps;
                }

                ++deltaCount;
                if (ActionType.ADDED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    
        //ADDED: add the instance to the local registry
        
                    logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    
        //MODIFIED: add the modified instance to the local registry as well
        
                    logger.debug("Modified instance {} to the existing apps ", instance.getId());

                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);

                } else if (ActionType.DELETED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    
        //DELETED: remove the instance from the local registry
        
                    logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                    applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
                }
            }
        }
        logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);

        getApplications().setVersion(delta.getVersion());
        getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

        for (Applications applications : remoteRegionVsApps.values()) {
            applications.setVersion(delta.getVersion());
            applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
        }
    }
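
Stripped of regions and logging, updateDelta is an upsert-or-remove loop over the delta entries. A minimal sketch, assuming the registry is modelled as a map from application name to a map of instance id to status; `DeltaApplySketch`, `Change` and `apply` are hypothetical names for this illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reduction of updateDelta: registry = app name -> (instance id -> status).
class DeltaApplySketch {

    enum Action { ADDED, MODIFIED, DELETED }

    // One entry of the delta sent by the server.
    static final class Change {
        final Action action;
        final String app;
        final String instanceId;
        final String status;

        Change(Action action, String app, String instanceId, String status) {
            this.action = action;
            this.app = app;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    // Applies every change in order and returns how many were processed.
    static int apply(Map<String, Map<String, String>> registry, List<Change> delta) {
        int count = 0;
        for (Change c : delta) {
            // Create the application entry on first sight, as the original does via addApplication.
            Map<String, String> instances =
                    registry.computeIfAbsent(c.app, k -> new HashMap<String, String>());
            if (c.action == Action.DELETED) {
                instances.remove(c.instanceId);
            } else {
                instances.put(c.instanceId, c.status); // ADDED and MODIFIED are both upserts
            }
            count++;
        }
        return count;
    }
}
```

Treating ADDED and MODIFIED identically matches the source, where both branches end in addInstance(instance).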

4.3 DiscoveryClient: service registration

  • Payload: InstanceInfo
  • A successful registration returns HTTP status code 204
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
     
    //Register the service: the data is carried in InstanceInfo
    
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

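4.4 DiscoveryClient: initializing the scheduled tasks

4.4.1 The initialization method: initScheduledTasks

  • 1. Schedule the registry cache refresh task, default interval 30 seconds
  • 2. Schedule the heartbeat task, default interval 30 seconds
  • 3. Create the instance info replicator (InstanceInfoReplicator)
  • 4. Add a listener that watches for application status changes and re-registers when the status changes
  • 5. Start the replicator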
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
        
    //1. Registry cache refresh interval: 30 seconds by default
    //Set via eureka.client.registry-fetch-interval-seconds
    
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

    //2. Heartbeat timer: sends a heartbeat every 30 seconds by default
    
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

    //3. Create the instance info replicator
    
            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

    //Register the application status change listener
    
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

    //Start the replicator task
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

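4.4.2 The cache refresh scheduled task

4.4.2.1 The TimedSupervisorTask scheduled task
  • TimedSupervisorTask extends TimerTask to provide the scheduled-task behavior; the main logic lives in its run method
Task scheduling flow:

  • 1. The scheduler is initialized and runs TimedSupervisorTask after a delay
  • 2. TimedSupervisorTask submits task to executor; both are passed in when TimedSupervisorTask is constructed
  • 3. If task completes normally, TimedSupervisorTask resubmits itself to scheduler and runs again after delay
  • 4. If task times out, a new delay is computed, and TimedSupervisorTask resubmits itself to scheduler to run again after that delay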

public class TimedSupervisorTask extends TimerTask {
    private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class);

    private final Counter timeoutCounter;
    private final Counter rejectedCounter;
    private final Counter throwableCounter;
    private final LongGauge threadPoolLevelGauge;

    private final ScheduledExecutorService scheduler;
    private final ThreadPoolExecutor executor;
    private final long timeoutMillis;
    private final Runnable task;

    private final AtomicLong delay;
    private final long maxDelay;

    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;

        // Initialize the counters and register.
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
        
    //Run the task: submit
    
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
    
    //Wait for the task result
    
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            
    //Completed: reset the delay before the next run to the base interval
            
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
        } catch (TimeoutException e) {
        
    //Timed out: double the delay before the next run, capped at maxDelay
    
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();

            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);

        } catch (RejectedExecutionException e) {
        
    //Rejected: count the rejected executions
        
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }

            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }

            throwableCounter.increment();
        } finally {
        
        
    //Cancel the task if it has not finished
    
            if (future != null) {
                future.cancel(true);
            }
    
    //If the scheduler has not been shut down, schedule the next run
    
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
}
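
The delay bookkeeping in run() is a bounded exponential backoff. A minimal sketch of just that arithmetic follows, without any threads; `BackoffSketch` and its method names are hypothetical, and unlike the original it uses a plain long field instead of an AtomicLong because it is only exercised from one thread.

```java
// Hypothetical single-threaded reduction of the TimedSupervisorTask delay logic.
class BackoffSketch {
    private final long baseDelayMs;
    private final long maxDelayMs;
    private long delayMs;

    BackoffSketch(long baseDelayMs, int expBackOffBound) {
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = baseDelayMs * expBackOffBound; // same cap as timeoutMillis * expBackOffBound
        this.delayMs = baseDelayMs;
    }

    // On timeout the delay doubles, but never exceeds the cap.
    long onTimeout() {
        delayMs = Math.min(maxDelayMs, delayMs * 2);
        return delayMs;
    }

    // On success the delay snaps back to the base interval.
    long onSuccess() {
        delayMs = baseDelayMs;
        return delayMs;
    }
}
```

With a 30-second base interval and a bound of 10, consecutive timeouts yield 60 s, 120 s, 240 s and then stay at the 300 s cap until one run succeeds and the delay returns to 30 s.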
4.4.2.2 The cache thread: CacheRefreshThread
  • CacheRefreshThread is the task run by the cache refresh scheduled job
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

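4.4.2.3 The core cache refresh method: refreshRegistry
  • Checks whether the remote regions to fetch have changed, which decides between a full and a delta fetch
  • Logs the state of the registry cache after the update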

    @VisibleForTesting
    void refreshRegistry() {
        try {
            boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

            boolean remoteRegionsModified = false;
            // This makes sure that a dynamic change to remote regions to fetch is honored.
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            if (null != latestRemoteRegions) {
                String currentRemoteRegions = remoteRegionsToFetch.get();
                if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                    // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                    synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                        if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                            String[] remoteRegions = latestRemoteRegions.split(",");
                            remoteRegionsRef.set(remoteRegions);
                            instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                            remoteRegionsModified = true;
                        } else {
                            logger.info("Remote regions to fetch modified concurrently," +
                                    " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                        }
                    }
                } else {
                    // Just refresh mapping to reflect any DNS/Property change
                    instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                }
            }
            
            // Whether the remote regions changed decides between a full fetch and a delta fetch
    
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }

            // Log the registry cache hash codes after the refresh
    
            if (logger.isDebugEnabled()) {
                StringBuilder allAppsHashCodes = new StringBuilder();
                allAppsHashCodes.append("Local region apps hashcode: ");
                allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                allAppsHashCodes.append(", is fetching remote regions? ");
                allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                    allAppsHashCodes.append(", Remote region: ");
                    allAppsHashCodes.append(entry.getKey());
                    allAppsHashCodes.append(" , apps hashcode: ");
                    allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                }
                logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                        allAppsHashCodes);
            }
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }        
    }
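
The region check in `refreshRegistry` can be reduced to a small compare-and-set step. The sketch below assumes only what the method above shows: a changed region list sets `remoteRegionsModified`, which is then passed to `fetchRegistry`. The class `RemoteRegionTracker` and the `fetchMode` helper are hypothetical, and the real client may force a full fetch for other reasons that are not modeled here.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper that isolates the "did the remote regions change?" decision.
class RemoteRegionTracker {
    private final AtomicReference<String> remoteRegionsToFetch;

    RemoteRegionTracker(String initialRegions) {
        this.remoteRegionsToFetch = new AtomicReference<>(initialRegions);
    }

    // Returns true only when the configured regions changed and this caller won the update.
    boolean updateAndCheckModified(String latestRemoteRegions) {
        if (latestRemoteRegions == null) {
            return false; // nothing configured, nothing to compare
        }
        String current = remoteRegionsToFetch.get();
        if (latestRemoteRegions.equals(current)) {
            return false; // unchanged
        }
        // compareAndSet fails if another thread changed the value in the meantime
        return remoteRegionsToFetch.compareAndSet(current, latestRemoteRegions);
    }

    // Maps the flag to the kind of fetch it requests in this simplified model.
    static String fetchMode(boolean remoteRegionsModified) {
        return remoteRegionsModified ? "FULL" : "DELTA";
    }
}
```

For example, a tracker created with `"us-east-1"` reports a modification when it is handed `"us-east-1,us-west-2"`, and reports none when the same string arrives again on the next refresh.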

4.4.3 Heartbeat scheduled task

4.4.3.1 Heartbeat task thread: HeartbeatThread
  • Purpose: send heartbeat requests to the Server to keep the Client's lease in the registry alive
    private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

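4.4.3.2 Core heartbeat method: renew()
  • Core renewal parameters: appName, instanceId

  • 1. Send the heartbeat to the Server over HTTP

  • 2. Status code 404 means the Server has no record of this instance; the thread calls register() again

  • 3. Status code 200 means the lease was renewed successfully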

    
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
        
            // Send the heartbeat to the Server over HTTP; key parameters: appName, instanceId
    
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            
            // 404: the Server does not know this instance, so register again
            
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            
            // 200: the lease was renewed successfully
            
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }
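
The branching in `renew()` can be summarized as a small decision function. The following is a sketch under simplifying assumptions: the HTTP call and `register()` are replaced by plain functional interfaces, and the name `HeartbeatSketch` is hypothetical.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

// Hypothetical reduction of the heartbeat outcome handling to its three cases.
class HeartbeatSketch {
    // sendHeartbeat returns the HTTP status code; register returns whether registration succeeded.
    static boolean renew(IntSupplier sendHeartbeat, BooleanSupplier register) {
        try {
            int status = sendHeartbeat.getAsInt();
            if (status == 404) {
                // The server has no lease for this instance: fall back to registration
                return register.getAsBoolean();
            }
            // Only 200 counts as a successful renewal
            return status == 200;
        } catch (RuntimeException e) {
            // Any failure while sending counts as a failed heartbeat
            return false;
        }
    }
}
```

A caller such as `HeartbeatThread` only needs the boolean result: it updates the last-success timestamp when the value is `true` and otherwise waits for the next scheduled run.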

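4.4.4 On-demand registration scheduled task

  • On-demand registration covers two scenarios
  • Scenario 1: a scheduled task periodically refreshes the instance data and status, and re-registers with the Server when either has changed
  • Scenario 2: a status change listener is registered, and a status change triggers re-registration with the Server
4.4.4.1 Defining the scheduled task
  • Purpose: when the Client's InstanceInfo or status changes, re-register with the Server so that the instance information held by the Server stays current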
    private void initScheduledTasks() {
        ... ... 
    
            // Periodically check whether InstanceInfo has changed and re-register if it has
    
            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            // Listen for application status changes and trigger re-registration when the status changes
    
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
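4.4.4.2 Re-registration thread
  • InstanceInfoReplicator is the task that performs the re-registration: it runs on a fixed interval, and the status change listener also triggers it on demand through onDemandUpdate()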
class InstanceInfoReplicator implements Runnable {

    InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
        this.discoveryClient = discoveryClient;
        this.instanceInfo = instanceInfo;
    
    
    
        // Create the single-threaded scheduler that runs this task
        
        this.scheduler = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                        .setDaemon(true)
                        .build());
    
        this.scheduledPeriodicRef = new AtomicReference<Future>();
    
        this.started = new AtomicBoolean(false);
        this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
        this.replicationIntervalSeconds = replicationIntervalSeconds;
        this.burstSize = burstSize;
    
        this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
        logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
    }
        
    public void run() {
        try {
        
            // Refresh the service instance data held in InstanceInfo
    
            discoveryClient.refreshInstanceInfo();

            // Returns the time of the last change if the data is dirty, otherwise null
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
            
                // Register the instance again, then clear the dirty flag
            
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
        
            // Schedule the next delayed run
        
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
}
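
Two details of `InstanceInfoReplicator` are worth working through: the rate formula in the constructor and the dirty timestamp handed to `unsetIsDirty`. The sketch below uses hypothetical names (`ReplicatorSketch`, `DirtyFlag`), and it assumes that the timestamp is passed so that a change made while `register()` is in flight is not lost; this article does not show the body of `unsetIsDirty`, so treat that guard as an assumption.

```java
// Hypothetical sketch of the rate arithmetic and of a timestamp-guarded dirty flag.
class ReplicatorSketch {

    // Same integer arithmetic as the constructor: 60 * burstSize / replicationIntervalSeconds
    static int allowedRatePerMinute(int burstSize, int replicationIntervalSeconds) {
        return 60 * burstSize / replicationIntervalSeconds;
    }

    // Stand-in for the dirty tracking on InstanceInfo (assumed behavior, not the real class).
    static class DirtyFlag {
        private boolean dirty;
        private long lastDirtyTimestamp;

        // Marks the data as changed and returns the time of the change
        synchronized long markDirty(long now) {
            dirty = true;
            lastDirtyTimestamp = now;
            return now;
        }

        // Returns the time of the last change, or null when nothing is pending
        synchronized Long dirtySince() {
            return dirty ? lastDirtyTimestamp : null;
        }

        // Clears the flag only if no newer change arrived after the observed timestamp
        synchronized void clearIfNotNewer(long observedTimestamp) {
            if (lastDirtyTimestamp <= observedTimestamp) {
                dirty = false;
            }
        }
    }
}
```

With a burst size of 2 and a 30-second interval the formula gives 60 * 2 / 30 = 4 on-demand updates per minute. With a 45-second interval, integer division gives 120 / 45 = 2.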
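4.4.4.3 Client information refresh and status check
  • Refresh the data center info and the lease info if required
  • Check whether the instance status has changed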
    void refreshInstanceInfo() {
    
        // Refresh the data center info if required
        applicationInfoManager.refreshDataCenterInfoIfRequired();
        // Refresh the lease info if required
        applicationInfoManager.refreshLeaseInfoIfRequired();

        InstanceStatus status;
        try {
    
            // Ask the HealthCheckHandler for the current instance status
    
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            status = InstanceStatus.DOWN;
        }

        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }
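
The status handling in `refreshInstanceInfo` follows a simple rule: a failing health check means DOWN, and a `null` result leaves the status untouched. A minimal sketch of that rule follows, with the health check modeled as a plain function; `StatusRefreshSketch` and its `Status` enum are hypothetical stand-ins for the real types.

```java
import java.util.function.UnaryOperator;

// Hypothetical reduction of the status resolution step.
class StatusRefreshSketch {
    enum Status { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    // Returns the status the instance should have after the health check ran.
    static Status resolve(UnaryOperator<Status> healthCheck, Status current) {
        Status status;
        try {
            status = healthCheck.apply(current);
        } catch (Exception e) {
            // A health check that throws is treated as an unhealthy instance
            status = Status.DOWN;
        }
        // A null result means "no opinion": keep the current status
        return status != null ? status : current;
    }
}
```

In the real method the resolved status is handed to `applicationInfoManager.setInstanceStatus`, which is what eventually notifies the status change listener registered in `initScheduledTasks`.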
    

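4.5 DiscoveryClient shutdown

  • Main steps: unregister the status change listener, cancel the scheduled tasks, take the service offline, shut down the Jersey client, and shut down the related monitors (threshold data)
  • Everything runs inside a synchronized method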
    @PreDestroy
    @Override
    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            // Unregister the status change listener
    
            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }

            // Cancel the scheduled tasks
    
            cancelScheduledTasks();

            // Take the service offline
    
            // If APPINFO was registered
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka()
                    && clientConfig.shouldUnregisterOnShutdown()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                unregister();
            }

            // Shut down the Jersey client
    
            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }
            // Shut down the related monitors
            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();

            logger.info("Completed shut down of DiscoveryClient");
        }
    }
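
The `isShutdown.compareAndSet(false, true)` guard makes `shutdown()` idempotent: the cleanup steps run once no matter how often the method is called. The sketch below records the steps as strings instead of performing them; the class `ShutdownSketch` and the step names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the ordered, run-once shutdown sequence.
class ShutdownSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final boolean unregisterOnShutdown;
    final List<String> steps = new ArrayList<>();

    ShutdownSketch(boolean unregisterOnShutdown) {
        this.unregisterOnShutdown = unregisterOnShutdown;
    }

    synchronized void shutdown() {
        // Only the first caller flips the flag and runs the cleanup
        if (isShutdown.compareAndSet(false, true)) {
            steps.add("unregister status listener");
            steps.add("cancel scheduled tasks");
            if (unregisterOnShutdown) {
                steps.add("set status DOWN");
                steps.add("unregister from server");
            }
            steps.add("shut down transport");
            steps.add("shut down monitors");
        }
    }
}
```

The order matters in this model for the same reason as in the original: the scheduled tasks are cancelled before the instance is unregistered, so a late heartbeat cannot trigger a fresh registration.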
