上一个小节简单的介绍了服务提供者端如何去编制一个服务的信息,然后将此服务的信息发送到注册中心上去的基本过程了,其实算是比较简单的,这节我们将简单的介绍一些Consumer端调用Provider端的时候,Provider端是如何处理的
我们先确定一下远程调用的几个参数:
首先先简单地说明这几个参数
1)invokeId,虽然在我这个RPC中并没有实现多大的价值,但这是我的能力的原因,但这个参数却是不可或缺的,因为有过远程调用使用经历的人,在线上出问题的时候,最最痛苦的就是定位问题了,而一般企业的业务都是链式调用,A系统调用B,B调用C,C调用D,E,往往排查问题的时候,某某接口调用不同的时候,都不知道哪里出问题了,需要一步步地找相关的负责人一一确认,这个过程是很痛苦的,很有可能是跨部门的,不好协调,一个问题一个bug能需要半天的时间才能解决,这就是普通RPC所缺少的链路监控的功能,加入在这种链式调用的场景下,所有的调用日记,能够按照这个invokeId做归类的话,不管是用ElasticSearch还是Hbase,Mysql等等记录方法,加上索引,有个监控系统,这样就可以很简单的找出问题的所在,是哪个环节出了问题了,这样可以大大的加快排查问题的速度,很多S级互联网公司都实现了这个功能,本人暂时没有研究过,不过现在已经有很多开源了,大家可以自主调研,一起学习
2)serviceName,这个很好理解,在上几个小节我们说过自定义Annotation绑定了某个具体的方法,所以一个serviceName是绑定一个方法的,获取到serviceName,我们可以确定唯一的方法,因为远程调用的本质还是调用某个方法
3)args,这个不用多说,调用方法的入参
4)timestamp调用的时间戳,这个时间应该在调用端的时候就形成了,一个远程调用的时间统计应该是从请求发出和接收到响应,这个时间应该算是一个完整的调用流程
我们看具体的调用代码:
public void handlerRPCRequest(RemotingTransporter request, Channel channel) { String serviceName = null; RequestCustomBody body = null; int requestSize = 0; try { byte[] bytes = request.bytes(); requestSize = bytes.length; request.bytes(null); body = serializerImpl().readObject(bytes, RequestCustomBody.class); request.setCustomHeader(body); serviceName = body.getServiceName(); ServiceMeterManager.incrementCallTimes(serviceName); ServiceMeterManager.incrementRequestSize(serviceName, requestSize); } catch (Exception e) { rejected(BAD_REQUEST, channel, request,serviceName); return; } final Pair<CurrentServiceState, ServiceWrapper> pair = defaultProvider.getProviderController().getProviderContainer().lookupService(serviceName); if (pair == null || pair.getValue() == null) { rejected(SERVICE_NOT_FOUND, channel, request,serviceName); return; } // app flow control ServiceFlowControllerManager serviceFlowControllerManager = defaultProvider.getProviderController().getServiceFlowControllerManager(); if (!serviceFlowControllerManager.isAllow(serviceName)) { rejected(APP_FLOW_CONTROL,channel, request,serviceName); return; } process(pair,request,channel,serviceName,body.getTimestamp()); }
入参request中的bytes字节数就是真正的请求体,我们对其进行反序列化获取到真正的请求正文:
body = serializerImpl().readObject(bytes, RequestCustomBody.class);
反序列化得到的body里面有serviceName,我们再根据serviceName获取到真正的方法名:
final Pair<CurrentServiceState, ServiceWrapper> pair = defaultProvider.getProviderController().getProviderContainer().lookupService(serviceName);
我们在上个小节编织服务的时候,对ServiceWrapper进行了注入,知道serviceName与ServiceWrapper是一一对应的,lookupService方法也很简单:
在DefaultServiceProviderContainer.java中维护了一个全局变量的Map类,Key是serviceName,value是一个Pair的键值对,键值对的键值是CurrentServiceState.java
这个类表示的是当前服务实例,是对当前实例状态的说明:
value是我们有方法名的ServiceWrapper的类,这个类以前有过说明,这边做个简单的截图:
现在已经拿到了方法名,入参,接下来就是调用一些反射的API,就可以完成了方法的调用了:
private void process(Pair<CurrentServiceState, ServiceWrapper> pair, final RemotingTransporter request, Channel channel,final String serviceName,final long beginTime) { Object invokeResult = null; CurrentServiceState currentServiceState = pair.getKey(); ServiceWrapper serviceWrapper = pair.getValue(); Object targetCallObj = serviceWrapper.getServiceProvider(); Object[] args = ((RequestCustomBody)request.getCustomHeader()).getArgs(); if(currentServiceState.getHasDegrade().get() && serviceWrapper.getMockDegradeServiceProvider() != null){ targetCallObj = serviceWrapper.getMockDegradeServiceProvider(); } String methodName = serviceWrapper.getMethodName(); List<Class<?>[]> parameterTypesList = serviceWrapper.getParamters(); Class<?>[] parameterTypes = findMatchingParameterTypes(parameterTypesList, args); invokeResult = fastInvoke(targetCallObj, methodName, parameterTypes, args); ResultWrapper result = new ResultWrapper(); result.setResult(invokeResult); ResponseCustomBody body = new ResponseCustomBody(Status.OK.value(), result); final RemotingTransporter response = RemotingTransporter.createResponseTransporter(LaopopoProtocol.RPC_RESPONSE, body, request.getOpaque()); channel.writeAndFlush(response).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { long elapsed = SystemClock.millisClock().now() - beginTime; logger.info("call time is [{}] and minus [{}]",beginTime,elapsed); if (future.isSuccess()) { ServiceMeterManager.incrementTotalTime(serviceName, elapsed); } else { logger.info("request {} get failed response {}", request, response); } } }); }
这边的代码也很好理解,获取到参数,校验参数的格式的正确性,然后去调用一些反射和Cglib的一些API就可以搞定了
public static Object fastInvoke(Object obj, String methodName, Class<?>[] parameterTypes, Object[] args) { Class<?> clazz = obj.getClass(); FastClass fastClass = fastClassCache.get(clazz); if (fastClass == null) { FastClass newFastClass = FastClass.create(clazz); fastClass = fastClassCache.putIfAbsent(clazz, newFastClass); if (fastClass == null) { fastClass = newFastClass; } } Object value = null; try { value = fastClass.invoke(methodName, parameterTypes, obj, args); } catch (InvocationTargetException e) { JUnsafe.throwException(e); } return value; }
获取到调用的返回值之后,然后将其编织成返回值,然后用传递过来的channel将响应值返回到调用的Consumer的实例,整个流程基本是这样的,大体上的调用流程就是这样的,希望大家一起看代码,看看是否有bug,大家一起修正~
源码地址:
下一节,我们看看如何做简单的限流的