对于RPC而言,服务的降级也是必不可少的,何为服务的降级,就是在业务洪流来的时候,服务器的压力陡增,数据库的压力也很大的时候,轻量化服务的功效,比如某个非核心服务需要调用数据库的,我们降级的服务不需要调用数据库,就比如我们在某某电商购物的时候,商品详情页的侧边栏一般会有电商推荐的一些比较类似的产品,这个后台的机制可能是某个推荐算法,根据用户浏览商品的记录给出推荐的产品,这是非核心的逻辑,这个功能在服务器的压力比较大的时候,可以进行降级的处理,我们可以给出几个默认的产品返回,因为推荐算法可能会设计大数据的计算和分析,甚至设计几次的数据库查询,在这个时候我们如果让这个后台方法默认返回几个固定的值的时候,可以减轻服务的压力,给其他的核心服务,例如支付,详情页等服务做出服务资源的让步
服务降级对于RPC来说是可以或缺的,因为对于RPC来说,远程调用是它的核心,但是服务降级对于服务治理是不可或缺的,实现服务降级的方式有很多种,本Demo RPC实现的方式比较简单,提供一个Mock方法,这个Mock方法由我们自己实现:
举例来说,对于一个简单的接口而言比如HelloService来说:
package org.laopopo.example.demo.service; /** * * @author BazingaLyn * @description * @time * @modifytime */ public interface HelloSerivce { String sayHello(String str); }
我们先给出他正常逻辑的实现:
package org.laopopo.example.demo.service; import org.laopopo.client.annotation.RPCService; /** * * @author BazingaLyn * @description Demo * @time 2016年8月19日 * @modifytime */ public class HelloSerivceImpl implements HelloSerivce { @Override @RPCService(responsibilityName="xiaoy", serviceName="LAOPOPO.TEST.SAYHELLO", isVIPService = false, isSupportDegradeService = true, degradeServicePath="org.laopopo.example.demo.service.HelloServiceMock", degradeServiceDesc="默认返回hello") public String sayHello(String str) { //真实逻辑可能涉及到查库 return "hello "+ str; } }
HelloServiceImpl.java这个类是我们一般的业务逻辑的实现,也许会设计到N次的库的查询,甚至还需要调用别人的接口,所以在业务洪流来的时候,这个服务占据的资源过多,但这个服务也非核心的,这时最好的方法就是降级,而不是熔断,服务的熔断也是服务治理的一个手段,我们言归正传,降级的方法就是我们写一个mock方法:
package org.laopopo.example.demo.service; public class HelloServiceMock implements HelloSerivce { @Override public String sayHello(String str) { //直接给出默认的返回值 return "hello"; } }
Mock方法就变得简单的多,这样就是一个服务降级的比较简单的实现,接下来就是处理如何实现服务切换的功能了
当服务器压力不大的时候,我们一般调用HelloServiceImpl的方法,当服务器的压力大的时候,或者服务调用的成功率低于某个值的时候,就开始切换服务的提供对象,由HelloServiceImpl切换到HelloServiceMock上来,关于切换,简单的又分两种,手动和自动:
1)手动切换也很好理解,比如明天双11,或者618,818之类的促销节到来的时候,提前人工把某些非核心的功能,切换到Mock方法上来
2)自动切换也很好理解,比如某些服务相对比较重要,如果直接手动降级也是比较可惜,我们可以设定假如某个服务器的服务调用成功率低于某个值的时候,就开始自动降级
我们看具体的实现:
我们在服务编织的时候,假如某个服务的Annotation上注明该服务有提供Mock类的时候,我们也进行简单的编织:
//如果是支持服务降级服务,则需要根据降级方法的路径去创建这个实例,并编制proxy if(isSupportDegradeService){ Class<?> degradeClass = null; try { degradeClass = Class.forName(degradeServicePath); Object nativeObj = degradeClass.newInstance(); if(null == globalProviderProxyHandler){ this.mockDegradeServiceProvider = nativeObj; }else{ Class<?> globalProxyCls = generateProviderProxyClass(globalProviderProxyHandler, nativeObj.getClass()); this.mockDegradeServiceProvider = copyProviderProperties(nativeObj, newInstance(globalProxyCls)); } } catch (Exception e) { logger.error("[{}] class can not create by reflect [{}]",degradeServicePath,e.getMessage()); throw new RpcWrapperException("degradeService path " + degradeServicePath +"create failed" ); } }
并将反射创建好的类与原对象一起编织成ServiceWrapper
ServiceWrapper serviceWrapper = new ServiceWrapper(serviceProvider, mockDegradeServiceProvider, serviceName, responsiblityName, methodName, paramters, isSupportDegradeService, degradeServicePath, degradeServiceDesc, weight, connCount, isVIPService, maxCallCount); //放入到一个缓存中,方便以后consumer来调取服务的时候,该来获取对应真正的编织类 providerController.getProviderContainer().registerService(serviceName, serviceWrapper);
我们在服务调用的章节中说过,我们为每一个服务都配置了一个服务的状态:
/** * * @author BazingaLyn * @description 当前实例的服务状态 * @time 2016年8月29日 * @modifytime */ public static class CurrentServiceState { private AtomicBoolean hasDegrade = new AtomicBoolean(false); // 是否已经降级 private AtomicBoolean hasLimitStream = new AtomicBoolean(true); // 是否已经限流 private AtomicBoolean isAutoDegrade = new AtomicBoolean(false); // 是否已经开始自动降级 private Integer minSuccecssRate = 90; // 服务最低的成功率,调用成功率低于多少开始自动降级 public AtomicBoolean getHasDegrade() { return hasDegrade; } public void setHasDegrade(AtomicBoolean hasDegrade) { this.hasDegrade = hasDegrade; } public AtomicBoolean getHasLimitStream() { return hasLimitStream; } public void setHasLimitStream(AtomicBoolean hasLimitStream) { this.hasLimitStream = hasLimitStream; } public AtomicBoolean getIsAutoDegrade() { return isAutoDegrade; } public void setIsAutoDegrade(AtomicBoolean isAutoDegrade) { this.isAutoDegrade = isAutoDegrade; } public Integer getMinSuccecssRate() { return minSuccecssRate; } public void setMinSuccecssRate(Integer minSuccecssRate) { this.minSuccecssRate = minSuccecssRate; } }
我们也维护了一个这样的全局变量:
private final ConcurrentMap<String, Pair<CurrentServiceState, ServiceWrapper>> serviceProviders = new ConcurrentHashMap<String, Pair<CurrentServiceState, ServiceWrapper>>();
Key是serviceName也就是服务名,Value是一个键值对,每一个服务的编织类,对应一个服务的状态
当调用的时候获取到每个服务的CurrentServiceState:
//判断服务是否已经被设定为自动降级,如果被设置为自动降级且有它自己的mock类的话,则将targetCallObj切换到mock方法上来 if(currentServiceState.getHasDegrade().get() && serviceWrapper.getMockDegradeServiceProvider() != null){ targetCallObj = serviceWrapper.getMockDegradeServiceProvider(); }
我们读取到CurrentServiceState中的是否已经降级的字段,如果是true,则将目标对象替换成Mock对象就OK了,其他的流程不变,这样就可以实现一个简单的人工降级了,我们只需要手动修改hasDegrade这个字段的值就可以完成服务的降级和恢复了
关于自动降级,其实就是在手动的基础上加一个定时轮询的检查就可以了,比如每个一分钟查询一下或者统计一下此时服务调用的成功率,如果低于某个临界值,例如成功率低于90%,则将hasDegrade字段修改成true
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { //检查是否有服务需要自动降级 DefaultProvider.this.providerController.checkAutoDegrade(); } }, 30, 60, TimeUnit.SECONDS); public void checkAutoDegrade() { //获取到所有需要降级的服务名 List<Pair<String, CurrentServiceState>> needDegradeServices = providerContainer.getNeedAutoDegradeService(); //如果当前实例需要降级的服务列表不为空的情况下,循环每个列表 if (!needDegradeServices.isEmpty()) { for (Pair<String, CurrentServiceState> pair : needDegradeServices) { //服务名 String serviceName = pair.getKey(); //最低成功率 Integer minSuccessRate = pair.getValue().getMinSuccecssRate(); //调用的实际成功率 Integer realSuccessRate = ServiceMeterManager.calcServiceSuccessRate(serviceName); if (minSuccessRate > realSuccessRate) { final Pair<CurrentServiceState, ServiceWrapper> _pair = this.defaultProvider.getProviderController().getProviderContainer() .lookupService(serviceName); CurrentServiceState currentServiceState = _pair.getKey(); if (!currentServiceState.getHasDegrade().get()) { currentServiceState.getHasDegrade().set(true); } } } } }
这样就可以完成了一个比较简单的服务降级了