不灭的焱

革命尚未成功,同志仍须努力下载JDK17

作者:Albert.Wen  添加时间:2018-11-04 17:25:00  修改时间:2024-04-27 16:37:26  分类:Java基础  编辑

首先先进行服务的编织,将一个服务的一些基本信息编织成一个类,发送给注册中心,订阅者在注册中心取到的编织信息就可以调用该方法,这是整体的思路,我们在网络篇说过,所有的数据传输走RemotingTransporter,核心的传输主体要实现CommonCustomBody接口,接下来,我们就定义Provider发送给注册的类:

 

编织的服务信息基本信息应该有

1)服务的IP地址

2)端口号

3)服务名,这个应该是唯一的

4)是否是VIP服务,如果是VIP服务则当启动NettyServer需要在port-2的端口上监听

5)是否支持降级

6)降级服务的方法路径,这边降级做的比较简单,其实就是一个mock方法

7)降级服务的基本描述(其实并不是必要的,不过却可以用来做统计)

8)服务的权重,这个是很必要的,应该每个系统的线上实例肯定不止一台,而每一台实例的性能也不一样,有些服务器的性能好一点,内存大一点,可以设置大一点,最大100,最小1,这样在服务端调用该实例的时候,默认是使用加权随机负载均衡的算法,去随机访问服务提供端的

9)连接数,该连接数表示的是一个Consumer实例与一个Provider实例之间连接数,一般情况下,一个连接就够用了,特殊情况下,可以设置多个链接

 

好了,假如将上述的9个属性发送给注册中心,注册中心就可以确定唯一的服务了,但是一个简单的服务与生俱来是不具备这个属性的,但是如果你想让一个普通的服务例如上上节说的那个HelloServiceMock的服务有此属性的话,最最好用的方法就是让其绑定我们自定义的Annotation了,这样就可以很自然地付上这些属性了

 

Annotation定义很简单,与这个9个属性一一对应就可以了

package org.laopopo.client.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 
 * @author BazingaLyn
 * @description 服务提供端提供服务的annotation
 * @time 2016年8月19日
 * @modifytime
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD })
@Documented
public @interface RPCService {

	public String serviceName() default "";

	public int weight() default 50;

	public String responsibilityName() default "system";

	public int connCount() default 1;
	
	public boolean isVIPService() default false;
	
	public boolean isSupportDegradeService() default false;
	
	public String degradeServicePath() default "";
	
	public String degradeServiceDesc() default "";
	
	public long maxCallCountInMinute() default 10000;
	
}

 

注释就不加了,很容易理解,使用的时候就很简单了,如下:

 

package org.laopopo.example.demo.service;

import org.laopopo.client.annotation.RPCService;

/**
 * 
 * @author BazingaLyn
 * @description Demo
 * @time 2016年8月19日
 * @modifytime
 */
public class HelloSerivceImpl implements HelloSerivce {

	@Override
	@RPCService(responsibilityName="xiaoy",
				serviceName="LAOPOPO.TEST.SAYHELLO",
				isVIPService = false,
				isSupportDegradeService = true,
				degradeServicePath="org.laopopo.example.demo.service.HelloServiceMock",
				degradeServiceDesc="默认返回hello")
	public String sayHello(String str) {
		return "hello "+ str;
	}

}

 

这样做就可以使得这个方法绑定上这些特殊的属性了,接下来,我们只需要读取这些annotation的属性值,编织成注册信息类发送给注册中心,Provider端的功能就算完成了大半了

 

我们先看看如何使用启动Provider的某个实例,写个main函数:

package org.laopopo.example.generic.test_3;

import org.laopopo.client.provider.DefaultProvider;
import org.laopopo.common.exception.remoting.RemotingException;
import org.laopopo.example.demo.service.ByeServiceImpl;
import org.laopopo.example.demo.service.HelloSerivceImpl;
import org.laopopo.remoting.netty.NettyClientConfig;
import org.laopopo.remoting.netty.NettyServerConfig;

public class ProviderTest {
	
public static void main(String[] args) throws InterruptedException, RemotingException {
		
		DefaultProvider defaultProvider = new DefaultProvider(new NettyClientConfig(), new NettyServerConfig());
		
		
		defaultProvider.registryAddress("127.0.0.1:18010") //注册中心的地址
					   .monitorAddress("127.0.0.1:19010") //监控中心的地址
					   .serviceListenPort(8899) //暴露服务的地址
					   .publishService(new HelloSerivceImpl(),new ByeServiceImpl()) //暴露的服务
					   .start(); //启动服务
		
	}
	

}

 

DefaultProvider是Provider接口的具体的实现类,启动一个Provider最最规范的方式就是这样了,至少给其设定如上代码的几个参数:

1)注册中心地址,(其实可以没有,有一种场景就是服务消费者知道服务提供的地址,直接调用,不需要去注册中心上订阅,不过这也是在特殊的情况下使用)

2)监控中心的地址(也是非必要的,不过最好设置上)

3)服务暴露的端口号,这个是必要的,否则消费者如何去与你建立连接进行服务消费活动呢

4)要提供的服务,一切都准备好了,全部是这个服务准备的,如果你不设置你要提供的服务,不是逗别人玩吗

5)start方法启动整个Provider的实例

 

好了,我们整理一下整个流程,先讲上述5点钟必要的三点,暴露端口,提供服务,和start的方法,我们先看Provider.java这个最底层的接口类,我们上面讲过,服务提供者发布要提供的服务,需要调用Provider的publishService的方法

我们看着它的实现类,在DefaultProvider中,方法也很简单,将其用户传递的参数赋值给了全局变量obj

相同的,暴露的端口号也是赋值给DefaultProvider的全局变量exposePort的,所以真正去编制服务应该是在start方法中进行的

我们先关注编制的方法:

this.publishRemotingTransporters = providerController.getLocalServerWrapperManager().wrapperRegisterInfo(this.getExposePort(), this.obj);

我们看具体的实现类:

/**
 * 
 * @param listeningAddress 该服务暴露的网络地址 例如172.30.53.58::8989
 * @param controller 全局限流工具
 * @param obj 暴露的方法的实例
 * @return
 */
public List<RemotingTransporter> wrapperRegisterInfo(int port, Object... obj) {

	List<RemotingTransporter> remotingTransporters = new ArrayList<RemotingTransporter>();
	
	//基本判断,如果暴露的方法是null或者是0,则说明无需编织服务
	if (null != obj && obj.length > 0) {
		
		for (Object o : obj) {
			
			//默认的编织对象
			DefaultServiceWrapper defaultServiceWrapper = new DefaultServiceWrapper();
			
			List<ServiceWrapper> serviceWrappers = defaultServiceWrapper.provider(o).create();
			
			if(null != serviceWrappers  && !serviceWrappers.isEmpty()){
				for(ServiceWrapper serviceWrapper : serviceWrappers){
					
					PublishServiceCustomBody commonCustomHeader = new PublishServiceCustomBody();
					
					commonCustomHeader.setConnCount(serviceWrapper.getConnCount());
					commonCustomHeader.setDegradeServiceDesc(serviceWrapper.getDegradeServiceDesc());
					commonCustomHeader.setDegradeServicePath(serviceWrapper.getDegradeServicePath());
					commonCustomHeader.setPort(port);
					commonCustomHeader.setServiceProviderName(serviceWrapper.getServiceName());
					commonCustomHeader.setVIPService(serviceWrapper.isVIPService());
					commonCustomHeader.setWeight(serviceWrapper.getWeight());
					commonCustomHeader.setSupportDegradeService(serviceWrapper.isSupportDegradeService());
					commonCustomHeader.setMaxCallCountInMinute(serviceWrapper.getMaxCallCountInMinute());
					
					RemotingTransporter remotingTransporter =  RemotingTransporter.createRequestTransporter(LaopopoProtocol.PUBLISH_SERVICE, commonCustomHeader);
					remotingTransporters.add(remotingTransporter);
				}
			}
		}
	}
	return remotingTransporters;
	
}

可以很清晰的看到这个方法最最核心的方法就是编制了一个List<RemotingTransporter>这个对象,这个对象就是要发给注册中心的服务订阅类,也就是上面我们说的那9个属性,具体这个9个属性的编织是如下这个方法完成的

 

List<ServiceWrapper> serviceWrappers = defaultServiceWrapper.provider(o).create();

 

当我们编织List<RemotingTransporter>的时候,我们迭代了List<ServiceWrapper>这个对象而完成的,ServiceWrapper的实现也很简单:


 

 

我们接着关注defaultServiceWrapper.provider(o).create()这个方法,provider方法:

 

@Override
public ServiceWrapperWorker provider(Object serviceProvider) {
	//如果proxy的对象是null,实例对象无需编织,直接返回
		if(null  == globalProviderProxyHandler){
			this.serviceProvider = serviceProvider;
		}else{
		  Class<?> globalProxyCls = generateProviderProxyClass(globalProviderProxyHandler, serviceProvider.getClass());
	          this.serviceProvider = copyProviderProperties(serviceProvider, newInstance(globalProxyCls));
		}
	return this;
}

 

其实这个方法是把你要提供的服务做一个定制化的编织,跟Spring的AOP很类似,对类的功能进行了增强,可以完成更多更加强大的功能,加入我们不需要对其进行编织,就原样返回就可以了,详细的代码可以查看源码,这边不是关键

 

我们接着看create的方法

@Override
public List<ServiceWrapper> create() {
	
	List<ServiceWrapper> serviceWrappers = new ArrayList<ServiceWrapper>();
	
	//读取对象的方法注解
	RPCService rpcService = null;
	
	for (Class<?> cls = serviceProvider.getClass(); cls != Object.class; cls = cls.getSuperclass()) {
		Method[] methods = cls.getMethods();
		if(null != methods && methods.length > 0){
			
			for(Method method :methods){
				rpcService = method.getAnnotation(RPCService.class);
				if(null != rpcService){
					
					//服务名
					String serviceName = StringUtil.isNullOrEmpty(rpcService.serviceName())?method.getName():rpcService.serviceName();
					//负责人
					String responsiblityName = rpcService.responsibilityName();
					//方法weight
					Integer weight = rpcService.weight();
					//连接数 默认是1 一个实例一个1链接其实是够用的
					Integer connCount = rpcService.connCount();
					//是否支持服务降级
					boolean isSupportDegradeService = rpcService.isSupportDegradeService();
					//是否是VIP服务,如果是VIP服务,则默认是在port-2的端口暴露方法,与其他的方法使用不同的
					boolean isVIPService = rpcService.isVIPService();
					//暴露的降级方法的路径
					String degradeServicePath = rpcService.degradeServicePath();
					//降级方法的描述
					String degradeServiceDesc = rpcService.degradeServiceDesc();
					//每分钟调用的最大调用次数
					Long maxCallCount = rpcService.maxCallCountInMinute();
					if(maxCallCount <= 0){
						throw new RpcWrapperException("max call count must over zero at unit time");
					}
					ServiceFlowControllerManager serviceFlowControllerManager = providerController.getServiceFlowControllerManager();
					serviceFlowControllerManager.setServiceLimitVal(serviceName, maxCallCount);
					//如果是支持服务降级服务,则需要根据降级方法的路径去创建这个实例,并编制proxy
					if(isSupportDegradeService){
						Class<?> degradeClass = null;
						try {
							degradeClass = Class.forName(degradeServicePath);
							Object nativeObj = degradeClass.newInstance();
							if(null  == globalProviderProxyHandler){
								this.mockDegradeServiceProvider = nativeObj;
							}else{
								Class<?> globalProxyCls = generateProviderProxyClass(globalProviderProxyHandler, nativeObj.getClass());
					            this.mockDegradeServiceProvider = copyProviderProperties(nativeObj, newInstance(globalProxyCls));
							}
						} catch (Exception e) {
							logger.error("[{}] class can not create by reflect [{}]",degradeServicePath,e.getMessage());
							throw new RpcWrapperException("degradeService path " + degradeServicePath +"create failed" ); 
						} 
						
					}
					
					String methodName = method.getName();
					Class<?>[] classes = method.getParameterTypes();
					List<Class<?>[]> paramters = new ArrayList<Class<?>[]>();
					paramters.add(classes);
					
					ServiceWrapper serviceWrapper = new ServiceWrapper(serviceProvider,
											   mockDegradeServiceProvider,
											   serviceName,
										           responsiblityName,
											   methodName,
											   paramters,
											   isSupportDegradeService,
											   degradeServicePath,
											   degradeServiceDesc,
											   weight,
											   connCount,
											   isVIPService,
											   maxCallCount);
					//放入到一个缓存中,方便以后consumer来调取服务的时候,该来获取对应真正的编织类
					providerController.getProviderContainer().registerService(serviceName, serviceWrapper);
					
					serviceWrappers.add(serviceWrapper);
				}
			}
		}
	}
	return serviceWrappers;
}

 

这个就是如何编织服务的代码了,说来也是简单,因为已经获取到对象实例了,所以只需要使用简单的反射,获取到它方法级的annotation就可以很easy的获取到那9个属性了,当然当中还有一些其他的操作:

 

1)如果该方法可以降级,则根据mock的路径把降级的类初始化好

2)通过反射获取到方法名,因为很简单,不管你的服务名昵称serviceName多么的炫酷,追根究底消费者端仍旧需要method的名字然后反射去调用

3)为单位时间内限流做了初始化操作(限流下几个小节一起分析吧~)

 

好了,总之,服务就是在create方法中编织好的

 

发送编织好的服务就变得相对很简单了,在start方法中,有一个

 

try {
	// 发布任务
	this.publishedAndStartProvider();
	logger.info("provider start successfully");
} catch (Exception e) {
	logger.error("publish service to registry failed [{}]",e.getMessage());
}

 

这个方法就是发布服务到注册中心的方法,看看具体的实现,也很简单

 

public void publishedAndStartProvider() throws InterruptedException, RemotingException {

	// stack copy
	List<RemotingTransporter> transporters = defaultProvider.getPublishRemotingTransporters();
	
	if(null == transporters || transporters.isEmpty()){
		logger.warn("service is empty please call DefaultProvider #publishService method");
		return;
	}
	
	String address = defaultProvider.getRegistryAddress();

	if (address == null) {
        logger.warn("registry center address is empty please check your address");
        return;
	}
	String[] addresses = address.split(",");
	if (null != addresses && addresses.length > 0 ) {
		
		for (String eachAddress : addresses) {
			
			for (RemotingTransporter request : transporters) {
				
				pushPublishServiceToRegistry(request,eachAddress);

			}
		}
	}
}

 

获取到注册中心的地址,使用我们网络篇的一些API,就可以很方便地将其发送出去了,当然因为可能注册中心是多个,所以每一个provider实例需要将服务信息发送给每一个注册中心,每个注册中心收到服务消息的时候,都会反馈一个ACK,告诉provider端,服务已经注册成功,如果没有成功,则provider重新发送一次

 

private void pushPublishServiceToRegistry(RemotingTransporter request, String eachAddress) throws InterruptedException, RemotingException {
	logger.info("[{}] transporters matched", request);
	messagesNonAcks.put(request.getOpaque(), new MessageNonAck(request, eachAddress));
	RemotingTransporter remotingTransporter = defaultProvider.getNettyRemotingClient().invokeSync(eachAddress, request, 3000);
	if(null != remotingTransporter){
		AckCustomBody ackCustomBody = serializerImpl().readObject(remotingTransporter.bytes(), AckCustomBody.class);

		logger.info("received ack info [{}]", ackCustomBody);
		if(ackCustomBody.isSuccess()){
			messagesNonAcks.remove(ackCustomBody.getRequestId());
		}
	}else{
		logger.warn("registry center handler timeout");
	}
}

 

相关详细的代码可以查看源码:

https://github.com/BazingaLyn/laopopo-rpc/tree/master/laopopo-client/src/main/java/org/laopopo/client/provider

 

本节END~本小节简单的分析了Provider服务注册的过程~希望大家一起看看代码,发现错误的地方,我们一起纠正~

 

下一个小节,分析provider如何处理Consumer发送过来的远程调用的请求的~

 

 

摘自:https://blog.csdn.net/linuu/article/details/52526180