PHP笔记网

革命尚未成功,同志仍须努力下载JDK17

作者:Albert.Wen  添加时间:2018-11-04 17:43:32  修改时间:2025-01-16 18:56:28  分类:07.Java基础  编辑

上一个小节简单的介绍了服务提供者端如何去编制一个服务的信息,然后将此服务的信息发送到注册中心上去的基本过程了,其实算是比较简单的,这节我们将简单的介绍一些Consumer端调用Provider端的时候,Provider端是如何处理的

 

我们先确定一下远程调用的几个参数:

首先先简单地说明这几个参数

1)invokeId,虽然在我这个RPC中并没有实现多大的价值,但这是我的能力的原因,但这个参数却是不可或缺的,因为有过远程调用使用经历的人,在线上出问题的时候,最最痛苦的就是定位问题了,而一般企业的业务都是链式调用,A系统调用B,B调用C,C调用D,E,往往排查问题的时候,某某接口调用不同的时候,都不知道哪里出问题了,需要一步步地找相关的负责人一一确认,这个过程是很痛苦的,很有可能是跨部门的,不好协调,一个问题一个bug能需要半天的时间才能解决,这就是普通RPC所缺少的链路监控的功能,加入在这种链式调用的场景下,所有的调用日记,能够按照这个invokeId做归类的话,不管是用ElasticSearch还是Hbase,Mysql等等记录方法,加上索引,有个监控系统,这样就可以很简单的找出问题的所在,是哪个环节出了问题了,这样可以大大的加快排查问题的速度,很多S级互联网公司都实现了这个功能,本人暂时没有研究过,不过现在已经有很多开源了,大家可以自主调研,一起学习

2)serviceName,这个很好理解,在上几个小节我们说过自定义Annotation绑定了某个具体的方法,所以一个serviceName是绑定一个方法的,获取到serviceName,我们可以确定唯一的方法,因为远程调用的本质还是调用某个方法

3)args,这个不用多说,调用方法的入参

4)timestamp调用的时间戳,这个时间应该在调用端的时候就形成了,一个远程调用的时间统计应该是从请求发出和接收到响应,这个时间应该算是一个完整的调用流程

 

我们看具体的调用代码:

public void handlerRPCRequest(RemotingTransporter request, Channel channel) {
		
		
	String serviceName = null;
	RequestCustomBody body = null;
	int requestSize = 0;

	try {
		byte[] bytes = request.bytes();
		requestSize = bytes.length;
		request.bytes(null);

		body = serializerImpl().readObject(bytes, RequestCustomBody.class);
		
		request.setCustomHeader(body);
		serviceName = body.getServiceName();
		
		
		ServiceMeterManager.incrementCallTimes(serviceName);
		ServiceMeterManager.incrementRequestSize(serviceName, requestSize);
		
	} catch (Exception e) {
		rejected(BAD_REQUEST, channel, request,serviceName);
		return;
	}
	
	final Pair<CurrentServiceState, ServiceWrapper> pair = defaultProvider.getProviderController().getProviderContainer().lookupService(serviceName);
	if (pair == null || pair.getValue() == null) {
        rejected(SERVICE_NOT_FOUND, channel, request,serviceName);
        return;
    }
	
	// app flow control
    ServiceFlowControllerManager serviceFlowControllerManager = defaultProvider.getProviderController().getServiceFlowControllerManager();
    if (!serviceFlowControllerManager.isAllow(serviceName)) {
        rejected(APP_FLOW_CONTROL,channel, request,serviceName);
        return;
    }
    
    process(pair,request,channel,serviceName,body.getTimestamp());
}

 

入参request中的bytes字节数就是真正的请求体,我们对其进行反序列化获取到真正的请求正文:

 

body = serializerImpl().readObject(bytes, RequestCustomBody.class);

 

反序列化得到的body里面有serviceName,我们再根据serviceName获取到真正的方法名:

 

final Pair<CurrentServiceState, ServiceWrapper> pair = defaultProvider.getProviderController().getProviderContainer().lookupService(serviceName);

 

我们在上个小节编织服务的时候,对ServiceWrapper进行了注入,知道serviceName与ServiceWrapper是一一对应的,lookupService方法也很简单:

 

 

在DefaultServiceProviderContainer.java中维护了一个全局变量的Map类,Key是serviceName,value是一个Pair的键值对,键值对的键值是CurrentServiceState.java

这个类表示的是当前服务实例,是对当前实例状态的说明:

 

value是我们有方法名的ServiceWrapper的类,这个类以前有过说明,这边做个简单的截图:

 

现在已经拿到了方法名,入参,接下来就是调用一些反射的API,就可以完成了方法的调用了:

 

private void process(Pair<CurrentServiceState, ServiceWrapper> pair, final RemotingTransporter request, Channel channel,final String serviceName,final long beginTime) {
		
	Object invokeResult = null;
	
	CurrentServiceState currentServiceState = pair.getKey();
	ServiceWrapper serviceWrapper = pair.getValue();
	
	Object targetCallObj = serviceWrapper.getServiceProvider();
	
	Object[] args = ((RequestCustomBody)request.getCustomHeader()).getArgs();
	
	if(currentServiceState.getHasDegrade().get() && serviceWrapper.getMockDegradeServiceProvider() != null){
		targetCallObj = serviceWrapper.getMockDegradeServiceProvider();
	}
	
	String methodName = serviceWrapper.getMethodName();
	List<Class<?>[]> parameterTypesList = serviceWrapper.getParamters();
	
	
	Class<?>[] parameterTypes = findMatchingParameterTypes(parameterTypesList, args);
	invokeResult = fastInvoke(targetCallObj, methodName, parameterTypes, args);
	
	ResultWrapper result = new ResultWrapper();
	result.setResult(invokeResult);
	ResponseCustomBody body = new ResponseCustomBody(Status.OK.value(), result);
	
	final RemotingTransporter response = RemotingTransporter.createResponseTransporter(LaopopoProtocol.RPC_RESPONSE, body, request.getOpaque());
	
	channel.writeAndFlush(response).addListener(new ChannelFutureListener() {

		public void operationComplete(ChannelFuture future) throws Exception {
			
			long elapsed = SystemClock.millisClock().now() - beginTime;
			
			logger.info("call time is [{}]  and minus [{}]",beginTime,elapsed);
			if (future.isSuccess()) {
				
				ServiceMeterManager.incrementTotalTime(serviceName, elapsed);
			} else {
				logger.info("request {} get failed response {}", request, response);
			}
		}
	});
	
}

 

这边的代码也很好理解,获取到参数,校验参数的格式的正确性,然后去调用一些反射和Cglib的一些API就可以搞定了

 

public static Object fastInvoke(Object obj, String methodName, Class<?>[] parameterTypes, Object[] args) {
    Class<?> clazz = obj.getClass();
    FastClass fastClass = fastClassCache.get(clazz);
    if (fastClass == null) {
        FastClass newFastClass = FastClass.create(clazz);
        fastClass = fastClassCache.putIfAbsent(clazz, newFastClass);
        if (fastClass == null) {
            fastClass = newFastClass;
        }
    }

    Object value = null;
    try {
        value = fastClass.invoke(methodName, parameterTypes, obj, args);
    } catch (InvocationTargetException e) {
        JUnsafe.throwException(e);
        
    }
    return value;
}

 

获取到调用的返回值之后,然后将其编织成返回值,然后用传递过来的channel将响应值返回到调用的Consumer的实例,整个流程基本是这样的,大体上的调用流程就是这样的,希望大家一起看代码,看看是否有bug,大家一起修正~

 

源码地址:

https://github.com/BazingaLyn/laopopo-rpc/blob/master/laopopo-client/src/main/java/org/laopopo/client/provider/ProviderRPCController.java

 

下一节,我们看看如何做简单的限流的

 

 

摘自:https://blog.csdn.net/linuu/article/details/52572678