publicclassLoadBalancerInterceptorimplementsClientHttpRequestInterceptor{privateLoadBalancerClientloadBalancer;// ...@OverridepublicClientHttpResponseintercept(finalHttpRequestrequest,finalbyte[]body,finalClientHttpRequestExecutionexecution)throwsIOException{finalURIoriginalUri=request.getURI();StringserviceName=originalUri.getHost();Assert.state(serviceName!=null,"Request URI does not contain a valid hostname: "+originalUri);returnthis.loadBalancer.execute(serviceName,this.requestFactory.createRequest(request,body,execution));}}
// RibbonLoadBalancerClient.javapublic<T>Texecute(StringserviceId,LoadBalancerRequest<T>request,Objecthint)throwsIOException{// 通过服务名获取到负载均衡器,ILoadBalancer 实现类为 DynamicServerListLoadBalancer,下文会提到ILoadBalancerloadBalancer=getLoadBalancer(serviceId);// 通过调用负载均衡器的 chooseServer 方法获取到服务器Serverserver=getServer(loadBalancer,hint);if(server==null){thrownewIllegalStateException("No instances available for "+serviceId);}RibbonServerribbonServer=newRibbonServer(serviceId,server,isSecure(server,serviceId),serverIntrospector(serviceId).getMetadata(server));returnexecute(serviceId,ribbonServer,request);}@Overridepublic<T>Texecute(StringserviceId,ServiceInstanceserviceInstance,LoadBalancerRequest<T>request)throwsIOException{Serverserver=null;if(serviceInstanceinstanceofRibbonServer){server=((RibbonServer)serviceInstance).getServer();}if(server==null){thrownewIllegalStateException("No instances available for "+serviceId);}RibbonLoadBalancerContextcontext=this.clientFactory.getLoadBalancerContext(serviceId);RibbonStatsRecorderstatsRecorder=newRibbonStatsRecorder(context,server);try{TreturnVal=request.apply(serviceInstance);statsRecorder.recordStats(returnVal);returnreturnVal;}// catch IOException and rethrow so RestTemplate behaves correctlycatch(IOExceptionex){statsRecorder.recordStats(ex);throwex;}catch(Exceptionex){statsRecorder.recordStats(ex);ReflectionUtils.rethrowRuntimeException(ex);}returnnull;}
// RibbonLoadBalancerClient.javaprotectedServergetServer(StringserviceId){// 通过 SpringClientFactory 获取 LoadBalancer 对象// 内部是通过反射,用构造方法构造一个实例对象returngetServer(getLoadBalancer(serviceId),null);}protectedServergetServer(ILoadBalancerloadBalancer){returngetServer(loadBalancer,null);}protectedServergetServer(ILoadBalancerloadBalancer,Objecthint){if(loadBalancer==null){returnnull;}// Use 'default' on a null hint, or just pass it on?// 通过 ILoadBalancer 的 chooseServer 方法获取 Server 对象returnloadBalancer.chooseServer(hint!=null?hint:"default");}
1 2 3 4 5 6 7 8 9101112
publicinterfaceILoadBalancer{// 添加一个 Server 集合publicvoidaddServers(List<Server>newServers);// 根据 key 获取 ServerpublicServerchooseServer(Objectkey);// 标记某个服务下线publicvoidmarkServerDown(Serverserver);// 获取可用的 Server 集合publicList<Server>getReachableServers();// 获取所有的 Server集合publicList<Server>getAllServers();}
// DynamicServerListLoadBalancer.java@VisibleForTestingpublicvoidupdateListOfServers(){List<T>servers=newArrayList<T>();if(serverListImpl!=null){servers=serverListImpl.getUpdatedListOfServers();LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",getIdentifier(),servers);if(filter!=null){// 如果配置了过滤器,则将符合条件的 server 筛选出来servers=filter.getFilteredListOfServers(servers);LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",getIdentifier(),servers);}}updateAllServerList(servers);}
// DiscoveryEnabledNIWSServerList.javaprivateList<DiscoveryEnabledServer>obtainServersViaDiscovery(){List<DiscoveryEnabledServer>serverList=newArrayList<DiscoveryEnabledServer>();if(eurekaClientProvider==null||eurekaClientProvider.get()==null){logger.warn("EurekaClient has not been initialized yet, returning an empty list");returnnewArrayList<DiscoveryEnabledServer>();}EurekaClienteurekaClient=eurekaClientProvider.get();if(vipAddresses!=null){for(StringvipAddress:vipAddresses.split(",")){// if targetRegion is null, it will be interpreted as the same region of clientList<InstanceInfo>listOfInstanceInfo=eurekaClient.getInstancesByVipAddress(vipAddress,isSecure,targetRegion);for(InstanceInfoii:listOfInstanceInfo){if(ii.getStatus().equals(InstanceStatus.UP)){if(shouldUseOverridePort){if(logger.isDebugEnabled()){logger.debug("Overriding port on client name: "+clientName+" to "+overridePort);}// copy is necessary since the InstanceInfo builder just uses the original reference,// and we don't want to corrupt the global eureka copy of the object which may be// used by other clients in our systemInstanceInfocopy=newInstanceInfo(ii);if(isSecure){ii=newInstanceInfo.Builder(copy).setSecurePort(overridePort).build();}else{ii=newInstanceInfo.Builder(copy).setPort(overridePort).build();}}DiscoveryEnabledServerdes=newDiscoveryEnabledServer(ii,isSecure,shouldUseIpAddr);des.setZone(DiscoveryClient.getZone(ii));serverList.add(des);}}if(serverList.size()>0&&prioritizeVipAddressBasedServers){break;// if the current vipAddress has servers, we dont use subsequent vipAddress based servers}}}returnserverList;}
// BaseLoadBalancer.java@OverridepublicvoidinitWithNiwsConfig(IClientConfigclientConfig){try{initWithNiwsConfig(clientConfig,ClientFactory::instantiateInstanceWithClientConfig);}catch(Exceptione){thrownewRuntimeException("Error initializing load balancer",e);}}@OverridepublicvoidinitWithNiwsConfig(IClientConfigclientConfig,Factoryfactory){S// ...try{// ...initWithConfig(clientConfig,rule,ping,stats);}catch(Exceptione){thrownewRuntimeException("Error initializing load balancer",e);}}voidinitWithConfig(IClientConfigclientConfig,IRulerule,IPingping,LoadBalancerStatsstats){// ...setPingInterval(pingIntervalTime);setMaxTotalPingTime(maxTotalPingTime);// cross associate with each other// i.e. Rule,Ping meet your container LB// LB, these are your Ping and Rule guys ...setRule(rule);setPing(ping);// ...}
publicvoidsetPingInterval(intpingIntervalSeconds){if(pingIntervalSeconds<1){return;}this.pingIntervalSeconds=pingIntervalSeconds;if(logger.isDebugEnabled()){logger.debug("LoadBalancer [{}]: pingIntervalSeconds set to {}",name,this.pingIntervalSeconds);}setupPingTask();// since ping data changed}
// BaseLoadBalancer.java// 内部类 PingTaskclassPingTaskextendsTimerTask{publicvoidrun(){try{newPinger(pingStrategy).runPinger();}catch(Exceptione){logger.error("LoadBalancer [{}]: Error pinging",name,e);}}}// 内部类 PingerclassPinger{privatefinalIPingStrategypingerStrategy;publicPinger(IPingStrategypingerStrategy){this.pingerStrategy=pingerStrategy;}publicvoidrunPinger()throwsException{// 用 CAS 设置 pingInProgress 为 true,代表正在执行 Ping 任务。// 如果设置失败,则表示有线程正在执行 Ping 任务,这里就不再执行if(!pingInProgress.compareAndSet(false,true)){return;// Ping in progress - nothing to do}// we are "in" - we get to PingServer[]allServers=null;boolean[]results=null;LockallLock=null;LockupLock=null;try{/* * The readLock should be free unless an addServer operation is * going on... */// 读锁应该是空闲状态,除了 addServer 操作正在执行。allLock=allServerLock.readLock();// 加读锁,避免其他线程修改 serverListallLock.lock();allServers=allServerList.toArray(newServer[allServerList.size()]);// 解锁allLock.unlock();intnumCandidates=allServers.length;// 向每个服务器发送 ping 请求,得到一个布尔值的结果集(服务器是否存活 - 能否请求成功)results=pingerStrategy.pingServers(ping,allServers);finalList<Server>newUpList=newArrayList<Server>();finalList<Server>changedServers=newArrayList<Server>();// 遍历当前所有Serverfor(inti=0;i<numCandidates;i++){// 获取第 i 个 Server 是当前否为存活状态(UP)booleanisAlive=results[i];Serversvr=allServers[i];// 获取 ping 之前的服务器状态booleanoldIsAlive=svr.isAlive();// 将该服务器状态改为当前获取到的状态svr.setAlive(isAlive);if(oldIsAlive!=isAlive){// 如果之前状态与当前获取的状态不一致// 加入到状态更改过的服务器列表中changedServers.add(svr);// 输出日志:当前服务器状态修改logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",name,svr.getId(),(isAlive?"ALIVE":"DEAD"));}if(isAlive){// 如果获取到的当前状态为 true(存活UP状态)// 则将该服务器加入到 newUpList 中,用于后面更新至存活服务器列表newUpList.add(svr);}}// 更新存活服务器列表,需要加写锁,避免并发问题upLock=upServerLock.writeLock();upLock.lock();upServerList=newUpList;upLock.unlock();// 通知服务器状态修改notifyServerStatusChangeListener(changedServers);}finally{pingInProgress.set(false);}}}
该定时任务的大致流程为:
1、首先用 CAS 修改 pingInProgress(AtomicBoolean 对象)。如果修改不成功,则表示当前有其他线程正在发送 ping 请求且尚未执行完毕,因此当前操作不再执行。
2、加读锁获取当前服务列表。
3、通过 IPingStrategy 向每个服务器发送 ping 请求,得到一个布尔值的结果集。
4、遍历之前获取到的 Server List,判断服务状态是否有变化,并更新服务列表。(其中状态更改的服务器将会加入到 changedServers 列表中,ping 请求后依然存活的服务会加入到 newUpList 中)
5、最后加写锁更新 UP 状态的服务器列表,并通知服务状态改变。(有兴趣的话,此处可以查看一下 ServerStatusChangeListener 的实现类是哪个,再看具体做了什么操作)