(4) EurekaClient-timed task initialized at startup

0. Preface

  • springboot version: 2.1.9.RELEASE
  • springcloud version: Greenwich.SR4

1. Initialize a scheduled task

When the client starts, it initializes its scheduled tasks. The entry point is the DiscoveryClient constructor, which was already mentioned in "EurekaClient-Pull Registry".

    //DiscoveryClient.class
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
                    AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
        //......
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            // Initialize the scheduler thread pool with 2 threads
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            // Initialize the thread pool that runs heartbeat renewals
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            // Initialize the thread pool that runs registry cache refreshes
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            //......
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }
        //......
        // 2 Initialize the scheduled tasks
        initScheduledTasks();
        //......
    }

2. initScheduledTasks()

    //DiscoveryClient.class
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            // Start the periodic registry refresh task, which runs every 30s by default.
            // The interval between runs is not necessarily 30s:
            // when a run fails to finish within the timeout, the next interval is twice the previous one,
            // and so on, but never more than registryFetchIntervalSeconds * expBackOffBound (default 30 * 10).
            // 2.1 This mechanism is implemented in the TimedSupervisorTask class
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            // 3 Registry refresh thread
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            // Start the periodic heartbeat renewal task, which runs every 30s by default.
            // It is scheduled the same way as the registry refresh task.
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            // 4 Heartbeat renewal thread
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            // 5 Periodic client check task
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            // Status change listener
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    // A status change event triggers onDemandUpdate(), analysed in section 5
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            // Start the periodic client check task
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

2.1 TimedSupervisorTask.class

TimedSupervisorTask wraps the real task and adjusts the delay before the next run depending on whether the current run timed out.

    public class TimedSupervisorTask extends TimerTask {
        //......
        @Override
        public void run() {
            Future<?> future = null;
            try {
                // Submit the subtask through a Future
                future = executor.submit(task);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                // future.get() blocks and returns only once the subtask has finished.
                // Here it is called with a maximum wait of timeoutMillis (default 30s):
                // it waits at most 30s and then throws TimeoutException.
                future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
                // Reaching this point means the subtask finished within the timeout.
                // Whenever a run does not time out, the delay is reset to its initial value.
                delay.set(timeoutMillis);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                successCounter.increment();
            } catch (TimeoutException e) {
                // Reaching this point means the subtask did not finish within the timeout
                logger.warn("task supervisor timed out", e);
                timeoutCounter.increment();

                // Get the current delay
                long currentDelay = delay.get();
                // Next delay = current delay * 2, capped at maxDelay (default 30 * 10 seconds)
                long newDelay = Math.min(maxDelay, currentDelay * 2);
                // Store the new delay
                delay.compareAndSet(currentDelay, newDelay);
            } catch (RejectedExecutionException e) {
                if (executor.isShutdown() || scheduler.isShutdown()) {
                    logger.warn("task supervisor shutting down, reject the task", e);
                } else {
                    logger.warn("task supervisor rejected the task", e);
                }
                rejectedCounter.increment();
            } catch (Throwable e) {
                if (executor.isShutdown() || scheduler.isShutdown()) {
                    logger.warn("task supervisor shutting down, can't accept the task");
                } else {
                    logger.warn("task supervisor threw an exception", e);
                }
                throwableCounter.increment();
            } finally {
                if (future != null) {
                    // 2.2 Cancel the task
                    future.cancel(true);
                }
                if (!scheduler.isShutdown()) {
                    // As long as the scheduler has not been shut down, schedule this task again
                    scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
                }
            }
        }
    }
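The delay bookkeeping in run() is easier to follow in isolation. The following is a minimal sketch, not Eureka code: the class BackoffDelaySketch and its methods onSuccess() and onTimeout() are hypothetical names, and it assumes the initial delay equals the timeout and maxDelay = timeout * expBackOffBound, as the comments above describe.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of Eureka): models only the delay arithmetic
// that TimedSupervisorTask.run() performs after each run.
class BackoffDelaySketch {
    private final long timeoutMillis;
    private final long maxDelay;
    private final AtomicLong delay;

    BackoffDelaySketch(long intervalSeconds, int expBackOffBound) {
        this.timeoutMillis = intervalSeconds * 1000L;
        this.maxDelay = timeoutMillis * expBackOffBound; // assumed cap, e.g. 30s * 10
        this.delay = new AtomicLong(timeoutMillis);
    }

    // The run finished in time: the next delay goes back to the base interval.
    long onSuccess() {
        delay.set(timeoutMillis);
        return delay.get();
    }

    // The run timed out: double the delay, but never exceed maxDelay.
    long onTimeout() {
        long current = delay.get();
        long next = Math.min(maxDelay, current * 2);
        delay.compareAndSet(current, next);
        return delay.get();
    }

    public static void main(String[] args) {
        BackoffDelaySketch sketch = new BackoffDelaySketch(30, 10);
        for (int i = 1; i <= 5; i++) {
            System.out.println("delay after timeout " + i + ": " + sketch.onTimeout() + " ms");
        }
        System.out.println("delay after success: " + sketch.onSuccess() + " ms");
    }
}
```

With the defaults of 30s and a bound of 10, five consecutive timeouts give delays of 60000, 120000, 240000, 300000 and 300000 ms, and a single successful run resets the delay to 30000 ms.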

2.2 future.cancel

When Future.cancel() is called, the task can be in one of three states:

  • Waiting: the task is marked as cancelled whether true or false is passed. It stays in the task queue, but it is skipped when its turn to run comes.
  • Completed: the call has no effect and returns false, because the task has already finished.
  • Running: passing true interrupts the thread executing the task; passing false lets the running task continue without interruption.
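The three cases can be observed with plain java.util.concurrent classes. This is a small sketch written for this article, not Eureka code; the class name FutureCancelSketch and the method demo() are made up for illustration. A single-threaded pool is used so that the second task is guaranteed to wait in the queue while the first one is running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: shows Future.cancel() on a waiting, a running and a completed task.
class FutureCancelSketch {

    // Returns {queuedCancelled, queuedRan, runningCancelled, runningInterrupted, doneCancelled}
    static boolean[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch started = new CountDownLatch(1);
            CountDownLatch finished = new CountDownLatch(1);
            AtomicBoolean interrupted = new AtomicBoolean(false);
            AtomicBoolean queuedRan = new AtomicBoolean(false);

            // Running state: occupies the only worker thread.
            Future<?> running = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    interrupted.set(true); // cancel(true) interrupts the worker
                } finally {
                    finished.countDown();
                }
            });
            // Waiting state: sits in the queue behind the running task.
            Future<?> queued = pool.submit(() -> queuedRan.set(true));

            started.await();
            boolean queuedCancelled = queued.cancel(false);  // argument does not matter here
            boolean runningCancelled = running.cancel(true); // interrupts the sleeping task
            finished.await();

            // Completed state: cancel() has no effect and returns false.
            Future<String> done = pool.submit(() -> "done");
            done.get();
            boolean doneCancelled = done.cancel(true);

            return new boolean[] {
                queuedCancelled, queuedRan.get(), runningCancelled, interrupted.get(), doneCancelled
            };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("queued cancelled=" + r[0] + ", queued ran=" + r[1]);
        System.out.println("running cancelled=" + r[2] + ", running interrupted=" + r[3]);
        System.out.println("completed cancelled=" + r[4]);
    }
}
```

In TimedSupervisorTask the call in the finally block is future.cancel(true), so a subtask that is still running after the timeout gets interrupted, while a subtask that already finished is unaffected.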

3. Refresh the registry thread

    //DiscoveryClient.class
    class CacheRefreshThread implements Runnable {
        public void run() {
            // Refresh the registry
            refreshRegistry();
        }
    }

    @VisibleForTesting
    void refreshRegistry() {
        try {
            boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

            boolean remoteRegionsModified = false;
            // This makes sure that a dynamic change to remote regions to fetch is honored.
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            if (null != latestRemoteRegions) {
                String currentRemoteRegions = remoteRegionsToFetch.get();
                if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                    // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                    synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                        if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                            String[] remoteRegions = latestRemoteRegions.split(",");
                            remoteRegionsRef.set(remoteRegions);
                            instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                            // The locally cached remote regions differ from the latest configuration,
                            // so the registry to pull must include the remote regions
                            remoteRegionsModified = true;
                        } else {
                            logger.info("Remote regions to fetch modified concurrently," +
                                    " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                        }
                    }
                } else {
                    // Just refresh mapping to reflect any DNS/Property change
                    instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                }
            }

            // Pull the registry
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }
            //......
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }
    }

3.1 fetchRegistry()

    //DiscoveryClient.class
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all applications
            // Get the client's local registry
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) // Client application does not have latest library supporting delta
            {
                //......
                // 3.2 Pull the registry in full
                getAndStoreFullRegistry();
            } else {
                // 3.3 Pull the registry incrementally
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }

3.2 Pull the registry in full

    //DiscoveryClient.class
    private void getAndStoreFullRegistry() throws Throwable {
        //......
        Applications apps = null;
        // Send the Jersey request that pulls the full registry
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        //......
        if (apps == null) {
            //......
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            // Filter and shuffle the registry returned by the server, then store it in localRegionApps
            localRegionApps.set(this.filterAndShuffle(apps));
            //......
        } else {
            //......
        }
    }

3.3 Pull the registry incrementally

    //DiscoveryClient.class
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        // Send the Jersey request that pulls the registry delta
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }

        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            // If delta is null, pull the full registry instead.
            // The server may not allow this client to pull incrementally.
            // If the server does allow it, delta is never null, although it may have size 0.
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    // 3.4 Apply the changes returned by the server to the local registry
                    updateDelta(delta);
                    // Compute the reconcileHashCode of the updated local applications,
                    // used to decide whether any data was lost
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            // Compare the reconcileHashCode of the updated local registry with the appsHashCode returned by the server.
            // If they differ, some incremental data from the server was lost.
            //
            // An incremental pull returns every instance change held in the server's recentlyChangedQueue,
            // which is a first-in first-out queue.
            // A scheduled task on the server runs every 30s by default, walks the queue,
            // and removes entries that have been in it for more than 3 minutes (default).
            // If the client does not pull for long enough, changes that it never received
            // are removed from recentlyChangedQueue, and those changes are lost to the client.
            //
            // The client detects this by comparing the hash code of its updated local registry
            // with the hash code returned by the server: equal means nothing was lost, different means data was lost.
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                // 3.5 Data was lost, so pull the full registry again
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }
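The hash comparison above can be pictured with a toy model. The sketch below is not Eureka's implementation: ReconcileHashSketch, reconcileHash() and needsFullFetch() are hypothetical names, and the hash format (instance count per status, in sorted order, such as DOWN_1_UP_2_) is an assumption about how such a reconcile hash can be built, not something this article's code shows.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (not Eureka code): a registry is reduced to the list of its instance statuses.
class ReconcileHashSketch {

    // Assumed format: "<STATUS>_<count>_" for every status, in sorted order.
    static String reconcileHash(List<String> instanceStatuses) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : instanceStatuses) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder hash = new StringBuilder();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            hash.append(entry.getKey()).append('_').append(entry.getValue()).append('_');
        }
        return hash.toString();
    }

    // A mismatch means the client missed some change and must pull the full registry.
    static boolean needsFullFetch(String localHash, String serverHash) {
        return !localHash.equals(serverHash);
    }

    public static void main(String[] args) {
        String server = reconcileHash(Arrays.asList("UP", "UP", "DOWN"));
        String local = reconcileHash(Arrays.asList("UP", "UP")); // one change was missed
        System.out.println("server=" + server + ", local=" + local
                + ", full fetch needed=" + needsFullFetch(local, server));
    }
}
```

A hash of this kind only summarizes counts, so it is cheap to compute and compare on every delta pull; when it differs, the client falls back to the full pull shown in 3.5.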

3.4 updateDelta()

Besides the registry of its own region (instances in the same region as the client), the client also maintains remoteRegionVsApps, which holds the registries of the other remote regions.

    //DiscoveryClient.class
    private void updateDelta(Applications delta) {
        int deltaCount = 0;
        // Walk through every instance of every application in the delta
        for (Application app : delta.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                // Get the local registry
                Applications applications = getApplications();
                // Get the region of the instance being processed
                String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
                if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                    // The instance is not in the client's own region,
                    // so look up the registry for that region among the locally held remote region registries.
                    // The operations below then apply to that remote region registry.
                    Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                    if (null == remoteApps) {
                        // No registry for this region yet: create one and put it in remoteRegionVsApps
                        remoteApps = new Applications();
                        remoteRegionVsApps.put(instanceRegion, remoteApps);
                    }
                    applications = remoteApps;
                }

                ++deltaCount;
                if (ActionType.ADDED.equals(instance.getActionType())) {
                    // The change is an addition (registration)
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        // The application does not exist yet: add the application being traversed
                        applications.addApplication(app);
                    }
                    logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                    // Add the instance to its application
                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                    // The change is a modification
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        // The application does not exist yet: add the application being traversed
                        applications.addApplication(app);
                    }
                    logger.debug("Modified instance {} to the existing apps ", instance.getId());
                    // Add the instance to its application
                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                } else if (ActionType.DELETED.equals(instance.getActionType())) {
                    // The change is a deletion
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp != null) {
                        logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                        // The application exists: remove the instance from it
                        existingApp.removeInstance(instance);
                        /*
                         * We find all instance list from application(The status of instance status is not only the status is UP but also other status)
                         * if instance list is empty, we remove the application.
                         */
                        if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                            // The application has no instances left: remove it from applications as well
                            applications.removeApplication(existingApp);
                        }
                    }
                }
            }
        }
        logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);

        getApplications().setVersion(delta.getVersion());
        getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

        for (Applications applications : remoteRegionVsApps.values()) {
            applications.setVersion(delta.getVersion());
            applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
        }
    }

Summary:

  • Besides the registry of its own region (instances in the same region as the client), the client also maintains remoteRegionVsApps, the registries of the other remote regions.
  • When applying a delta, the code first determines the region of each instance of each application, and from that decides whether to update the local registry or the registry of a remote region.
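The routing logic summarized above can be sketched with plain maps. This is a simplified model under stated assumptions, not Eureka's data structures: DeltaApplySketch, its Action enum and the apply() method are hypothetical, a registry is reduced to "application name -> (instance id -> status)", and the region names in the usage example are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of updateDelta(): plain maps instead of Applications/InstanceInfo.
class DeltaApplySketch {
    enum Action { ADDED, MODIFIED, DELETED }

    private final String localRegion;
    // application name -> (instance id -> status) for the client's own region
    final Map<String, Map<String, String>> localRegionApps = new HashMap<>();
    // remote region -> registry of that region
    final Map<String, Map<String, Map<String, String>>> remoteRegionVsApps = new HashMap<>();

    DeltaApplySketch(String localRegion) {
        this.localRegion = localRegion;
    }

    void apply(String region, String app, String instanceId, String status, Action action) {
        // Step 1: pick the registry by region, creating a remote registry on first use.
        Map<String, Map<String, String>> registry = localRegion.equals(region)
                ? localRegionApps
                : remoteRegionVsApps.computeIfAbsent(region, r -> new HashMap<>());

        // Step 2: apply the change to the chosen registry.
        if (action == Action.DELETED) {
            Map<String, String> instances = registry.get(app);
            if (instances != null) {
                instances.remove(instanceId);
                if (instances.isEmpty()) {
                    registry.remove(app); // drop applications with no instances left
                }
            }
        } else {
            // ADDED and MODIFIED are handled the same way: add or overwrite the instance.
            registry.computeIfAbsent(app, a -> new HashMap<>()).put(instanceId, status);
        }
    }

    public static void main(String[] args) {
        DeltaApplySketch sketch = new DeltaApplySketch("region-a");
        sketch.apply("region-a", "ORDERS", "i-1", "UP", Action.ADDED);
        sketch.apply("region-b", "ORDERS", "i-9", "UP", Action.ADDED);
        sketch.apply("region-a", "ORDERS", "i-1", "UP", Action.DELETED);
        System.out.println("local: " + sketch.localRegionApps);
        System.out.println("remote: " + sketch.remoteRegionVsApps);
    }
}
```

As in updateDelta(), ADDED and MODIFIED end in the same "add the instance" step, and deleting the last instance of an application removes the application itself.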

3.5 reconcileAndLogDifference()

    //DiscoveryClient.class
    private void reconcileAndLogDifference(Applications delta, String reconcileHashCode) throws Throwable {
        logger.debug("The Reconcile hashcodes do not match, client : {}, server : {}. Getting the full registry",
                reconcileHashCode, delta.getAppsHashCode());

        RECONCILE_HASH_CODES_MISMATCH.increment();

        long currentUpdateGeneration = fetchRegistryGeneration.get();

        // Send the Jersey request that pulls the full registry
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        Applications serverApps = httpResponse.getEntity();

        if (serverApps == null) {
            logger.warn("Cannot fetch full registry from the server; reconciliation failure");
            return;
        }

        if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            // Filter and shuffle, then store the result in the local registry
            localRegionApps.set(this.filterAndShuffle(serverApps));
            getApplications().setVersion(delta.getVersion());
            logger.debug(
                    "The Reconcile hashcodes after complete sync up, client : {}, server : {}.",
                    getApplications().getReconcileHashCode(),
                    delta.getAppsHashCode());
        } else {
            logger.warn("Not setting the applications map as another thread has advanced the update generation");
        }
    }

4. Heartbeat renewal thread

    //DiscoveryClient.class
    private class HeartbeatThread implements Runnable {
        public void run() {
            // Send the heartbeat that renews the client's lease
            if (renew()) {
                // On success, record the time of the last successful heartbeat
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            // Send the Jersey heartbeat (lease renewal) request
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                // If the server returns 404, the client registers again.
                // This happens when:
                // 1. lastDirtyTimestamp in the local instance info is greater than lastDirtyTimestamp
                //    in the server's record for this instance, meaning the server's copy is stale
                // 2. The server registry has no record of this client instance
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                // Set lastDirtyTimestamp = current time and isInstanceInfoDirty = true on the local instance info
                long timestamp = instanceInfo.setIsDirtyWithTime();
                // Register with the server
                boolean success = register();
                if (success) {
                    // 4.1 After a successful registration, update the local isInstanceInfoDirty flag
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

4.1 instanceInfo.unsetIsDirty()

    //InstanceInfo.class
    public synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
        // isInstanceInfoDirty is set to false only if lastDirtyTimestamp <= unsetDirtyTimestamp.
        // Between the heartbeat that got a 404 from the server and the successful re-registration,
        // another thread may have changed lastDirtyTimestamp.
        // If lastDirtyTimestamp > unsetDirtyTimestamp, the client's instance info has diverged
        // from the server registry again: the copy that was just sent to the server is already
        // out of date, so the instance must stay dirty.
        if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
            isInstanceInfoDirty = false;
        } else {
        }
    }
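The timestamp check guards against a change that arrives while a registration is in flight. The sketch below illustrates that race with a stripped-down class; DirtyFlagSketch is hypothetical, and unlike InstanceInfo it takes the current time as an explicit parameter so that the example is deterministic.

```java
// Hypothetical, stripped-down model of the dirty flag in InstanceInfo.
class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    // Mark the instance dirty and return the timestamp of this change.
    synchronized long setIsDirtyWithTime(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
        return now;
    }

    // Clear the flag only if no newer change happened after unsetDirtyTimestamp.
    synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
        if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
            dirty = false;
        }
    }

    synchronized boolean isDirty() {
        return dirty;
    }

    public static void main(String[] args) {
        DirtyFlagSketch info = new DirtyFlagSketch();
        long registered = info.setIsDirtyWithTime(100); // registration starts with this state
        info.setIsDirtyWithTime(150);                   // another thread changes the instance meanwhile
        info.unsetIsDirty(registered);                  // registration finishes
        System.out.println("dirty after stale unset: " + info.isDirty());   // still dirty
        info.unsetIsDirty(150);
        System.out.println("dirty after current unset: " + info.isDirty()); // clean
    }
}
```

Because the flag stays set in the first case, the next run of the client check task in section 5 sees a dirty instance and registers again with the newer data.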

5. Periodic client check task

    class InstanceInfoReplicator implements Runnable {
        //......
        InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
            this.discoveryClient = discoveryClient;
            this.instanceInfo = instanceInfo;
            // Initialize the thread pool with 1 thread
            this.scheduler = Executors.newScheduledThreadPool(1,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                            .setDaemon(true)
                            .build());

            this.scheduledPeriodicRef = new AtomicReference<Future>();

            this.started = new AtomicBoolean(false);
            this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
            this.replicationIntervalSeconds = replicationIntervalSeconds;
            this.burstSize = burstSize;
            // By default allowedRatePerMinute = 60 * 2 / 30 = 4
            this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
            logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
        }

        public void start(int initialDelayMs) {
            if (started.compareAndSet(false, true)) {
                // Only start if the task has not been started yet.
                // When this task starts, the client may not have registered with the server yet
                // (forced registration at startup is not configured), so mark the instance dirty.
                instanceInfo.setIsDirty();  // for initial register
                // Schedule this task through a Future; it runs after 40s by default
                Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
                // Remember the Future of the next scheduled run
                scheduledPeriodicRef.set(next);
            }
        }
        //......
        public boolean onDemandUpdate() {
            // Token bucket: RateLimiter(TimeUnit.MINUTES), burstSize = 2, allowedRatePerMinute = 4.
            // The bucket holds at most 2 tokens and gains 1 token every 15s (4 per minute),
            // so in steady state at most 4 tokens per minute can be obtained.
            // The token bucket rate-limits on-demand runs of the client check task.
            if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
                if (!scheduler.isShutdown()) {
                    // The scheduler is still running, so submit a new task
                    scheduler.submit(new Runnable() {
                        @Override
                        public void run() {
                            logger.debug("Executing on-demand update of local InstanceInfo");
                            // Get the Future of the latest scheduled run
                            Future latestPeriodic = scheduledPeriodicRef.get();
                            if (latestPeriodic != null && !latestPeriodic.isDone()) {
                                logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                                // The latest scheduled run has not completed, so cancel it.
                                // The behaviour of cancel(false) is analysed in section 2.2.
                                latestPeriodic.cancel(false);
                            }
                            // Run the client check task right away
                            InstanceInfoReplicator.this.run();
                        }
                    });
                    return true;
                } else {
                    logger.warn("Ignoring onDemand update due to stopped scheduler");
                    return false;
                }
            } else {
                logger.warn("Ignoring onDemand update due to rate limiter");
                return false;
            }
        }

        public void run() {
            try {
                // 5.1 Refresh the instance info. If any data changes during the refresh,
                // the client's latest modification time (dirty timestamp) is recorded and the instance is marked dirty.
                discoveryClient.refreshInstanceInfo();
                // isInstanceInfoDirty = true means the local instance info differs from the server registry,
                // so the client must register with the server again
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    // Register with the server
                    discoveryClient.register();
                    // Update the local dirty flag
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                // Schedule this task again through a Future; it runs after 30s by default
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                // Remember the Future of the next scheduled run
                scheduledPeriodicRef.set(next);
            }
        }
    }
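The token bucket numbers in the comments of onDemandUpdate() (capacity 2, one new token every 15s) can be checked with a small model. This is a generic token bucket written for illustration, not Eureka's RateLimiter: the class TokenBucketSketch and its tryAcquire(nowMillis) method are hypothetical, and time is passed in explicitly so that the behaviour is reproducible.

```java
// Generic token bucket sketch (not Eureka's RateLimiter implementation).
class TokenBucketSketch {
    private final int burstSize;             // bucket capacity
    private final long refillIntervalMillis; // time needed to generate one token
    private int tokens;
    private long lastRefillMillis;

    TokenBucketSketch(int burstSize, int allowedRatePerMinute, long nowMillis) {
        this.burstSize = burstSize;
        this.refillIntervalMillis = 60_000L / allowedRatePerMinute; // 4 per minute -> 15s
        this.tokens = burstSize; // start with a full bucket
        this.lastRefillMillis = nowMillis;
    }

    synchronized boolean tryAcquire(long nowMillis) {
        // Add the tokens generated since the last refill, capped at the bucket capacity.
        long newTokens = (nowMillis - lastRefillMillis) / refillIntervalMillis;
        if (newTokens > 0) {
            tokens = (int) Math.min(burstSize, tokens + newTokens);
            lastRefillMillis += newTokens * refillIntervalMillis;
        }
        if (tokens > 0) {
            tokens--;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucketSketch bucket = new TokenBucketSketch(2, 4, 0);
        long[] times = {0, 0, 0, 14_999, 15_000, 15_000};
        for (long t : times) {
            System.out.println("t=" + t + " ms -> " + bucket.tryAcquire(t));
        }
    }
}
```

With burstSize = 2 and allowedRatePerMinute = 4, two on-demand updates are accepted immediately, a third at the same instant is rejected, and one more is accepted once 15s have passed.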

5.1 discoveryClient.refreshInstanceInfo()

    //DiscoveryClient.class
    void refreshInstanceInfo() {
        // 5.1.1 Refresh the data center info
        applicationInfoManager.refreshDataCenterInfoIfRequired();
        // 5.1.2 Refresh the heartbeat lease info
        applicationInfoManager.refreshLeaseInfoIfRequired();

        InstanceStatus status;
        try {
            // Run the health check, which is disabled by default.
            // When enabled, the client's status also reflects the middleware it depends on,
            // so the state of that middleware may change the client's status.
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            // If an exception is thrown, the local status is set to DOWN
            status = InstanceStatus.DOWN;
        }

        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }

5.1.1 Refresh data center information

    //ApplicationInfoManager.class
    public void refreshDataCenterInfoIfRequired() {
        // Get the host name currently stored locally
        String existingAddress = instanceInfo.getHostName();

        String existingSpotInstanceAction = null;
        // The default data center is MyOwn, meaning a custom data center
        if (instanceInfo.getDataCenterInfo() instanceof AmazonInfo) {
            // For Amazon, read the existing spotInstanceAction
            existingSpotInstanceAction = ((AmazonInfo) instanceInfo.getDataCenterInfo()).get(AmazonInfo.MetaDataKey.spotInstanceAction);
        }

        String newAddress;
        if (config instanceof RefreshableInstanceConfig) {
            // Refresh data center info, and return up to date address
            // If the instance config is refreshable, refresh the data center info and get the latest host name from it
            newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
        } else {
            // Otherwise the latest host name is the ipAddress or hostname set in the local configuration file
            newAddress = config.getHostName(true);
        }
        // Get the latest IP address from the local configuration
        String newIp = config.getIpAddress();

        if (newAddress != null && !newAddress.equals(existingAddress)) {
            logger.warn("The address changed from : {} => {}", existingAddress, newAddress);
            // 5.1.1.1 The latest host name is not null and differs from the existing one: update host name and IP address
            updateInstanceInfo(newAddress, newIp);
        }

        if (config.getDataCenterInfo() instanceof AmazonInfo) {
            // If the configured data center is Amazon, compare spotInstanceAction and update the info if it changed
            String newSpotInstanceAction = ((AmazonInfo) config.getDataCenterInfo()).get(AmazonInfo.MetaDataKey.spotInstanceAction);
            if (newSpotInstanceAction != null && !newSpotInstanceAction.equals(existingSpotInstanceAction)) {
                logger.info(String.format("The spot instance termination action changed from: %s => %s",
                        existingSpotInstanceAction,
                        newSpotInstanceAction));
                updateInstanceInfo(null, null);
            }
        }
    }

5.1.1.1 updateInstanceInfo()

    //ApplicationInfoManager.class
    private void updateInstanceInfo(String newAddress, String newIp) {
        // :( in the legacy code here the builder is acting as a mutator.
        // This is hard to fix as this same instanceInfo instance is referenced elsewhere.
        // We will most likely re-write the client at sometime so not fixing for now.
        // The Builder is created around the existing instanceInfo, which may also be in use elsewhere,
        // and the setters below are what update the instance info.
        InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
        if (newAddress != null) {
            // Update the host name
            builder.setHostName(newAddress);
        }
        if (newIp != null) {
            // Update the IP address
            builder.setIPAddr(newIp);
        }
        // Update the data center info
        builder.setDataCenterInfo(config.getDataCenterInfo());
        // Mark the instance dirty
        instanceInfo.setIsDirty();
    }

5.1.2 Refresh the heartbeat lease renewal information

    //ApplicationInfoManager.class
    public void refreshLeaseInfoIfRequired() {
        // Get the locally cached lease info.
        // If the heartbeat interval or the lease expiration duration changed in the configuration,
        // update the locally cached lease info.
        LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
        if (leaseInfo == null) {
            return;
        }
        int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
        int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
        if (leaseInfo.getDurationInSecs() != currentLeaseDuration || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) {
            LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
                    .setRenewalIntervalInSecs(currentLeaseRenewal)
                    .setDurationInSecs(currentLeaseDuration)
                    .build();
            instanceInfo.setLeaseInfo(newLeaseInfo);
            // Mark the instance dirty
            instanceInfo.setIsDirty();
        }
    }

6. Summary

  • The client initializes 3 scheduled tasks at startup:

    • Refresh the registry: pull the registry from the server to the client
      • Runs every 30s by default
      • If the current run times out (default 30s), the interval before the next run doubles, up to a maximum of 300s (default)
      • If the current run does not time out (default 30s), the interval before the next run returns to normal (default 30s)
    • Heartbeat renewal: the client sends a heartbeat to the server to renew its lease and report its status. The scheduling mechanism is the same as for refreshing the registry.
    • Client check: when the relevant configuration changes, refresh the local data center info, refresh the lease info, and run a health check
      • Runs every 30s by default
      • Can be triggered on demand, ahead of its schedule, subject to the rate limit
  • None of these tasks runs at a fixed rate: the interval, and for the client check also on-demand runs, can change dynamically. This works because each run schedules the next one when it finishes.