大家知道网络通信都是异步的,当你使用网络去发送一个请求的之后,就会去等待这个请求对应的响应体,可是你却不知道这个响应到底何时到达。
这就是异步的好处也是异步的坏处,好处就是在请求发送出去之后,你就可以去做其他的事,就比如你跟你的女神用微信表白了,表白之后,你不需要眼睛直勾勾的盯着手机微信,可以去干一些其他的事情,坏处就是你不知道响应到底什么到来,所以即使你去做其他的事情也会不舒心,你会时不时的去看手机的,这就是异步"表白"的优点和缺点了吧
解决方案,java的异步设计模式中有一个叫做Promise的设计模式,类图是:
看这种类图很枯燥,还是举例说明吧,你表白完之后,跟你女神的闺蜜说:我跟你的闺蜜告白了,正在等待结果
女神的闺蜜说:我知道的,她刚才告诉我了,她说第一时间会先告诉我,说要考虑1天,她告诉我之后,我会第一时间通知你
上述的两段文件我觉得我已经把promise的设计模式说清楚了,这个闺蜜就是图中的Promise,电话就是那个通知事件,有了这个话之后,你就不需要再直勾勾的盯着手机去看微信了,也不需要心不在焉地去做任何事情了,因为你知道一天之内,女神闺蜜会给你电话了~
好了,我们看看如何实现这个设计模式了,在Netty中,我们一般是使用channel.writeAndFlush方法去发送请求的
我们就从这块入手吧~
我们先定一个请求的入口:
然后我们去实现这个接口,我们定一个类NettyRemotingBase.java,要知道不管是C端还是S端都有invokeSync的权利,所以我们NettyRemotingBase的类应该是Netty C/S 两端代码的父类:
好了,我们接着看,当远程端(假设是女神端)发送了Yes ,I do的信息给她闺蜜的时候,这个闺蜜也就成了响应,我们先定义一下这个闺蜜的java实现,我们先明确一下这个闺蜜的功能,核心功能就是当她收到女神的短信之后,里面通知你,好了,我们定义闺蜜的名称是RemotingResponse
package org.laopopo.remoting.model; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.laopopo.remoting.InvokeCallback; /** * * @author BazingaLyn * @description 请求返回的对象包装类 * @time 2016年8月10日11:08:43 * @modifytime */ public class RemotingResponse { // 远程端返回的结果集 private volatile RemotingTransporter remotingTransporter; // 该请求抛出的异常,如果存在的话 private volatile Throwable cause; // 发送端是否发送成功 private volatile boolean sendRequestOK = true; // 请求的opaque private final long opaque; // 默认的回调函数 private final InvokeCallback invokeCallback; // 请求的默认超时时间 private final long timeoutMillis; private final long beginTimestamp = System.currentTimeMillis(); private final CountDownLatch countDownLatch = new CountDownLatch(1); public RemotingResponse(long opaque, long timeoutMillis, InvokeCallback invokeCallback) { this.invokeCallback = invokeCallback; this.opaque = opaque; this.timeoutMillis = timeoutMillis; } public void executeInvokeCallback() { if (invokeCallback != null) { invokeCallback.operationComplete(this); } } public boolean isSendRequestOK() { return sendRequestOK; } public void setSendRequestOK(boolean sendRequestOK) { this.sendRequestOK = sendRequestOK; } public long getOpaque() { return opaque; } public RemotingTransporter getRemotingTransporter() { return remotingTransporter; } public void setRemotingTransporter(RemotingTransporter remotingTransporter) { this.remotingTransporter = remotingTransporter; } public Throwable getCause() { return cause; } public void setCause(Throwable cause) { this.cause = cause; } public long getTimeoutMillis() { return timeoutMillis; } public long getBeginTimestamp() { return beginTimestamp; } public RemotingTransporter waitResponse() throws InterruptedException{ this.countDownLatch.await(this.timeoutMillis, TimeUnit.MILLISECONDS); return this.remotingTransporter; } /** * 当远程端返回结果的时候,TCP的长连接的上层载体channel 的handler会将其放入与requestId * 对应的Response中去 * @param remotingTransporter */ public void putResponse(final RemotingTransporter remotingTransporter){ this.remotingTransporter = remotingTransporter; //接收到对应的消息之后需要countDown this.countDownLatch.countDown(); } }
这个方法,我们需要关注的是CountDownLatch,这是这个类的灵魂,我们要关注的是这个类的waitResponse和putResponse方法,当请求发送方发送请求之后,它就会调用
waitResponse这个方法,等待响应结果,同样,女神闺蜜收到结果之后,不管好坏,她都会调用putResponse
1)当女神闺蜜在一天之内调用putResponse这个方法,则countDownLatch.countDown(),导致的效果就是waitResponse立即返回结果,达到了通知的效果
2)如果女神闺蜜在一天之内没有调用putResponse方法,则视为超时,this.countDownLatch.await(this.timeoutMillis, TimeUnit.MILLISECONDS)这个方法也会在deadline之后返回
好了,我们接着看具体的实现,当请求调用者执行invokeSync的时候,我们看看具体的实现吧
上图中,标红的68行代码相当于就是先构建一个RemotingResponse,这个与request相对应,所以需要将requestId和超时时间传入,超时时间是用于设置countdownlatch的最长等待时间,然后将其放入到responseTable中去,responseTable实现也很简单
/******key为请求的opaque value是远程返回的结果封装类******/ protected final ConcurrentHashMap<Long, RemotingResponse> responseTable = new ConcurrentHashMap<Long, RemotingResponse>(256);
好了,到此为此,相当于你发送了请求给女神,也打电话告诉了女神闺蜜了,现在你要做的事情就是等待了,但是还是有个小细节的
这个细节就是上图中76行代码到81行代码,你发送了微信请求了,但不代表女神成功的接收了告白了,所以站在Netty的角度上来说,你需要加一个ChannelFutureListener,请求是异步的,所以我们要确保告白成功发送到女神手中了,设置remotingResponse的状态是OK,
假如发送失败,女神的手机坏了,微信号换了,你就不要傻傻的等待告白结果了,我们还是乖乖的做相对应的处理吧~
我们接着看invokeSyncImpl的实现类:
当你发送成功之后,你就需要调用waitResponse了,如上述代码讲述的那样
最后一个问题,是在那个地方调用了putResponse方法了啊,其实想想也简单,肯定是在Netty的Handler的某个read方法中了,在Netty的Client和Server的handler中实现该方法:
Client端部分:
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingTransporter> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingTransporter msg) throws Exception { processMessageReceived(ctx, msg); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { processChannelInactive(ctx); } }
Server端部分:
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingTransporter> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingTransporter msg) throws Exception { processMessageReceived(ctx, msg); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { processChannelInactive(ctx); } }
我们看看processMessageReceived这个方法
//ChannelRead0方法对应的具体实现 protected void processMessageReceived(ChannelHandlerContext ctx, RemotingTransporter msg) { if(logger.isDebugEnabled()){ logger.debug("channel [] received RemotingTransporter is [{}]",ctx.channel(),msg); } final RemotingTransporter remotingTransporter = msg; if (remotingTransporter != null) { switch (remotingTransporter.getTransporterType()) { //作为server端 client端的请求的对应的处理 case REQUEST_REMOTING: processRemotingRequest(ctx, remotingTransporter); break; //作为客户端,来自server端的响应的处理 case RESPONSE_REMOTING: processRemotingResponse(ctx, remotingTransporter); break; default: break; } } }
我们知道RemotingTransporter中有个变量代码这是请求还是响应了,这里我们知道RemotingTransport这个请求响应标识肯定是响应,所以我们看processRemotingResponse这个方法
protected void processRemotingResponse(ChannelHandlerContext ctx, RemotingTransporter remotingTransporter) { //从缓存篮子里拿出对应请求的对应响应的载体RemotingResponse final RemotingResponse remotingResponse = responseTable.get(remotingTransporter.getOpaque()); //不超时的情况下 if(null != remotingResponse){ //首先先设值,这样会在countdownlatch wait之前把值赋上 remotingResponse.setRemotingTransporter(remotingTransporter); //可以直接countdown remotingResponse.putResponse(remotingTransporter); //从篮子中移除 responseTable.remove(remotingTransporter.getOpaque()); } else { logger.warn("received response but matched Id is removed from responseTable maybe timeout"); logger.warn(remotingTransporter.toString()); } }
好了,整个异步表白,同步Yes I do的整个java实现过程就是这样了,不知道大家有没有看懂~
完整代码请查看:
https://github.com/BazingaLyn/laopopo-rpc/tree/master/laopopo-remoting
如果有错误,欢迎纠正,本节END~