首先先进行服务的编织,将一个服务的一些基本信息编织成一个类,发送给注册中心,订阅者在注册中心取到的编织信息就可以调用该方法,这是整体的思路,我们在网络篇说过,所有的数据传输走RemotingTransporter,核心的传输主体要实现CommonCustomBody接口,接下来,我们就定义Provider发送给注册的类:
编织的服务信息基本信息应该有
1)服务的IP地址
2)端口号
3)服务名,这个应该是唯一的
4)是否是VIP服务,如果是VIP服务则当启动NettyServer需要在port-2的端口上监听
5)是否支持降级
6)降级服务的方法路径,这边降级做的比较简单,其实就是一个mock方法
7)降级服务的基本描述(其实并不是必要的,不过却可以用来做统计)
8)服务的权重,这个是很必要的,应该每个系统的线上实例肯定不止一台,而每一台实例的性能也不一样,有些服务器的性能好一点,内存大一点,可以设置大一点,最大100,最小1,这样在服务端调用该实例的时候,默认是使用加权随机负载均衡的算法,去随机访问服务提供端的
9)连接数,该连接数表示的是一个Consumer实例与一个Provider实例之间连接数,一般情况下,一个连接就够用了,特殊情况下,可以设置多个链接
好了,假如将上述的9个属性发送给注册中心,注册中心就可以确定唯一的服务了,但是一个简单的服务与生俱来是不具备这个属性的,但是如果你想让一个普通的服务例如上上节说的那个HelloServiceMock的服务有此属性的话,最最好用的方法就是让其绑定我们自定义的Annotation了,这样就可以很自然地付上这些属性了
Annotation定义很简单,与这个9个属性一一对应就可以了
package org.laopopo.client.annotation; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * * @author BazingaLyn * @description 服务提供端提供服务的annotation * @time 2016年8月19日 * @modifytime */ @Retention(RetentionPolicy.RUNTIME) @Target({ ElementType.METHOD }) @Documented public @interface RPCService { public String serviceName() default ""; public int weight() default 50; public String responsibilityName() default "system"; public int connCount() default 1; public boolean isVIPService() default false; public boolean isSupportDegradeService() default false; public String degradeServicePath() default ""; public String degradeServiceDesc() default ""; public long maxCallCountInMinute() default 10000; }
注释就不加了,很容易理解,使用的时候就很简单了,如下:
package org.laopopo.example.demo.service; import org.laopopo.client.annotation.RPCService; /** * * @author BazingaLyn * @description Demo * @time 2016年8月19日 * @modifytime */ public class HelloSerivceImpl implements HelloSerivce { @Override @RPCService(responsibilityName="xiaoy", serviceName="LAOPOPO.TEST.SAYHELLO", isVIPService = false, isSupportDegradeService = true, degradeServicePath="org.laopopo.example.demo.service.HelloServiceMock", degradeServiceDesc="默认返回hello") public String sayHello(String str) { return "hello "+ str; } }
这样做就可以使得这个方法绑定上这些特殊的属性了,接下来,我们只需要读取这些annotation的属性值,编织成注册信息类发送给注册中心,Provider端的功能就算完成了大半了
我们先看看如何使用启动Provider的某个实例,写个main函数:
package org.laopopo.example.generic.test_3; import org.laopopo.client.provider.DefaultProvider; import org.laopopo.common.exception.remoting.RemotingException; import org.laopopo.example.demo.service.ByeServiceImpl; import org.laopopo.example.demo.service.HelloSerivceImpl; import org.laopopo.remoting.netty.NettyClientConfig; import org.laopopo.remoting.netty.NettyServerConfig; public class ProviderTest { public static void main(String[] args) throws InterruptedException, RemotingException { DefaultProvider defaultProvider = new DefaultProvider(new NettyClientConfig(), new NettyServerConfig()); defaultProvider.registryAddress("127.0.0.1:18010") //注册中心的地址 .monitorAddress("127.0.0.1:19010") //监控中心的地址 .serviceListenPort(8899) //暴露服务的地址 .publishService(new HelloSerivceImpl(),new ByeServiceImpl()) //暴露的服务 .start(); //启动服务 } }
DefaultProvider是Provider接口的具体的实现类,启动一个Provider最最规范的方式就是这样了,至少给其设定如上代码的几个参数:
1)注册中心地址,(其实可以没有,有一种场景就是服务消费者知道服务提供的地址,直接调用,不需要去注册中心上订阅,不过这也是在特殊的情况下使用)
2)监控中心的地址(也是非必要的,不过最好设置上)
3)服务暴露的端口号,这个是必要的,否则消费者如何去与你建立连接进行服务消费活动呢
4)要提供的服务,一切都准备好了,全部是这个服务准备的,如果你不设置你要提供的服务,不是逗别人玩吗
5)start方法启动整个Provider的实例
好了,我们整理一下整个流程,先讲上述5点钟必要的三点,暴露端口,提供服务,和start的方法,我们先看Provider.java这个最底层的接口类,我们上面讲过,服务提供者发布要提供的服务,需要调用Provider的publishService的方法
我们看着它的实现类,在DefaultProvider中,方法也很简单,将其用户传递的参数赋值给了全局变量obj
相同的,暴露的端口号也是赋值给DefaultProvider的全局变量exposePort的,所以真正去编制服务应该是在start方法中进行的
我们先关注编制的方法:
this.publishRemotingTransporters = providerController.getLocalServerWrapperManager().wrapperRegisterInfo(this.getExposePort(), this.obj);
我们看具体的实现类:
/** * * @param listeningAddress 该服务暴露的网络地址 例如172.30.53.58::8989 * @param controller 全局限流工具 * @param obj 暴露的方法的实例 * @return */ public List<RemotingTransporter> wrapperRegisterInfo(int port, Object... obj) { List<RemotingTransporter> remotingTransporters = new ArrayList<RemotingTransporter>(); //基本判断,如果暴露的方法是null或者是0,则说明无需编织服务 if (null != obj && obj.length > 0) { for (Object o : obj) { //默认的编织对象 DefaultServiceWrapper defaultServiceWrapper = new DefaultServiceWrapper(); List<ServiceWrapper> serviceWrappers = defaultServiceWrapper.provider(o).create(); if(null != serviceWrappers && !serviceWrappers.isEmpty()){ for(ServiceWrapper serviceWrapper : serviceWrappers){ PublishServiceCustomBody commonCustomHeader = new PublishServiceCustomBody(); commonCustomHeader.setConnCount(serviceWrapper.getConnCount()); commonCustomHeader.setDegradeServiceDesc(serviceWrapper.getDegradeServiceDesc()); commonCustomHeader.setDegradeServicePath(serviceWrapper.getDegradeServicePath()); commonCustomHeader.setPort(port); commonCustomHeader.setServiceProviderName(serviceWrapper.getServiceName()); commonCustomHeader.setVIPService(serviceWrapper.isVIPService()); commonCustomHeader.setWeight(serviceWrapper.getWeight()); commonCustomHeader.setSupportDegradeService(serviceWrapper.isSupportDegradeService()); commonCustomHeader.setMaxCallCountInMinute(serviceWrapper.getMaxCallCountInMinute()); RemotingTransporter remotingTransporter = RemotingTransporter.createRequestTransporter(LaopopoProtocol.PUBLISH_SERVICE, commonCustomHeader); remotingTransporters.add(remotingTransporter); } } } } return remotingTransporters; }
可以很清晰的看到这个方法最最核心的方法就是编制了一个List<RemotingTransporter>这个对象,这个对象就是要发给注册中心的服务订阅类,也就是上面我们说的那9个属性,具体这个9个属性的编织是如下这个方法完成的
List<ServiceWrapper> serviceWrappers = defaultServiceWrapper.provider(o).create();
当我们编织List<RemotingTransporter>的时候,我们迭代了List<ServiceWrapper>这个对象而完成的,ServiceWrapper的实现也很简单:
我们接着关注defaultServiceWrapper.provider(o).create()这个方法,provider方法:
@Override public ServiceWrapperWorker provider(Object serviceProvider) { //如果proxy的对象是null,实例对象无需编织,直接返回 if(null == globalProviderProxyHandler){ this.serviceProvider = serviceProvider; }else{ Class<?> globalProxyCls = generateProviderProxyClass(globalProviderProxyHandler, serviceProvider.getClass()); this.serviceProvider = copyProviderProperties(serviceProvider, newInstance(globalProxyCls)); } return this; }
其实这个方法是把你要提供的服务做一个定制化的编织,跟Spring的AOP很类似,对类的功能进行了增强,可以完成更多更加强大的功能,加入我们不需要对其进行编织,就原样返回就可以了,详细的代码可以查看源码,这边不是关键
我们接着看create的方法
@Override public List<ServiceWrapper> create() { List<ServiceWrapper> serviceWrappers = new ArrayList<ServiceWrapper>(); //读取对象的方法注解 RPCService rpcService = null; for (Class<?> cls = serviceProvider.getClass(); cls != Object.class; cls = cls.getSuperclass()) { Method[] methods = cls.getMethods(); if(null != methods && methods.length > 0){ for(Method method :methods){ rpcService = method.getAnnotation(RPCService.class); if(null != rpcService){ //服务名 String serviceName = StringUtil.isNullOrEmpty(rpcService.serviceName())?method.getName():rpcService.serviceName(); //负责人 String responsiblityName = rpcService.responsibilityName(); //方法weight Integer weight = rpcService.weight(); //连接数 默认是1 一个实例一个1链接其实是够用的 Integer connCount = rpcService.connCount(); //是否支持服务降级 boolean isSupportDegradeService = rpcService.isSupportDegradeService(); //是否是VIP服务,如果是VIP服务,则默认是在port-2的端口暴露方法,与其他的方法使用不同的 boolean isVIPService = rpcService.isVIPService(); //暴露的降级方法的路径 String degradeServicePath = rpcService.degradeServicePath(); //降级方法的描述 String degradeServiceDesc = rpcService.degradeServiceDesc(); //每分钟调用的最大调用次数 Long maxCallCount = rpcService.maxCallCountInMinute(); if(maxCallCount <= 0){ throw new RpcWrapperException("max call count must over zero at unit time"); } ServiceFlowControllerManager serviceFlowControllerManager = providerController.getServiceFlowControllerManager(); serviceFlowControllerManager.setServiceLimitVal(serviceName, maxCallCount); //如果是支持服务降级服务,则需要根据降级方法的路径去创建这个实例,并编制proxy if(isSupportDegradeService){ Class<?> degradeClass = null; try { degradeClass = Class.forName(degradeServicePath); Object nativeObj = degradeClass.newInstance(); if(null == globalProviderProxyHandler){ this.mockDegradeServiceProvider = nativeObj; }else{ Class<?> globalProxyCls = generateProviderProxyClass(globalProviderProxyHandler, nativeObj.getClass()); this.mockDegradeServiceProvider = copyProviderProperties(nativeObj, newInstance(globalProxyCls)); } } catch (Exception e) { logger.error("[{}] class can not create by reflect [{}]",degradeServicePath,e.getMessage()); throw new RpcWrapperException("degradeService path " + degradeServicePath +"create failed" ); } } String methodName = method.getName(); Class<?>[] classes = method.getParameterTypes(); List<Class<?>[]> paramters = new ArrayList<Class<?>[]>(); paramters.add(classes); ServiceWrapper serviceWrapper = new ServiceWrapper(serviceProvider, mockDegradeServiceProvider, serviceName, responsiblityName, methodName, paramters, isSupportDegradeService, degradeServicePath, degradeServiceDesc, weight, connCount, isVIPService, maxCallCount); //放入到一个缓存中,方便以后consumer来调取服务的时候,该来获取对应真正的编织类 providerController.getProviderContainer().registerService(serviceName, serviceWrapper); serviceWrappers.add(serviceWrapper); } } } } return serviceWrappers; }
这个就是如何编织服务的代码了,说来也是简单,因为已经获取到对象实例了,所以只需要使用简单的反射,获取到它方法级的annotation就可以很easy的获取到那9个属性了,当然当中还有一些其他的操作:
1)如果该方法可以降级,则根据mock的路径把降级的类初始化好
2)通过反射获取到方法名,因为很简单,不管你的服务名昵称serviceName多么的炫酷,追根究底消费者端仍旧需要method的名字然后反射去调用
3)为单位时间内限流做了初始化操作(限流下几个小节一起分析吧~)
好了,总之,服务就是在create方法中编织好的
发送编织好的服务就变得相对很简单了,在start方法中,有一个
try { // 发布任务 this.publishedAndStartProvider(); logger.info("provider start successfully"); } catch (Exception e) { logger.error("publish service to registry failed [{}]",e.getMessage()); }
这个方法就是发布服务到注册中心的方法,看看具体的实现,也很简单
public void publishedAndStartProvider() throws InterruptedException, RemotingException { // stack copy List<RemotingTransporter> transporters = defaultProvider.getPublishRemotingTransporters(); if(null == transporters || transporters.isEmpty()){ logger.warn("service is empty please call DefaultProvider #publishService method"); return; } String address = defaultProvider.getRegistryAddress(); if (address == null) { logger.warn("registry center address is empty please check your address"); return; } String[] addresses = address.split(","); if (null != addresses && addresses.length > 0 ) { for (String eachAddress : addresses) { for (RemotingTransporter request : transporters) { pushPublishServiceToRegistry(request,eachAddress); } } } }
获取到注册中心的地址,使用我们网络篇的一些API,就可以很方便地将其发送出去了,当然因为可能注册中心是多个,所以每一个provider实例需要将服务信息发送给每一个注册中心,每个注册中心收到服务消息的时候,都会反馈一个ACK,告诉provider端,服务已经注册成功,如果没有成功,则provider重新发送一次
private void pushPublishServiceToRegistry(RemotingTransporter request, String eachAddress) throws InterruptedException, RemotingException { logger.info("[{}] transporters matched", request); messagesNonAcks.put(request.getOpaque(), new MessageNonAck(request, eachAddress)); RemotingTransporter remotingTransporter = defaultProvider.getNettyRemotingClient().invokeSync(eachAddress, request, 3000); if(null != remotingTransporter){ AckCustomBody ackCustomBody = serializerImpl().readObject(remotingTransporter.bytes(), AckCustomBody.class); logger.info("received ack info [{}]", ackCustomBody); if(ackCustomBody.isSuccess()){ messagesNonAcks.remove(ackCustomBody.getRequestId()); } }else{ logger.warn("registry center handler timeout"); } }
相关详细的代码可以查看源码:
本节END~本小节简单的分析了Provider服务注册的过程~希望大家一起看看代码,发现错误的地方,我们一起纠正~
下一个小节,分析provider如何处理Consumer发送过来的远程调用的请求的~