新的socket通道类可以运行非阻塞模式并且是可选择的。这两个性能可以激活大程序(如网络服务器和中间件组件)巨大的可伸缩性和灵活性。本节中我们会看到,再也没有为每个socket连接使用一个线程的必要了,也避免了管理大量线程所需的上下文交换总开销。借助新的NIO类,一个或几个线程就可以管理成百上千的活动socket连接了并且只有很少甚至可能没有性能损失。所有的socket通道类(DatagramChannel、SocketChannel和ServerSocketChannel)都继承了位于java.nio.channels.spi包中的AbstractSelectableChannel。这意味着我们可以用一个Selector对象来执行socket通道的就绪选择(readiness selection)。
全部socket通道类(DatagramChannel、SocketChannel和ServerSocketChannel)在被实例化时都会创建一个对等socket对象。这些是我们所熟悉的来自java.net的类(Socket、ServerSocket和DatagramSocket),它们已经被更新以识别通道。对等socket可以通过调用socket( )方法从一个通道上获取。此外,这三个java.net类现在都有getChannel( )方法。
要把一个socket通道置于非阻塞模式,我们要依靠所有socket通道类的公有超级类:SelectableChannel。就绪选择(readiness selection)是一种可以用来查询通道的机制,该查询可以判断通道是否准备好执行一个目标操作,如读或写。非阻塞I/O和可选择性是紧密相连的,那也正是管理阻塞模式的API代码要在SelectableChannel超级类中定义的原因。
设置或重新设置一个通道的阻塞模式是很简单的,只要调用configureBlocking( )方法即可,传递参数值为true则设为阻塞模式,参数值为false值设为非阻塞模式。真的,就这么简单!您可以通过调用isBlocking( )方法来判断某个socket通道当前处于哪种模式。
public final SelectableChannel configureBlocking(boolean block) throws IOException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if (blocking == block) return this; if (block && haveValidKeys()) throw new IllegalBlockingModeException(); implConfigureBlocking(block); blocking = block; } return this; }
偶尔地,我们也会需要防止socket通道的阻塞模式被更改。API中有一个blockingLock( )方法,该方法会返回一个非透明的对象引用。返回的对象是通道实现修改阻塞模式时内部使用的。只有拥有此对象的锁的线程才能更改通道的阻塞模式。
二、 ServerSocketChannel
public abstract class ServerSocketChannel extends AbstractSelectableChannel { public static ServerSocketChannel open() throws IOException; public abstract ServerSocket socket(); public abstract ServerSocket accept()throws IOException; public final int validOps(); }
同它的对等体java.net.ServerSocket一样,ServerSocketChannel也有accept( )方法。一旦您创建了一个ServerSocketChannel并用对等socket绑定了它,然后您就可以在其中一个上调用accept()。如果您选择在ServerSocket上调用accept( )方法,那么它会同任何其他的ServerSocket表现一样的行为:总是阻塞并返回一个java.net.Socket对象。如果您选择在ServerSocketChannel上调用accept()方法则会返回SocketChannel类型的对象,返回的对象能够在非阻塞模式下运行。
- ServerSocketChannel的accept()方法会返回SocketChannel类型对象,SocketChannel可以在非阻塞模式下运行。
- 其它Socket的accept()方法会阻塞返回一个Socket对象。
如果ServerSocketChannel以非阻塞模式被调用,当没有传入连接在等待时,ServerSocketChannel.accept( )会立即返回null。正是这种检查连接而不阻塞的能力实现了可伸缩性并降低了复杂性。可选择性也因此得到实现。我们可以使用一个选择器实例来注册一个ServerSocketChannel对象以实现新连接到达时自动通知的功能。以下代码演示了如何使用一个非阻塞的accept()方法:
package nio.demo2; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; public class ChannelAccept { public static final String GREETING = "Hello I must be going.\r\n"; /** * 主方法 */ public static void main(String[] args) throws Exception { int port = 1234; if (args.length > 0) { port = Integer.parseInt(args[0]); } ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes()); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(port)); ssc.configureBlocking(false); while (true) { System.out.println("Waiting for connections"); SocketChannel sc = ssc.accept(); if (sc == null) { System.out.println("null"); Thread.sleep(2000); } else { System.out.println("Incoming connection from : " + sc.socket().getRemoteSocketAddress()); buffer.rewind(); sc.write(buffer); sc.close(); } } } }
2.1、打开 ServerSocketChannel
通过调用 ServerSocketChannel.open() 方法来打开ServerSocketChannel.如:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
2.2、关闭 ServerSocketChannel
通过调用ServerSocketChannel.close() 方法来关闭ServerSocketChannel. 如:
通过 ServerSocketChannel.accept() 方法监听新进来的连接。当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel。因此,accept()方法会一直阻塞到有新连接到达。
通常不会仅仅只监听一个连接,在while循环中调用 accept()方法。如下面的例子:
while (true) { SocketChannel socketChannel = serverSocketChannel.accept(); //... }
会在SocketChannel sc = ssc.accept();这里阻塞住进程。
ServerSocketChannel可以设置成非阻塞模式。在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null,如:
ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(port)); ssc.configureBlocking(false); while (true) { System.out.println("Waiting for connections"); SocketChannel sc = ssc.accept(); if (sc != null) { } }
Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道。可以通过以下2种方式创建SocketChannel:
- 打开一个SocketChannel并连接到互联网上的某台服务器。
- 一个新连接到达ServerSocketChannel时,会创建一个SocketChannel。
3.1、打开 SocketChannel
SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("", 80));
3.2、关闭 SocketChannel
3.3、从 SocketChannel 读取数据
ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = socketChannel.read(buf);
首先,分配一个Buffer。从SocketChannel读取到的数据将会放到这个Buffer中。然后,调用SocketChannel.read()。该方法将数据从SocketChannel 读到Buffer中。read()方法返回的int值表示读了多少字节进Buffer里。如果返回的是-1,表示已经读到了流的末尾(连接关闭了)。
3.4、写入 SocketChannel
String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); buf.put(newData.getBytes()); buf.flip(); while (buf.hasRemaining()) { channel.write(buf); }
可以设置 SocketChannel 为非阻塞模式(non-blocking mode).设置之后,就可以在异步模式下调用connect(), read() 和write()了。
socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("", 80)); while (!socketChannel.finishConnect()) { //wait, or do something else... }
4.1、打开 DatagramChannel
下面是 DatagramChannel 的打开方式:
DatagramChannel channel = DatagramChannel.open(); channel.socket().bind(new InetSocketAddress(9999));
这个例子打开的 DatagramChannel可以在UDP端口9999上接收数据包。
ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); channel.receive(buf);
String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); buf.put(newData.getBytes()); buf.flip(); int bytesSent = channel.send(buf, new InetSocketAddress("php-note.com", 80));
这个例子发送一串字符到”jenkov.com”服务器的UDP端口80。 因为服务端并没有监控这个端口,所以什么也不会发生。也不会通知你发出的数据包是否已收到,因为UDP在数据传送方面没有任何保证。
channel.connect(new InetSocketAddress("php-note.com", 80));
int bytesRead = channel.read(buf); int bytesWritten = channel.write(but);
package com.dxz.springsession.nio.demo3; import java.nio.channels.*; import java.nio.charset.*; import java.net.*; import java.io.*; import java.util.*; import java.nio.*; public class DatagramChannelServerDemo { // UDP协议服务端 private int port = 9975; DatagramChannel channel; private Charset charset = Charset.forName("UTF-8"); private Selector selector = null; public DatagramChannelServerDemo() throws IOException { try { selector = Selector.open(); channel = DatagramChannel.open(); } catch (Exception e) { selector = null; channel = null; System.out.println("超时"); } System.out.println("服务器启动"); } /* 编码过程 */ public ByteBuffer encode(String str) { return charset.encode(str); } /* 解码过程 */ public String decode(ByteBuffer bb) { return charset.decode(bb).toString(); } /* 服务器服务方法 */ public void service() throws IOException { if (channel == null || selector == null) return; channel.configureBlocking(false); channel.socket().bind(new InetSocketAddress(port)); // channel.write(ByteBuffer.wrap(new String("aaaa").getBytes())); channel.register(selector, SelectionKey.OP_READ); /** 外循环,已经发生了SelectionKey数目 */ while (selector.select() > 0) { System.out.println("有新channel加入"); /* 得到已经被捕获了的SelectionKey的集合 */ Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = null; try { key = (SelectionKey) iterator.next(); iterator.remove(); if (key.isReadable()) { reveice(key); } if (key.isWritable()) { // send(key); } } catch (IOException e) { e.printStackTrace(); try { if (key != null) { key.cancel(); key.channel().close(); } } catch (ClosedChannelException cex) { e.printStackTrace(); } } } /* 内循环完 */ } /* 外循环完 */ } /* * 接收 用receive()读IO 作为服务端一般不需要调用connect(),如果未调用<span style= * "font-family: Arial, Helvetica, sans-serif;">connect()时调<span * style="font-family: Arial, Helvetica, sans-serif;" * >用read()\write()读写,会报java.nio.channels .NotYetConnectedException * 只有调用connect()之后,才能使用read和write. */ synchronized public void reveice(SelectionKey key) throws IOException { if (key == null) return; // ***用channel.receive()获取客户端消息***// // :接收时需要考虑字节长度 DatagramChannel sc = (DatagramChannel) key.channel(); String content = ""; // create buffer with capacity of 48 bytes ByteBuffer buf = ByteBuffer.allocate(1024);// java里一个(utf-8)中文3字节,gbk中文占2个字节 buf.clear(); SocketAddress address = sc.receive(buf); // read into buffer. 返回客户端的地址信息 String clientAddress = address.toString().replace("/", "").split(":")[0]; String clientPost = address.toString().replace("/", "").split(":")[1]; buf.flip(); // make buffer ready for read while (buf.hasRemaining()) { buf.get(new byte[buf.limit()]);// read 1 byte at a time content += new String(buf.array()); } buf.clear(); // make buffer ready for writing System.out.println("接收:" + content.trim()); // 第一次发;udp采用数据报模式,发送多少次,接收多少次 ByteBuffer buf2 = ByteBuffer.allocate(65507); buf2.clear(); buf2.put( "消息推送内容 abc..UDP是一个非连接的协议,传输数据之前源端和终端不建立连接,当它想传送时就简单地去抓取来自应用程序的数据,并尽可能快地把它扔到网络上。在发送端UDP是一个非连接的协议,传输数据之前源端和终端不建立连接,当它想传送时就简单地去抓取来自应用程序的数据,并尽可能快地把它扔到网络上。在发送端UDP是一个非连接的协议,传输数据之前源端和终端不建立连接,当它想传送时就简单地去抓取来自应用程序的数据,并尽可能快地把它扔到网络上。在发送端@Q" .getBytes()); buf2.flip(); channel.send(buf2, new InetSocketAddress(clientAddress, Integer.parseInt(clientPost))); // 将消息回送给客户端 // 第二次发 ByteBuffer buf3 = ByteBuffer.allocate(65507); buf3.clear(); buf3.put("任务完成".getBytes()); buf3.flip(); channel.send(buf3, new InetSocketAddress(clientAddress, Integer.parseInt(clientPost))); // 将消息回送给客户端 } int y = 0; public void send(SelectionKey key) { if (key == null) return; // ByteBuffer buff = (ByteBuffer) key.attachment(); DatagramChannel sc = (DatagramChannel) key.channel(); try { sc.write(ByteBuffer.wrap(new String("aaaa").getBytes())); } catch (IOException e1) { e1.printStackTrace(); } System.out.println("send2() " + (++y)); } /* 发送文件 */ public void sendFile(SelectionKey key) { if (key == null) return; ByteBuffer buff = (ByteBuffer) key.attachment(); SocketChannel sc = (SocketChannel) key.channel(); String data = decode(buff); if (data.indexOf("get") == -1) return; String subStr = data.substring(data.indexOf(" "), data.length()); System.out.println("截取之后的字符串是 " + subStr); FileInputStream fileInput = null; try { fileInput = new FileInputStream(subStr); FileChannel fileChannel = fileInput.getChannel(); fileChannel.transferTo(0, fileChannel.size(), sc); fileChannel.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { fileInput.close(); } catch (IOException ex) { ex.printStackTrace(); } } } public static void main(String[] args) throws IOException { new DatagramChannelServerDemo().service(); } }
package com.dxz.springsession.nio.demo3; import java.nio.channels.*; import java.nio.charset.*; import java.net.*; import java.io.*; import java.util.*; import java.nio.*; public class DatagramChannelClientDemo { // UDP协议客户端 private String serverIp = ""; private int port = 9975; // private ServerSocketChannel serverSocketChannel; DatagramChannel channel; private Charset charset = Charset.forName("UTF-8"); private Selector selector = null; public DatagramChannelClientDemo() throws IOException { try { selector = Selector.open(); channel = DatagramChannel.open(); } catch (Exception e) { selector = null; channel = null; System.out.println("超时"); } System.out.println("客户器启动"); } /* 编码过程 */ public ByteBuffer encode(String str) { return charset.encode(str); } /* 解码过程 */ public String decode(ByteBuffer bb) { return charset.decode(bb).toString(); } /* 服务器服务方法 */ public void service() throws IOException { if (channel == null || selector == null) return; channel.configureBlocking(false); channel.connect(new InetSocketAddress(serverIp, port));// 连接服务端 channel.write(ByteBuffer.wrap(new String("客户端请求获取消息").getBytes())); channel.register(selector, SelectionKey.OP_READ); /** 外循环,已经发生了SelectionKey数目 */ while (selector.select() > 0) { /* 得到已经被捕获了的SelectionKey的集合 */ Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = null; try { key = (SelectionKey) iterator.next(); iterator.remove(); if (key.isReadable()) { reveice(key); } if (key.isWritable()) { // send(key); } } catch (IOException e) { e.printStackTrace(); try { if (key != null) { key.cancel(); key.channel().close(); } } catch (ClosedChannelException cex) { e.printStackTrace(); } } } /* 内循环完 */ } /* 外循环完 */ } // /* // * 接收 用read()读IO // * */ // synchronized public void reveice2(SelectionKey key) throws IOException { // if (key == null) // return; // // ***用channel.read()获取消息***// // // :接收时需要考虑字节长度 // DatagramChannel sc = (DatagramChannel) key.channel(); // String content = ""; // // create buffer with capacity of 48 bytes // ByteBuffer buf = ByteBuffer.allocate(3);// java里一个(utf-8)中文3字节,gbk中文占2个字节 // int bytesRead = sc.read(buf); //read into buffer. // // while (bytesRead >0) { // buf.flip(); //make buffer ready for read // while(buf.hasRemaining()){ // buf.get(new byte[buf.limit()]); // read 1 byte at a time // content += new String(buf.array()); // } // buf.clear(); //make buffer ready for writing // bytesRead = sc.read(buf); // } // System.out.println("接收:" + content); // } /* 接收 */ synchronized public void reveice(SelectionKey key) throws IOException { String threadName = Thread.currentThread().getName(); if (key == null) return; try { // ***用channel.receive()获取消息***// // :接收时需要考虑字节长度 DatagramChannel sc = (DatagramChannel) key.channel(); String content = ""; // 第一次接;udp采用数据报模式,发送多少次,接收多少次 ByteBuffer buf = ByteBuffer.allocate(65507);// java里一个(utf-8)中文3字节,gbk中文占2个字节 buf.clear(); SocketAddress address = sc.receive(buf); // read into buffer. String clientAddress = address.toString().replace("/", "").split(":")[0]; String clientPost = address.toString().replace("/", "").split(":")[1]; System.out.println(threadName + "\t" + address.toString()); buf.flip(); // make buffer ready for read while (buf.hasRemaining()) { buf.get(new byte[buf.limit()]);// read 1 byte at a time byte[] tmp = buf.array(); content += new String(tmp); } buf.clear(); // make buffer ready for writing次 System.out.println(threadName + "接收:" + content.trim()); // 第二次接 content = ""; ByteBuffer buf2 = ByteBuffer.allocate(65507);// java里一个(utf-8)中文3字节,gbk中文占2个字节 buf2.clear(); SocketAddress address2 = sc.receive(buf2); // read into buffer. buf2.flip(); // make buffer ready for read while (buf2.hasRemaining()) { buf2.get(new byte[buf2.limit()]);// read 1 byte at a time byte[] tmp = buf2.array(); content += new String(tmp); } buf2.clear(); // make buffer ready for writing次 System.out.println(threadName + "接收2:" + content.trim()); } catch (PortUnreachableException ex) { System.out.println(threadName + "服务端端口未找到!"); } send(2); } boolean flag = false; public void send(int i) { if (flag) return; try { // channel.write(ByteBuffer.wrap(new // String("客户端请求获取消息(第"+i+"次)").getBytes())); // channel.register(selector, SelectionKey.OP_READ ); ByteBuffer buf2 = ByteBuffer.allocate(48); buf2.clear(); buf2.put(("客户端请求获取消息(第" + i + "次)").getBytes()); buf2.flip(); channel.write(buf2); channel.register(selector, SelectionKey.OP_READ); // int bytesSent = channel.send(buf2, new // InetSocketAddress(serverIp,port)); // 将消息回送给服务端 } catch (IOException e) { e.printStackTrace(); } flag = true; } int y = 0; public void send(SelectionKey key) { if (key == null) return; // ByteBuffer buff = (ByteBuffer) key.attachment(); DatagramChannel sc = (DatagramChannel) key.channel(); try { sc.write(ByteBuffer.wrap(new String("aaaa").getBytes())); } catch (IOException e1) { e1.printStackTrace(); } System.out.println("send2() " + (++y)); } /* 发送文件 */ public void sendFile(SelectionKey key) { if (key == null) return; ByteBuffer buff = (ByteBuffer) key.attachment(); SocketChannel sc = (SocketChannel) key.channel(); String data = decode(buff); if (data.indexOf("get") == -1) return; String subStr = data.substring(data.indexOf(" "), data.length()); System.out.println("截取之后的字符串是 " + subStr); FileInputStream fileInput = null; try { fileInput = new FileInputStream(subStr); FileChannel fileChannel = fileInput.getChannel(); fileChannel.transferTo(0, fileChannel.size(), sc); fileChannel.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { fileInput.close(); } catch (IOException ex) { ex.printStackTrace(); } } } public static void main(String[] args) throws IOException { new Thread(new Runnable() { public void run() { try { new DatagramChannelClientDemo().service(); } catch (IOException e) { e.printStackTrace(); } } }).start(); // new Thread(new Runnable() { // public void run() { // try { // new DatagramChannelClientDemo().service(); // } catch (IOException e) { // e.printStackTrace(); // } // } // }).start(); } }