对于远程调用来说,限流是很重要的,这是对自己的保护政策,因为为了保证在业务高峰期,线上系统也能保证一定的弹性和稳定性,最有效的方案就是进行服务降级了,而限流就是降级系统最常采用的方案之一
限流即流量限制,或者高大上一点,叫做流量整形,限流的目的是在遇到流量高峰期或者流量突增(流量尖刺)时,把流量速率限制在系统所能接受的合理范围之内,不至于让系统被高流量击垮。
目前有几种常见的限流方式:
1)通过限制单位时间段内调用量来限流
2)通过限制系统的并发调用程度来限流
3)使用漏桶(Leaky Bucket)算法来进行限流
4)使用令牌桶(Token Bucket)算法来进行限流
我们将做第一种最为简单的限流方式,限制单位时间段内调用量来限流
限流最难做的就是需要统计单位时间内调用的次数:
我们来看看在Java语言中,这种方式具体应该如何做,第一步我们需要做的就是确定这个单位时间段有多长,肯定不能太长,太长将会导致限流的效果变得不够“敏感”,因为我们知道,进入限流阶段后,如果采用的手段是不允许继续访问,那么在该单位时间段内,该服务是不可用的,比如我们把单位时间设置成1小时,如果在第29分钟,该服务的访问量就达到了我们设定的阈值,那么在接下来的31分钟,该服务都将变得不可用,这无形SO BAD!!如果单位时间段设置得太短,越短的单位时间段将导致我们的阈值越难设置,比如1秒钟,因为高峰期的1秒钟和低峰期的1秒钟的流量有可能相差百倍甚至千倍,同时过短的单位时间段也对限流代码片段提出了更高要求,限流部分的代码必须相当稳定并且高效!最优的单位时间片段应该以阈值设置的难易程度为标准,比如我们的监控系统统计的是服务每分钟的调用量,所以很自然我们可以选择1分钟作为时间片段,因为我们很容易评估每个服务在高峰期和低峰期的分钟调用量,并可以通过服务在每分钟的平均耗时和异常量来评估服务在不同单位时间段的服务质量,这给阈值的设置提供了很好的参考依据。当单位时间段和阈值已经确定,接下来就该考虑计数器的实现了,最快能想到的就是AtomicLong了,对于每次服务调用,我们可以通过AtomicLong#incrementAndGet()方法来给计数器加1并返回最新值,我们可以通过这个最新值和阈值来进行比较来看该服务单位时间段内是否超过了阈值。这里,如何设计计数器是个关键,假设单位时间段为1分钟,我们可以做一个环状结构的计数器,如下:
当然我们可以直接用一个数组来实现它:new AtomicLong[]{new AtomicLong(0), new AtomicLong(0), new AtomicLong(0)},当前分钟AtomicLong保存的是当前单位时间段内该服务的调用次数,上一分钟显然保存的是上一单位时间段的统计数据,之所以有这个是为了统计需要,既然到了当前的单位时间段,说明上一分钟的访问统计已经结束,即可将上一分钟的该接口的访问量数据打印日志或发送到某个服务端进行统计,因为我们说过,阈值的设置是个不断调优的过程,所以有时候这些统计数据会很有用。在对当前时间段的访问量进行统计的时候,需要将下一分钟的AtomicLong清零,这一步是很关键的,有两种清零方案:第一种,直接(通过Executors.newSingleThreadScheduledExecutor)起个单独线程,比如每隔50秒(这个当然必须小于单位时间段)对下一分钟的AtomicLong进行清零。第二种,每次在给当前分钟AtomicLong加1时,对下一分钟的AtomicLong的值进行检测,如果不是0,则设置成0,如果采用这种方案,这里会有一个bug,如果某个服务在某个完整的单位时间段内一次也没有被调用,则下一分钟的AtomicLong在使用前显然没有被清0,所以采用第二种方案还得通过额外的一个字段保存上一次清0的时间,每次使用当前分钟AtomicLong时,需要先判断这个字段,如果超过一个单位时间段,这则需要先清0再使用。两种方案对比来看,第一种方案实现起来更简单。对于如何访问当前分钟、上一分钟和下一分钟的AtomicLong,可以直接通过当前分钟数来对数组的length取模即可(比如获取当前分钟的数据index:(System.currentTimeMillis() / 60000) % 3)。
对于限制单位时间段内调用量的这种限流方式,实现简单,适用于大多数场景,如果阈值可以通过服务端来动态配置,甚至可以当做业务开关来使用,但也有一定的局限性,因为我们的阈值是通过分析单位时间段内调用量来设置的,如果它在单位时间段的前几秒就被流量突刺消耗完了,将导致该时间段内剩余的时间内该服务“拒绝服务”,可以将这种现象称为“突刺消耗”,但庆幸的是,这种情况并不常见。
(上面的文字抄与滴滴藏经阁)
我们来根据这个理论进行限流编码,首先先按照上述的理论实现:
public static class FlowController { private AtomicLong[] metricses = new AtomicLong[]{new AtomicLong(0), new AtomicLong(0), new AtomicLong(0)}; public void incrementAtCurrentMinute(){ long currentTime = SystemClock.millisClock().now(); int index = (int) ((currentTime / 60000) % 3); AtomicLong atomicLong = metricses[index]; atomicLong.incrementAndGet(); } public long getLastCallCountAtLastMinute(){ long currentTime = SystemClock.millisClock().now(); int index = (int) (((currentTime / 60000) - 1) % 3); AtomicLong atomicLong = metricses[index]; return atomicLong.get(); } public long getCurrentCallCount(){ long currentTime = SystemClock.millisClock().now(); int index = (int) (((currentTime / 60000)) % 3); AtomicLong atomicLong = metricses[index]; return atomicLong.get(); } public long getNextMinuteCallCount(){ long currentTime = SystemClock.millisClock().now(); int index = (int) (((currentTime / 60000) + 1) % 3); AtomicLong atomicLong = metricses[index]; return atomicLong.get(); } public void clearNextMinuteCallCount(){ System.out.println("清理开始"); long currentTime = SystemClock.millisClock().now(); int index = (int) (((currentTime / 60000) + 1) % 3); AtomicLong atomicLong = metricses[index]; atomicLong.set(0); } public AtomicLong[] getMetricses() { return metricses; } public void setMetricses(AtomicLong[] metricses) { this.metricses = metricses; } }
我们将其添加到我们自己限流的使用场景,某一个服务对应一个自己的限流器
private static Map<String,FlowController> serviceFlowController = new HashMap<String, FlowController>();
我们模仿调用的场景:
for(int i = 0;i <10000;i++){ callXXXXService("TEST.SERVICE"); Thread.sleep(20l); }
每隔20毫秒去调用一下某个服务:
private static void callXXXXService(String service) { FlowController controller = serviceFlowController.get(service); if(null == controller){ controller = new FlowController(); serviceFlowController.put(service, controller); } controller.incrementAtCurrentMinute(); //调用的核心逻辑处理 }
我们还需要有个定时任务,定时清理下一个槽位的已经统计的值
private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("provider-timer"));
我们每隔45秒清理一下下一个时间段的槽位
scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { System.out.println("清理下一个时间的槽位"); clearNextMinuteCallCount(); } catch (Exception e) { logger.warn("schedule publish failed [{}]", e.getMessage()); } } }, 3, 45, TimeUnit.SECONDS);
这样我们就完成了基本的限流功能,完整的代码示例:
package org.laopopo.example.flow.controller; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.laopopo.common.utils.NamedThreadFactory; import org.laopopo.common.utils.SystemClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PerMinuteFlowController { private static final Logger logger = LoggerFactory.getLogger(PerMinuteFlowController.class); private static Map<String,FlowController> serviceFlowController = new HashMap<String, FlowController>(); private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("provider-timer")); public static void main(String[] args) throws InterruptedException { Thread t = new Thread(new MetricsScanner(), "timeout.scanner"); t.setDaemon(true); t.start(); scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { System.out.println("清理下一个时间的槽位"); clearNextMinuteCallCount(); } catch (Exception e) { logger.warn("schedule publish failed [{}]", e.getMessage()); } } }, 3, 45, TimeUnit.SECONDS); for(int i = 0;i <10000;i++){ callXXXXService("TEST.SERVICE"); Thread.sleep(20l); } } private static void clearNextMinuteCallCount() { for(String str : serviceFlowController.keySet()){ FlowController flowController = serviceFlowController.get(str); flowController.clearNextMinuteCallCount(); } } private static void callXXXXService(String service) { FlowController controller = serviceFlowController.get(service); if(null == controller){ controller = new FlowController(); serviceFlowController.put(service, controller); } controller.incrementAtCurrentMinute(); //调用的核心逻辑处理 } public static class FlowController { private AtomicLong[] metricses = new AtomicLong[]{new AtomicLong(0), new AtomicLong(0), new AtomicLong(0)}; public void incrementAtCurrentMinute(){ long currentTime = SystemClock.millisClock().now(); int index = (int) ((currentTime / 60000) % 3); AtomicLong atomicLong = metricses[index]; atomicLong.incrementAndGet(); } public long getLastCallCountAtLastMinute(){ long currentTime = SystemClock.millisClock().now(); int index = (int) (((currentTime / 60000) - 1) % 3); AtomicLong atomicLong = metricses[index]; return atomicLong.get(); } public long getCurrentCallCount(){ long currentTime = SystemClock.millisClock().now(); int index = (int) (((currentTime / 60000)) % 3); AtomicLong atomicLong = metricses[index]; return atomicLong.get(); } public long getNextMinuteCallCount(){ long currentTime = SystemClock.millisClock().now(); int index = (int) (((currentTime / 60000) + 1) % 3); AtomicLong atomicLong = metricses[index]; return atomicLong.get(); } public void clearNextMinuteCallCount(){ System.out.println("清理开始"); long currentTime = SystemClock.millisClock().now(); int index = (int) (((currentTime / 60000) + 1) % 3); AtomicLong atomicLong = metricses[index]; atomicLong.set(0); } public AtomicLong[] getMetricses() { return metricses; } public void setMetricses(AtomicLong[] metricses) { this.metricses = metricses; } } public static class MetricsScanner implements Runnable { @Override public void run() { for (;;) { logger.info("统计中"); try { Thread.sleep(5000); for(String str : serviceFlowController.keySet()){ FlowController flowController = serviceFlowController.get(str); logger.info("上一秒调用的次数是[{}]",flowController.getLastCallCountAtLastMinute()); logger.info("当前秒调用的次数是[{}]",flowController.getCurrentCallCount()); logger.info("下以秒调用的次数是[{}]",flowController.getNextMinuteCallCount()); } } catch (InterruptedException e) { e.printStackTrace(); } } } } }
运行截图:
过几十秒打印日记:
看打印日记成功了,实验基本上上成功了,我们可以做到统计每分钟的调用次数,并且可以随时得到上一分钟的调用次数的统计了~
接下来我们就需要将这段测试代码嵌入到我们实际的使用场景下去了,我们先理一下逻辑:
1)我们需要在编织服务的时候,用Annotation注明该服务实例的单位时间(分钟)最大的调用次数
2)在服务编织的时候,将其记录好,保存到某个全局变量中去,一个serviceName和一个最大的调用次数是一一对应的
3)当consumer实例端每调用一次,则限流器的次数加1
我们把限流的代码嵌入到RPC的Demo中去:
还记得我们在编织的那个小节中的自定义的Annotation中规定了某个具体的服务最大的单位时间调用次数,可以如下配置:
package org.laopopo.example.generic.test_5; import org.laopopo.client.annotation.RPCService; import org.laopopo.example.demo.service.HelloSerivce; public class HelloServiceFlowControllerImpl implements HelloSerivce { @Override @RPCService(responsibilityName="xiaoy",serviceName="LAOPOPO.TEST.SAYHELLO",maxCallCountInMinute = 40) public String sayHello(String str) { return "hello "+ str; } }
在编织服务的时候,将其记录到,全局限流管理器中去:
if(maxCallCount <= 0){ throw new RpcWrapperException("max call count must over zero at unit time"); } ServiceFlowControllerManager serviceFlowControllerManager = providerController.getServiceFlowControllerManager(); serviceFlowControllerManager.setServiceLimitVal(serviceName, maxCallCount);
ServiceFlowControllerManager.java
package org.laopopo.client.provider.flow.control; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.laopopo.common.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author BazingaLyn * @description 限制每个服务单位时间(分钟)的调用次数 * @time 2016年9月9日 * @modifytime */ public class ServiceFlowControllerManager { private static final Logger logger = LoggerFactory.getLogger(ServiceFlowControllerManager.class); private static final ConcurrentMap<String, Pair<Long,ServiceFlowController>> globalFlowControllerMap = new ConcurrentHashMap<String, Pair<Long,ServiceFlowController>>(); /** * 设置某个服务的单位时间的最大调用次数 * @param serviceName * @param maxCallCount */ public void setServiceLimitVal(String serviceName,Long maxCallCount){ Pair<Long,ServiceFlowController> pair = new Pair<Long, ServiceFlowController>(); pair.setKey(maxCallCount); pair.setValue(new ServiceFlowController()); globalFlowControllerMap.put(serviceName, pair); } /** * 原子增加某个服务的调用次数 * @param serviceName */ public void incrementCallCount(String serviceName){ Pair<Long,ServiceFlowController> pair = globalFlowControllerMap.get(serviceName); if(null == pair){ logger.warn("serviceName [{}] matched no flowController",serviceName); return; } ServiceFlowController serviceFlowController = pair.getValue(); serviceFlowController.incrementAtCurrentMinute(); } /** * 查看某个服务是否可用 * @param serviceName * @return */ public boolean isAllow(String serviceName){ Pair<Long,ServiceFlowController> pair = globalFlowControllerMap.get(serviceName); if(null == pair){ logger.warn("serviceName [{}] matched no flowController",serviceName); return false; } ServiceFlowController serviceFlowController = pair.getValue(); Long maxCallCount = pair.getKey(); long hasCallCount = serviceFlowController.incrementAtCurrentMinute(); serviceFlowController.incrementAtCurrentMinute(); return hasCallCount > maxCallCount ? false :true; } /** * 获取到某个服务的上一分钟的调用次数 * @param serviceName * @return */ public Long getLastMinuteCallCount(String serviceName){ Pair<Long,ServiceFlowController> pair = globalFlowControllerMap.get(serviceName); if(null == pair){ logger.warn("serviceName [{}] matched no flowController",serviceName); return 0l; } ServiceFlowController serviceFlowController = pair.getValue(); return serviceFlowController.getLastCallCountAtLastMinute(); } /** * 将下一秒的调用次数置为0 */ public void clearAllServiceNextMinuteCallCount(){ for(String service : globalFlowControllerMap.keySet()){ Pair<Long,ServiceFlowController> pair = globalFlowControllerMap.get(service); if(null == pair){ logger.warn("serviceName [{}] matched no flowController",service); continue; } ServiceFlowController serviceFlowController = pair.getValue(); serviceFlowController.clearNextMinuteCallCount(); } } }
然后在调用的远程调用的过程中加入一个简单的判断就可以了:
// app flow control ServiceFlowControllerManager serviceFlowControllerManager = defaultProvider.getProviderController().getServiceFlowControllerManager(); if (!serviceFlowControllerManager.isAllow(serviceName)) { rejected(APP_FLOW_CONTROL,channel, request,serviceName); return; }
以上代码就可以实现简单的限流控制了:
源码 PerMinuteFlowController.java
限流测试代码:
写的逻辑比较混乱,还请大家谅解,如果有错误和实现的错误的地方,还希望请大家指出,我改正,谢谢~