上两节我们简单地说明了注册中心的功能和基本结构,本节重点讲述注册中心如何处理服务提供者发送过来的信息
因为我们之前讲过Netty网络端的定义说明的,也讲了服务提供者发布服务的知识了,我们看看默认的注册中心的处理器是如何处理发布的信息的
private void registerProcessor() { this.remotingServer.registerDefaultProcessor(new DefaultRegistryProcessor(this), this.remotingExecutor); this.remotingServer.registerChannelInactiveProcessor(new DefaultRegistryChannelInactiveProcessor(this), remotingChannelInactiveExecutor); }
我们看看注册中心处理器DefaultRegistryProcessor.java需要处理哪些信息:
package org.laopopo.base.registry; import static org.laopopo.common.protocal.LaopopoProtocol.MANAGER_SERVICE; import static org.laopopo.common.protocal.LaopopoProtocol.PUBLISH_CANCEL_SERVICE; import static org.laopopo.common.protocal.LaopopoProtocol.PUBLISH_SERVICE; import static org.laopopo.common.protocal.LaopopoProtocol.SUBSCRIBE_SERVICE; import io.netty.channel.ChannelHandlerContext; import org.laopopo.remoting.ConnectionUtils; import org.laopopo.remoting.model.NettyRequestProcessor; import org.laopopo.remoting.model.RemotingTransporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author BazingaLyn * @description 注册中心的处理转换器 * @time 2016年8月13日 * @modifytime 2016年8月31日 */ public class DefaultRegistryProcessor implements NettyRequestProcessor { private static final Logger logger = LoggerFactory.getLogger(DefaultRegistryProcessor.class); private DefaultRegistryServer defaultRegistryServer; public DefaultRegistryProcessor(DefaultRegistryServer defaultRegistryServer) { this.defaultRegistryServer = defaultRegistryServer; } @Override public RemotingTransporter processRequest(ChannelHandlerContext ctx, RemotingTransporter request) throws Exception { if (logger.isDebugEnabled()) { logger.debug("receive request, {} {} {}",// request.getCode(), // ConnectionUtils.parseChannelRemoteAddr(ctx.channel()), // request); } switch (request.getCode()) { case PUBLISH_SERVICE: // 处理服务提供者provider推送的服务信息 return this.defaultRegistryServer.getProviderManager().handlerRegister(request, ctx.channel()); // 要保持幂等性,同一个实例重复发布同一个服务的时候对于注册中心来说是无影响的 case PUBLISH_CANCEL_SERVICE: // 处理服务提供者provider推送的服务取消的信息 return this.defaultRegistryServer.getProviderManager().handlerRegisterCancel(request, ctx.channel()); case SUBSCRIBE_SERVICE: // 处理服务消费者consumer订阅服务的请求 return this.defaultRegistryServer.getProviderManager().handleSubscribe(request, ctx.channel()); case MANAGER_SERVICE: // 处理管理者发送过来的服务管理服务 return this.defaultRegistryServer.getProviderManager().handleManager(request, ctx.channel()); } return null; } }
服务提供者发送过来的信息标识是PUBLISH_SERVICE和PUBLISH_CANCEL_SERVICE这两种标识
首先我们先看PUBLISH_SERVICE这个逻辑的处理,代码注释中写的很清楚,要保持幂等性,否则会造成很多莫名其妙的问题,这个当然是用ProviderManager,各司其职嘛,我们看看handlerRegister的处理,看代码之前我们先明确一下这段代码要做的事情:
1)将服务提供者发布的信息记录到注册中心的本地内存中去,相当于把发布的信息记录下来,否则服务消费者来订阅服务的时候,去哪里查找这些记录
2)从历史记录中找到其历史的审核记录,如果历史记录上,也就是说硬盘持久化中记录到的审核记录,如果以前显示是审核通过的情况下,就直接设置该服务审核通过,无需再审核,如果在历史记录中没有找到该服务的信息,说明该服务是第一次注册发布,我们需要给出默认的服务持久化信息
3)如果该服务已经审核通过,我们需要通知通知订阅该服务的消费者新增了一个服务提供者
4)当上述三件事情都搞定了,我们需要发送一个ACK信息给服务提供者,告之它注册中心已经成功接收到它发布的信息了,叫他安心~
因为注册中心的所有信息都是基于java内存实现的,所以我们使用全局变量来保存这些信息,某些信息,比如审核或者权重的信息需要持久化到硬盘的,我们做定时任务处理,定时刷盘,全局变量如下
// 某个服务 private final ConcurrentMap<String, ConcurrentMap<Address, RegisterMeta>> globalRegisterInfoMap = new ConcurrentHashMap<String, ConcurrentMap<Address, RegisterMeta>>(); // 指定节点都注册了哪些服务 private final ConcurrentMap<Address, ConcurrentSet<String>> globalServiceMetaMap = new ConcurrentHashMap<RegisterMeta.Address, ConcurrentSet<String>>(); // 某个服务 订阅它的消费者的channel集合 private final ConcurrentMap<String, ConcurrentSet<Channel>> globalConsumerMetaMap = new ConcurrentHashMap<String, ConcurrentSet<Channel>>(); // 提供者某个地址对应的channel private final ConcurrentMap<Address, Channel> globalProviderChannelMetaMap = new ConcurrentHashMap<RegisterMeta.Address, Channel>(); //每个服务的历史记录 private final ConcurrentMap<String, RegistryPersistRecord> historyRecords = new ConcurrentHashMap<String, RegistryPersistRecord>(); //每个服务对应的负载策略 private final ConcurrentMap<String, LoadBalanceStrategy> globalServiceLoadBalance = new ConcurrentHashMap<String, LoadBalanceStrategy>();
handlerRegister的具体逻辑:
@Override public RemotingTransporter handlerRegister(RemotingTransporter remotingTransporter, Channel channel) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException { // 准备好ack信息返回个provider,悲观主义,默认返回失败ack,要求provider重新发送请求 AckCustomBody ackCustomBody = new AckCustomBody(remotingTransporter.getOpaque(), false); RemotingTransporter responseTransporter = RemotingTransporter.createResponseTransporter(LaopopoProtocol.ACK, ackCustomBody, remotingTransporter.getOpaque()); // 接收到主体信息 PublishServiceCustomBody publishServiceCustomBody = serializerImpl().readObject(remotingTransporter.bytes(), PublishServiceCustomBody.class); RegisterMeta meta = RegisterMeta.createRegiserMeta(publishServiceCustomBody,channel); if (logger.isDebugEnabled()) { logger.info("Publish [{}] on channel[{}].", meta, channel); } // channel上打上该服务的标记 方便当channel inactive的时候,直接从channel上拿到标记的属性,通知 attachPublishEventOnChannel(meta, channel); // 一个服务的最小单元,也是确定一个服务的最小单位 final String serviceName = meta.getServiceName(); // 找出提供此服务的全部地址和该服务在该地址下的审核情况 ConcurrentMap<Address, RegisterMeta> maps = this.getRegisterMeta(serviceName); synchronized (globalRegisterInfoMap) { //历史记录中的所有服务的持久化的信息记录 ConcurrentMap<String, RegistryPersistRecord> concurrentMap = historyRecords; // 获取到这个地址可能以前注册过的注册信息 RegisterMeta existRegiserMeta = maps.get(meta.getAddress()); // 如果等于空,则说明以前没有注册过 这就需要从历史记录中将某些服务以前注册审核的信息恢复一下记录 if (null == existRegiserMeta) { RegistryPersistRecord persistRecord = concurrentMap.get(serviceName); //如果历史记录中没有记录该信息,也就说持久化中没有记录到该信息的时候,就需要构造默认的持久化信息 if(null == persistRecord){ persistRecord = new RegistryPersistRecord(); persistRecord.setServiceName(serviceName); //持久化的服务名 persistRecord.setBalanceStrategy(LoadBalanceStrategy.WEIGHTINGRANDOM); //默认的负载均衡的策略 PersistProviderInfo providerInfo = new PersistProviderInfo(); providerInfo.setAddress(meta.getAddress()); //服务提供者的地址 providerInfo.setIsReviewed(ServiceReviewState.HAS_NOT_REVIEWED); //服务默认是未审核 persistRecord.getProviderInfos().add(providerInfo); concurrentMap.put(serviceName, persistRecord); } //循环该服务的所有服务提供者实例的信息,获取到当前实例的审核状态,设置好meta的审核信息 for(PersistProviderInfo providerInfo:persistRecord.getProviderInfos()){ if(providerInfo.getAddress().equals(meta.getAddress())){ meta.setIsReviewed(providerInfo.getIsReviewed()); } } existRegiserMeta = meta; maps.put(meta.getAddress(), existRegiserMeta); } this.getServiceMeta(meta.getAddress()).add(serviceName); //默认的负载均衡的策略 LoadBalanceStrategy defaultBalanceStrategy = LoadBalanceStrategy.WEIGHTINGRANDOM; if(null != concurrentMap.get(serviceName)){ RegistryPersistRecord persistRecord = concurrentMap.get(serviceName); if(null != persistRecord.getBalanceStrategy()){ defaultBalanceStrategy = persistRecord.getBalanceStrategy(); } } // 设置该服务默认的负载均衡的策略 globalServiceLoadBalance.put(serviceName, defaultBalanceStrategy); // 判断provider发送的信息已经被成功的存储的情况下,则告之服务注册成功 ackCustomBody.setSuccess(true); // 如果审核通过,则通知相关服务的订阅者 if (meta.getIsReviewed() == ServiceReviewState.PASS_REVIEW) { this.defaultRegistryServer.getConsumerManager().notifyMacthedSubscriber(meta, globalServiceLoadBalance.get(serviceName)); } } //将地址与该channel绑定好,方便其他地方使用 globalProviderChannelMetaMap.put(meta.getAddress(), channel); return responseTransporter; }
这段冗长的代码就是完成的四点需求了,收到服务提供者发布的信息之后就是接受到该信息,从历史记录中恢复该信息的审核记录,负载均衡的策略,然后如果此时这个服务的审核状态是通过的情况下,就请兄弟类ConsumerManager把该服务的信息推送给对应的订阅者,最后发送ack信息给服务提供者,告之它接收信息成功,这段代码整体的流程就是如此
另一种场景就是服务提供者通知发布服务取消的信号的时候,也就是PUBLISH_CANCEL_SERVICE这个信号的时候,要做的事情也是很简单:
1)将记录在java内存中的信息移除掉,其实也就是将全局变量的信息移除掉
2)如果该服务已经是审核通过的,则需要再发送信息给订阅该服务的服务消费者,该服务下线,使其不再调用该服务了
/*** * 服务下线的接口 * * @param meta * @param channel * @throws InterruptedException * @throws RemotingTimeoutException * @throws RemotingSendRequestException */ public void handlePublishCancel(RegisterMeta meta, Channel channel) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException { if (logger.isDebugEnabled()) { logger.info("Cancel publish {} on channel{}.", meta, channel); } //将其channel上打上的标记移除掉 attachPublishCancelEventOnChannel(meta, channel); final String serviceMeta = meta.getServiceName(); ConcurrentMap<Address, RegisterMeta> maps = this.getRegisterMeta(serviceMeta); if (maps.isEmpty()) { return; } synchronized (globalRegisterInfoMap) { Address address = meta.getAddress(); RegisterMeta data = maps.remove(address); if (data != null) { this.getServiceMeta(address).remove(serviceMeta); if (data.getIsReviewed() == ServiceReviewState.PASS_REVIEW ) this.defaultRegistryServer.getConsumerManager().notifyMacthedSubscriberCancel(meta); } } }
上述代码就是简单的实现了上述的功能
当然还有一种场景就是当服务提供者的实例直接关闭的时候,与注册中心之间保持的netty长连接也会断掉,实例关闭自然就是服务下线,所以我们也要对其作出处理
因为某个服务提供者实例提供的远程服务可能不止一个,所以我们要做的就是将这些服务全部做下线处理,因为我们之前已经将建立的channel上打上了tag,所以我们很容易知道某个服务实例上到底提供了多少个远程服务
@Override public void processChannelInactive(ChannelHandlerContext ctx) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException { //获取到当前的channel,此时的channel应该是打过记号的 Channel channel = ctx.channel(); // 取消之前发布的所有服务 ConcurrentSet<RegisterMeta> registerMetaSet = channel.attr(S_PUBLISH_KEY).get(); //如果该channel打过的记号是空,或者是空集合的话,直接返回 if (registerMetaSet == null || registerMetaSet.isEmpty()) { logger.debug("registerMetaSet is empty"); return; } //接下来需要做两件事情 //1 修改当前注册中心该channel所提供的所有服务取消 //2 发送请求告之consumer该地址对应的所有服务下线 Address address = null; for (RegisterMeta meta : registerMetaSet) { if (address == null) { address = meta.getAddress(); } this.defaultRegistryServer.getProviderManager().handlePublishCancel(meta, channel); } }
总而言之,注册中心处理服务提供者的之间的业务逻辑还是相对比较简单的,总结而言就是记录,通知服务消费者,反之就是消除记录,通知服务消费者服务下线
详细代码查看