上个小节简单的描述了一下注册中心如何处理与服务提供者的交互问题,也列举了几个简单的代码片段,中间有几个场景是需要通知服务消费者的几个业务逻辑的
1)当服务消费者服务发布成功且审核通过的情况下,需要通知服务订阅者
2)当某个服务下线的时候,也需要通知服务订阅者某个服务下线
除了以上2个被动原因触发的原因还有一个就是订阅的功能,注册中心需要处理服务消费者发送过来的订阅请求,订阅请求包含的内容很简单,就是它需要订阅的服务,注册中心就会把提供该服务并且审核通过的所有实例服务器信息发送给服务消费者
以上就是服务消费者需要完成的所有任务了
好了,知道了要干什么了,实现起来也是相对比较容易的,我们倒着实现,我们先实现订阅的功能
case SUBSCRIBE_SERVICE: // 处理服务消费者consumer订阅服务的请求 return this.defaultRegistryServer.getProviderManager().handleSubscribe(request, ctx.channel());
SUBSCRIBE_SERVICE信号发送到注册中心的时候,走handlerSubsribe代码分支
/** * 处理consumer的消息订阅,并返回结果 * * @param request * @param channel * @return */ public RemotingTransporter handleSubscribe(RemotingTransporter request, Channel channel) { SubcribeResultCustomBody subcribeResultCustomBody = new SubcribeResultCustomBody(); RemotingTransporter responseTransporter = RemotingTransporter.createResponseTransporter(LaopopoProtocol.SUBCRIBE_RESULT, subcribeResultCustomBody, request.getOpaque()); // 接收到主体信息 SubscribeRequestCustomBody requestCustomBody = serializerImpl().readObject(request.bytes(), SubscribeRequestCustomBody.class); String serviceName = requestCustomBody.getServiceName(); // 将其降入到channel的group中去 this.defaultRegistryServer.getConsumerManager().getSubscriberChannels().add(channel); // 存储消费者信息 ConcurrentSet<Channel> channels = globalConsumerMetaMap.get(serviceName); if (null == channels) { channels = new ConcurrentSet<Channel>(); } channels.add(channel); globalConsumerMetaMap.put(serviceName, channels); //将订阅的channel上打上tag标记,表示该channel订阅的服务 attachSubscribeEventOnChannel(serviceName, channel); ConcurrentMap<Address, RegisterMeta> maps = this.getRegisterMeta(serviceName); // 如果订阅的暂时还没有服务提供者,则返回空列表给订阅者 if (maps.isEmpty()) { return responseTransporter; } //构建返回的订阅信息的对象 buildSubcribeResultCustomBody(maps, subcribeResultCustomBody); return responseTransporter; }
private void buildSubcribeResultCustomBody(ConcurrentMap<Address, RegisterMeta> maps, SubcribeResultCustomBody subcribeResultCustomBody) { Collection<RegisterMeta> values = maps.values(); if (values != null && values.size() > 0) { List<RegisterMeta> registerMetas = new ArrayList<RegisterMeta>(); for (RegisterMeta meta : values) { // 判断是否人工审核过,审核过的情况下,组装给consumer的响应主体,返回个consumer if (meta.getIsReviewed() == ServiceReviewState.PASS_REVIEW) { registerMetas.add(meta); } } subcribeResultCustomBody.setRegisterMeta(registerMetas); } }
订阅的服务其实就是把内存记录的数据反馈给服务消费者,整体的思路就是这样
接下来看看当服务上线或者下线的时候,需要推送给服务消费者的代码实现:
1) 服务上线成功的时候,并且审核通过的时候通知服务消费者:
/** * 通知相关的订阅者服务的信息 * * @param meta * @param loadBalanceStrategy * @throws InterruptedException * @throws RemotingTimeoutException * @throws RemotingSendRequestException */ public void notifyMacthedSubscriber(final RegisterMeta meta, LoadBalanceStrategy loadBalanceStrategy) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException { // 构建订阅通知的主体传输对象 SubcribeResultCustomBody subcribeResultCustomBody = new SubcribeResultCustomBody(); buildSubcribeResultCustomBody(meta, subcribeResultCustomBody,loadBalanceStrategy); // 传送给consumer对象的RemotingTransporter RemotingTransporter sendConsumerRemotingTrasnporter = RemotingTransporter.createRequestTransporter(LaopopoProtocol.SUBCRIBE_RESULT, subcribeResultCustomBody); pushMessageToConsumer(sendConsumerRemotingTrasnporter, meta.getServiceName()); }
2) 服务下线的时候通知:
/** * 通知订阅者某个服务取消 * @param meta * @throws RemotingSendRequestException * @throws RemotingTimeoutException * @throws InterruptedException */ public void notifyMacthedSubscriberCancel(final RegisterMeta meta) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException { // 构建订阅通知的主体传输对象 SubcribeResultCustomBody subcribeResultCustomBody = new SubcribeResultCustomBody(); buildSubcribeResultCustomBody(meta, subcribeResultCustomBody,null); RemotingTransporter sendConsumerRemotingTrasnporter = RemotingTransporter.createRequestTransporter(LaopopoProtocol.SUBCRIBE_SERVICE_CANCEL, subcribeResultCustomBody); pushMessageToConsumer(sendConsumerRemotingTrasnporter, meta.getServiceName()); }
private void pushMessageToConsumer(RemotingTransporter sendConsumerRemotingTrasnporter, String serviceName) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException { // 所有的订阅者的channel集合 if (!subscriberChannels.isEmpty()) { for (Channel channel : subscriberChannels) { if (isChannelSubscribeOnServiceMeta(serviceName, channel)) { RemotingTransporter remotingTransporter = this.defaultRegistryServer.getRemotingServer().invokeSync(channel, sendConsumerRemotingTrasnporter, 3000l); // 如果是ack返回是null说明是超时了,需要重新发送 if (remotingTransporter == null) { logger.warn("push consumer message time out,need send again"); MessageNonAck msgNonAck = new MessageNonAck(remotingTransporter, channel,serviceName); messagesNonAcks.add(msgNonAck); } // 如果消费者端消费者消费失败 AckCustomBody ackCustomBody = (AckCustomBody) remotingTransporter.getCustomHeader(); if (!ackCustomBody.isSuccess()) { logger.warn("consumer fail handler this message"); MessageNonAck msgNonAck = new MessageNonAck(remotingTransporter, channel,serviceName); messagesNonAcks.add(msgNonAck); } } } } }
注册中心对于服务消费者的处理大体上就是这样了,详细代码可以查看github,有错误的地方可以告诉我一下,仅作抛砖引玉~