不灭的焱

革命尚未成功,同志仍须努力

作者:php-note.com  发布于:2018-11-04 16:17  分类:Java  编辑

大家知道网络通信都是异步的,当你使用网络去发送一个请求的之后,就会去等待这个请求对应的响应体,可是你却不知道这个响应到底何时到达。

 

这就是异步的好处也是异步的坏处,好处就是在请求发送出去之后,你就可以去做其他的事,就比如你跟你的女神用微信表白了,表白之后,你不需要眼睛直勾勾的盯着手机微信,可以去干一些其他的事情,坏处就是你不知道响应到底什么到来,所以即使你去做其他的事情也会不舒心,你会时不时的去看手机的,这就是异步"表白"的优点和缺点了吧

 

解决方案,java的异步设计模式中有一个叫做Promise的设计模式,类图是:

看这种类图很枯燥,还是举例说明吧,你表白完之后,跟你女神的闺蜜说:我跟你的闺蜜告白了,正在等待结果

 

女神的闺蜜说:我知道的,她刚才告诉我了,她说第一时间会先告诉我,说要考虑1天,她告诉我之后,我会第一时间通知你

 

上述的两段文件我觉得我已经把promise的设计模式说清楚了,这个闺蜜就是图中的Promise,电话就是那个通知事件,有了这个话之后,你就不需要再直勾勾的盯着手机去看微信了,也不需要心不在焉地去做任何事情了,因为你知道一天之内,女神闺蜜会给你电话了~

 

好了,我们看看如何实现这个设计模式了,在Netty中,我们一般是使用channel.writeAndFlush方法去发送请求的

我们就从这块入手吧~

 

我们先定一个请求的入口:

 

然后我们去实现这个接口,我们定一个类NettyRemotingBase.java,要知道不管是C端还是S端都有invokeSync的权利,所以我们NettyRemotingBase的类应该是Netty C/S 两端代码的父类:

好了,我们接着看,当远程端(假设是女神端)发送了Yes ,I do的信息给她闺蜜的时候,这个闺蜜也就成了响应,我们先定义一下这个闺蜜的java实现,我们先明确一下这个闺蜜的功能,核心功能就是当她收到女神的短信之后,里面通知你,好了,我们定义闺蜜的名称是RemotingResponse

package org.laopopo.remoting.model;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.laopopo.remoting.InvokeCallback;

/**
 * 
 * @author BazingaLyn
 * @description 请求返回的对象包装类
 * @time 2016年8月10日11:08:43
 * @modifytime
 */
public class RemotingResponse {

	// 远程端返回的结果集
	private volatile RemotingTransporter remotingTransporter;

	// 该请求抛出的异常,如果存在的话
	private volatile Throwable cause;
	// 发送端是否发送成功
	private volatile boolean sendRequestOK = true;

	// 请求的opaque
	private final long opaque;

	// 默认的回调函数
	private final InvokeCallback invokeCallback;

	// 请求的默认超时时间
	private final long timeoutMillis;

	private final long beginTimestamp = System.currentTimeMillis();
	private final CountDownLatch countDownLatch = new CountDownLatch(1);

	public RemotingResponse(long opaque, long timeoutMillis, InvokeCallback invokeCallback) {
		this.invokeCallback = invokeCallback;
		this.opaque = opaque;
		this.timeoutMillis = timeoutMillis;
	}

	public void executeInvokeCallback() {
		if (invokeCallback != null) {
			invokeCallback.operationComplete(this);
		}
	}

	public boolean isSendRequestOK() {
		return sendRequestOK;
	}

	public void setSendRequestOK(boolean sendRequestOK) {
		this.sendRequestOK = sendRequestOK;
	}

	public long getOpaque() {
		return opaque;
	}

	public RemotingTransporter getRemotingTransporter() {
		return remotingTransporter;
	}

	public void setRemotingTransporter(RemotingTransporter remotingTransporter) {
		this.remotingTransporter = remotingTransporter;
	}

	public Throwable getCause() {
		return cause;
	}

	public void setCause(Throwable cause) {
		this.cause = cause;
	}

	public long getTimeoutMillis() {
		return timeoutMillis;
	}

	public long getBeginTimestamp() {
		return beginTimestamp;
	}
	
	public RemotingTransporter waitResponse() throws InterruptedException{
		this.countDownLatch.await(this.timeoutMillis, TimeUnit.MILLISECONDS);
		return this.remotingTransporter;
	}
	
	/**
	 * 当远程端返回结果的时候,TCP的长连接的上层载体channel 的handler会将其放入与requestId
	 * 对应的Response中去
	 * @param remotingTransporter
	 */
	public void putResponse(final RemotingTransporter remotingTransporter){
		this.remotingTransporter = remotingTransporter;
		//接收到对应的消息之后需要countDown
		this.countDownLatch.countDown();
	}

}

这个方法,我们需要关注的是CountDownLatch,这是这个类的灵魂,我们要关注的是这个类的waitResponse和putResponse方法,当请求发送方发送请求之后,它就会调用
waitResponse这个方法,等待响应结果,同样,女神闺蜜收到结果之后,不管好坏,她都会调用putResponse

1)当女神闺蜜在一天之内调用putResponse这个方法,则countDownLatch.countDown(),导致的效果就是waitResponse立即返回结果,达到了通知的效果

2)如果女神闺蜜在一天之内没有调用putResponse方法,则视为超时,this.countDownLatch.await(this.timeoutMillis, TimeUnit.MILLISECONDS)这个方法也会在deadline之后返回

 

好了,我们接着看具体的实现,当请求调用者执行invokeSync的时候,我们看看具体的实现吧

上图中,标红的68行代码相当于就是先构建一个RemotingResponse,这个与request相对应,所以需要将requestId和超时时间传入,超时时间是用于设置countdownlatch的最长等待时间,然后将其放入到responseTable中去,responseTable实现也很简单

 

/******key为请求的opaque value是远程返回的结果封装类******/
	protected final ConcurrentHashMap<Long, RemotingResponse> responseTable = new ConcurrentHashMap<Long, RemotingResponse>(256);

 

好了,到此为此,相当于你发送了请求给女神,也打电话告诉了女神闺蜜了,现在你要做的事情就是等待了,但是还是有个小细节的

 

这个细节就是上图中76行代码到81行代码,你发送了微信请求了,但不代表女神成功的接收了告白了,所以站在Netty的角度上来说,你需要加一个ChannelFutureListener,请求是异步的,所以我们要确保告白成功发送到女神手中了,设置remotingResponse的状态是OK,

假如发送失败,女神的手机坏了,微信号换了,你就不要傻傻的等待告白结果了,我们还是乖乖的做相对应的处理吧~

 

我们接着看invokeSyncImpl的实现类:

当你发送成功之后,你就需要调用waitResponse了,如上述代码讲述的那样

 

最后一个问题,是在那个地方调用了putResponse方法了啊,其实想想也简单,肯定是在Netty的Handler的某个read方法中了,在Netty的Client和Server的handler中实现该方法:

 

Client端部分:

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingTransporter> {

     @Override
     protected void channelRead0(ChannelHandlerContext ctx, RemotingTransporter msg) throws Exception {
          processMessageReceived(ctx, msg);
     }
     
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          processChannelInactive(ctx);
     }
}

 

Server端部分:

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingTransporter> {

   @Override
   protected void channelRead0(ChannelHandlerContext ctx, RemotingTransporter msg) throws Exception {
       processMessageReceived(ctx, msg);
   }
   
   @Override
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
     processChannelInactive(ctx);
   }
   
     
}

 

我们看看processMessageReceived这个方法

//ChannelRead0方法对应的具体实现
protected void processMessageReceived(ChannelHandlerContext ctx, RemotingTransporter msg) {
    
    if(logger.isDebugEnabled()){
         logger.debug("channel [] received RemotingTransporter is [{}]",ctx.channel(),msg);
    }
    
    final RemotingTransporter remotingTransporter = msg;
    
    if (remotingTransporter != null) {
        switch (remotingTransporter.getTransporterType()) {
        //作为server端 client端的请求的对应的处理
        case REQUEST_REMOTING:
            processRemotingRequest(ctx, remotingTransporter);
            break;
        //作为客户端,来自server端的响应的处理
        case RESPONSE_REMOTING:
            processRemotingResponse(ctx, remotingTransporter);
            break;
        default:
            break;
        }
    }
}

 

我们知道RemotingTransporter中有个变量代码这是请求还是响应了,这里我们知道RemotingTransport这个请求响应标识肯定是响应,所以我们看processRemotingResponse这个方法

protected void processRemotingResponse(ChannelHandlerContext ctx, RemotingTransporter remotingTransporter) {
    //从缓存篮子里拿出对应请求的对应响应的载体RemotingResponse
    final RemotingResponse remotingResponse = responseTable.get(remotingTransporter.getOpaque());
    
    //不超时的情况下
    if(null != remotingResponse){
      //首先先设值,这样会在countdownlatch wait之前把值赋上
      remotingResponse.setRemotingTransporter(remotingTransporter);
      //可以直接countdown
      remotingResponse.putResponse(remotingTransporter);
      //从篮子中移除
      responseTable.remove(remotingTransporter.getOpaque());
    } else {
            logger.warn("received response but matched Id is removed from responseTable maybe timeout");
            logger.warn(remotingTransporter.toString());
        }
}

 

好了,整个异步表白,同步Yes I do的整个java实现过程就是这样了,不知道大家有没有看懂~

 

完整代码请查看:

https://github.com/BazingaLyn/laopopo-rpc/tree/master/laopopo-remoting

 

如果有错误,欢迎纠正,本节END~

 

 

摘自:https://blog.csdn.net/linuu/article/details/52516042