手动搭建I/O网络通信框架3:NIO编程模型,升级改造聊天室
转载自 https://www.cnblogs.com/lbhym/p/12698309.html
手动搭建I/O网络通信框架3:NIO编程模型,升级改造聊天室
在“手动搭建I/O网络通信框架2:BIO编程模型实现群聊”中用BIO编程模型,简单的实现了一个聊天室。但是其最大的问题在解释BIO时就已经说了:ServerSocket接收请求时(accept()方法)、InputStream、OutputStream(输入输出流的读和写)都是阻塞的。还有一个问题就是线程池,线程多了,服务器性能耗不起。线程少了,在聊天室这种场景下,让用户等待连接肯定不可取。今天要说到的NIO编程模型就很好的解决了这几个问题。有两个主要的替换地方:
- 用Channel代替Stream。
- 使用Selector监控多条Channel,起到类似线程池的作用,但是它只需一条线程。
既然要用NIO编程模型,那就要说说它的三个主要核心:Selector、Channel、Buffer。它们的关系是:一个Selector管理多个Channel,一个Channel可以往Buffer中写入和读取数据。Buffer名叫缓冲区,底层其实是一个数组,会提供一些方法往数组写入读取数据。
Buffer:
不太了解Buffer的可以看看这个:https://blog.csdn.net/czx2018/article/details/89502699
常用API:
- allocate() - 初始化一块缓冲区
- put() - 向缓冲区写入数据
- get() - 向缓冲区读数据
- filp() - 将缓冲区的读写模式转换
- clear() - 这个并不是把缓冲区里的数据清除,而是利用后来写入的数据来覆盖原来写入的数据,以达到类似清除了老的数据的效果
- compact() - 从读数据切换到写模式,数据不会被清空,会将所有未读的数据copy到缓冲区头部,后续写数据不会覆盖,而是在这些数据之后写数据
- mark() - 对position做出标记,配合reset使用
- reset() - 将position置为标记值
简单地说:Buffer实质上是个数组,有两个关键的指针,一个position代表当前数据写入到哪了、一个limit代表限制。初始化时设置了数组长度,这limit就是数组的长度。如:设置intBuffer.allocate(10)
,最大存储10个int数据,写入5五个数据后,需要读取数据了。用filp()转换读写模式后,limit=position,position=0。也就是说从0开始读,只能读到第五个。读完后这个缓冲区就需要clear()了,实际上并没有真的去清空数据,而是position和limit两个指针又回到了初始化的位置,接着又可以写入数据了,反正数组下标相同重新写入数据会覆盖,就没必要真的去清空了。
Channel:
Channel(通道)主要用于传输数据,然后从Buffer中写入或读取。它们两个结合起来虽然和流有些相似,但主要有以下几点区别:
- 流是单向的,可以发现Stream的输入流和输出流是独立的,它们只能输入或输出。而通道既可以读也可以写。
- 通道本身不能存放数据,只能借助Buffer。
- Channel支持异步。
Channel有如下三个常用的类:FileChannel、SocketChannel、ServerSocketChannel。从名字也可以看出区别,第一个是对文件数据的读写,后面两个则是针对Socket和ServerSocket,这里我们只是用后面两个。更详细的用法可以看:https://www.cnblogs.com/snailclimb/p/9086335.html,下面的代码中也会用到,会有详细的注释。
Selector

多个Channel可以注册到Selector,就可以直接通过一个Selector管理多个通道。Channel在不同的时间或者不同的事件下有不同的状态,Selector会通过轮询来达到监视的效果,如果查到Channel的状态正好是我们注册时声明的所要监视的状态,我们就可以查出这些通道,然后做相应的处理。这些状态如下: 1.客户端的SocketChannel和服务器端建立连接,SocketChannel状态就是Connect。 2.服务器端的ServerSocketChannel接收了客户端的请求,ServerSocketChannel状态就是Accept。 3.当SocketChannel有数据可读,那么它们的状态就是Read。 4.当我们需要向Channel中写数据时,那么它们的状态就是Write。
具体的使用见下面代码注释或看 Java NIO之Selector(选择器)
NIO编程模型
NIO编程模型工作流程:
- 首先会创建一个Selector,用来监视管理各个不同的Channel,也就是不同的客户端。相当于取代了原来BIO的线程池,但是它只需一个线程就可以处理多个Channel,没有了线程上下文切换带来的消耗,很好的优化了性能。
- 创建一个ServerSocketChannel监听通信端口,并注册到Selector,让Seletor监视这个通道的Accept状态,也就是接收客户端请求的状态。
- 此时客户端ClientA请求服务器,那么Selector就知道了有客户端请求进来。这时候我们可以得到客户端的SocketChannel,并为这个通道注册Read状态,也就是Selector会监听ClientA发来的消息。
- 一旦接收到ClientA的消息,就会用其他客户端的SocketChannel的Write状态,向它们转发ClientA的消息。
上代码之前,还是先说说各个类的作用:
相比较BIO的代码,NIO的代码还少了一个类,那就是服务器端的工作线程类。没了线程池,自然也不需要一个单独的线程去服务客户端。客户端还是需要一个单独的线程去等待用户输入,因为用户随时都可能输入信息,这个没法预见,只能阻塞式的等待。
- ChatServer:服务器端的唯一的类,作用就是通过Selector监听Read和Accept事件,并针对这些事件的类型,进行不同的处理,如连接、转发。
- ChatClient:客户端,通过Selector监听Read和Connect事件。Read事件就是获取服务器转发的消息然后显示出来;Connect事件就是和服务器建立连接,建立成功后就可以发送消息。
- UserInputHandler:专门等待用户输入的线程,和BIO没区别。
ChatServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
| public class ChatServer { private static final int BUFFER = 1024;
private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER); private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);
private int port;
public ChatServer(int port) { this.port = port; }
private void start() { try (ServerSocketChannel server = ServerSocketChannel.open(); Selector selector = Selector.open()) { server.configureBlocking(false); server.socket().bind(new InetSocketAddress(port));
server.register(selector, SelectionKey.OP_ACCEPT); System.out.println("启动服务器,监听端口:" + port);
while (true) { if(selector.select()>0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey key : selectionKeys) { handles(key, selector); } selectionKeys.clear(); } } } catch (IOException e) { e.printStackTrace(); } }
private void handles(SelectionKey key, Selector selector) throws IOException { if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ); System.out.println("客户端[" + client.socket().getPort() + "]上线啦!"); } if (key.isReadable()) { SocketChannel client = (SocketChannel) key.channel(); String msg = receive(client); System.out.println("客户端[" + client.socket().getPort() + "]:" + msg); sendMessage(client, msg, selector); if (msg.equals("quit")) { key.cancel(); selector.wakeup(); System.out.println("客户端[" + client.socket().getPort() + "]下线了!"); } } }
private Charset charset = Charset.forName("UTF-8");
private String receive(SocketChannel client) throws IOException { read_buffer.clear(); while (client.read(read_buffer) > 0) ; read_buffer.flip(); return String.valueOf(charset.decode(read_buffer)); }
private void sendMessage(SocketChannel client, String msg, Selector selector) throws IOException { msg = "客户端[" + client.socket().getPort() + "]:" + msg; for (SelectionKey key : selector.keys()) { if (!(key.channel() instanceof ServerSocketChannel) && !client.equals(key.channel()) && key.isValid()) { SocketChannel otherClient = (SocketChannel) key.channel(); write_buffer.clear(); write_buffer.put(charset.encode(msg)); write_buffer.flip(); while (write_buffer.hasRemaining()) { otherClient.write(write_buffer); } } } }
public static void main(String[] args) { new ChatServer(8888).start(); } }
|
ChatClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| public class ChatClient { private static final int BUFFER = 1024; private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER); private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER); private SocketChannel client; private Selector selector;
private Charset charset = Charset.forName("UTF-8");
private void start() { try { client=SocketChannel.open(); selector=Selector.open(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_CONNECT); client.connect(new InetSocketAddress("127.0.0.1", 8888)); while (true) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey key : selectionKeys) { handle(key); } selectionKeys.clear(); } } catch (IOException e) { e.printStackTrace(); }catch (ClosedSelectorException e){ } }
private void handle(SelectionKey key) throws IOException { if (key.isConnectable()) { SocketChannel client = (SocketChannel) key.channel(); if(client.finishConnect()){ new Thread(new UserInputHandler(this)).start(); } client.register(selector,SelectionKey.OP_READ); } if(key.isReadable()){ SocketChannel client = (SocketChannel) key.channel(); String msg = receive(client); System.out.println(msg); if (msg.equals("quit")) { key.cancel(); selector.wakeup(); } } } private String receive(SocketChannel client) throws IOException{ read_buffer.clear(); while (client.read(read_buffer)>0); read_buffer.flip(); return String.valueOf(charset.decode(read_buffer)); }
public void send(String msg) throws IOException{ if(!msg.isEmpty()){ write_buffer.clear(); write_buffer.put(charset.encode(msg)); write_buffer.flip(); while (write_buffer.hasRemaining()){ client.write(write_buffer); } if(msg.equals("quit")){ selector.close(); } } }
public static void main(String[] args) { new ChatClient().start(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class UserInputHandler implements Runnable { ChatClient client; public UserInputHandler(ChatClient chatClient) { this.client=chatClient; } @Override public void run() { BufferedReader read=new BufferedReader( new InputStreamReader(System.in) ); while (true){ try { String input=read.readLine(); client.send(input); if(input.equals("quit")) break; } catch (IOException e) { e.printStackTrace(); } } } }
|
测试运行:之前用的是win10的终端运行的,以后直接用IDEA运行,方便些。不过一个类同时运行多个,以实现多个客户端的场景,需要先做以下设置




设置完后,就可以同时运行两个ChatClient了,上图中得Unnamed就是第二个ChatClient,选中后点击右边运行按钮就行了。效果如下:


