手动搭建I/O网络通信框架4:AIO编程模型,聊天室终极改造
转载自 https://www.cnblogs.com/lbhym/p/12720944.html
前言
上一章讲到的NIO编程模型比较主流,非常著名的Netty就是基于NIO编程模型的。这一章说的是AIO编程模型,是异步非阻塞的。虽然同样实现的是聊天室功能,但是实现逻辑上稍微要比NIO和BIO复杂一点。不过理好整体脉络,会好理解一些。首先还是讲讲概念:
BIO和NIO的区别是阻塞和非阻塞,而AIO代表的是异步IO。在此之前只提到了阻塞和非阻塞,没有提到异步还是同步。可以用我在知乎上看到的一句话表示:【在处理 IO 的时候,阻塞和非阻塞都是同步 IO,只有使用了特殊的 API 才是异步 IO】。这些“特殊的API”下面会讲到。在说AIO之前,先总结一下阻塞非阻塞、异步同步的概念。
阻塞和非阻塞,描述的是结果的请求。阻塞:在得到结果之前就一直呆在那,啥也不干,此时线程挂起,就如其名,线程被阻塞了。非阻塞:如果没得到结果就返回,等一会再去请求,直到得到结果为止。异步和同步,描述的是结果的发出,当调用方的请求进来。同步:在没获取到结果前就不返回给调用方,如果调用方是阻塞的,那么调用方就会一直等着。如果调用方是非阻塞的,调用方就会先回去,等一会再来问问得到结果没。异步:调用方一来,会直接返回,等执行完实际的逻辑后在通过回调函数把结果返回给调用方。
AIO中的异步操作
CompletionHandler
在AIO编程模型中,常用的API,如connect、accept、read、write都是支持异步操作的。当调用这些方法时,可以携带一个CompletionHandler参数,它会提供一些回调函数。这些回调函数包括:
- 当这些操作成功时你需要怎么做;
- 如果这些操作失败了你要这么做。
关于这个CompletionHandler参数,你只需要写一个类实现CompletionHandler口,并实现里面两个方法就行了。
那如何在调用connect、accept、read、write这四个方法时,传入CompletionHandler参数从而实现异步呢?下面分别举例这四个方法的使用。
先说说Socket
和ServerSocket
,在NIO中,它们变成了通道,配合缓冲区,从而实现了非阻塞。而在AIO中它们变成了异步通道。也就是AsynchronousServerSocketChannel
和AsynchronousSocketChannel
,下面例子中对象名分别是serverSocket和socket.
accept:serverSocket.accept(attachment,handler)。
handler就是实现了CompletionHandler接口并实现两个回调函数的类,它具体怎么写可以看下面的实战代码。attachment为handler里面可能需要用到的辅助数据,如果没有就填null。
read:socket.read(buffer,attachment,handler)。
buffer是缓冲区,用以存放读取到的信息。后面两个参数和accept一样。
write:socket.write(buffer,attachment,handler)。
和read参数一样。
connect:socket.connect(address,attachment,handler)。
address为服务器的IP和端口,后面两个参数与前几个一样。

Future
既然说到了异步操作,除了使用实现CompletionHandler接口的方式,不得不想到Future。客户端逻辑较为简单,如果使用CompletionHandler的话代码反而更复杂,所以下面的实战客户端代码就会使用Future的方式。简单来说,Future表示的是异步操作未来的结果,怎么理解未来。比如,客户端调用read方法获取服务器发来得消息:
1
| Future<Integer> readResult=clientChannel.read(buffer)
|
Integer是read()的返回类型,此时变量readResult实际上并不一定有数据,而是表示read()方法未来的结果,这时候readResult有两个方法,isDone():返回boolean,查看程序是否完成处理,如果返回true,有结果了,这时候可以通过get()获取结果。如果你不事先判断isDone()直接调用get()也行,只不过它是阻塞的。如果你不想阻塞,想在这期间做点什么,就用isDone()。
还有一个问题:这些handler的方法是在哪个线程执行的?serverSocket.accept这个方法肯定是在主线程里面调用的,而传入的这些回调方法其实是在其他线程执行的。在AIO中,会有一个AsynchronousChannelGroup,它和AsynchronousServerSocketChannel是绑定在一起的,它会为这些异步通道提供系统资源,线程就算其中一种系统资源,所以为了方便理解,我们暂时可以把他看作一个线程池,它会为这些handler分配线程,而不是在主线程中去执行。
AIO编程模型

上面只说了些零碎的概念,为了更好的理解,下面讲一讲大概的工作流程(主要针对服务器,客户端逻辑较为简单,代码注释也比较少,可以看前面几章):
- 首先做准备工作。跟NIO一样,先要创建好通道,只不过AIO是异步通道。然后创建好AsyncChannelGroup,可以选择自定义线程池。最后把AsyncServerSocket和AsyncChannelGroup绑定在一起,这样处于同一个AsyncChannelGroup里的通道就可以共享系统资源。
- 最后一步准备工作,创建好handler类,并实现接口和里面两个回调方法。(如图:客户端1对应的handler,里面的回调方法会实现读取消息和转发消息的功能;serverSocket的handler里的回调方法会实现accept功能。)
- 准备工作完成,当客户端1连接请求进来,客户端会马上回去,ServerSocket的异步方法会在连接成功后把客户端的SocketChannel存进在线用户列表,并利用客户端1的handler开始异步监听客户端1发送的消息。
- 当客户端1发送消息时,如果上一步中的handler成功监听到,就会回调成功后的回调方法,这个方法里会把这个消息转发给其他客户端。转发完成后,接着利用handler监听客户端1发送的消息。
代码一共有三个类:
- ChatServer:功能基本上和上面讲的工作流程差不多,还会有一些工具方法,都比较简单,就不多说了,如:转发消息,客户端下线后从在线列表移除客户端等。
- ChatClient:基本和前两章的BIO、NIO没什么区别,一个线程监听用户输入信息并发送,主线程异步的读取服务器信息。
- UserInputHandler:监听用户输入信息的线程。
ChatServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
| public class ChatServer { private static final int BUFFER = 1024;
private AsynchronousServerSocketChannel serverSocketChannel; private AsynchronousChannelGroup channelGroup;
private CopyOnWriteArrayList<ClientHandler> clientHandlerList; private Charset charset = Charset.forName("UTF-8");
private int port; public ChatServer(int port) { this.port = port; clientHandlerList=new CopyOnWriteArrayList<>(); }
public void start() { try {
ExecutorService executorService = Executors.newFixedThreadPool(10); channelGroup = AsynchronousChannelGroup.withThreadPool(executorService); serverSocketChannel=AsynchronousServerSocketChannel.open(channelGroup); serverSocketChannel.bind(new InetSocketAddress("127.0.0.1",port)); System.out.println("服务器启动:端口【"+port+"】");
while (true) { serverSocketChannel.accept(null, new AcceptHandler()); System.in.read(); } } catch (IOException e) { e.printStackTrace(); }finally { if(serverSocketChannel!=null){ try { serverSocketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } }
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object>{ @Override public void completed(AsynchronousSocketChannel clientChannel, Object attachment) { if(serverSocketChannel.isOpen()){ serverSocketChannel.accept(null,this); } if(clientChannel!=null&&clientChannel.isOpen()){ ClientHandler handler=new ClientHandler(clientChannel); clientHandlerList.add(handler); System.out.println(getPort(clientChannel)+"上线啦!"); ByteBuffer buffer=ByteBuffer.allocate(BUFFER); clientChannel.read(buffer,buffer,handler); } } @Override public void failed(Throwable exc, Object attachment) { System.out.println("连接失败"+exc); } }
private class ClientHandler implements CompletionHandler<Integer, ByteBuffer>{ private AsynchronousSocketChannel clientChannel; public ClientHandler(AsynchronousSocketChannel clientChannel) { this.clientChannel = clientChannel; } @Override public void completed(Integer result, ByteBuffer buffer) { if(buffer!=null){ if(result<=0){ removeClient(this); }else { buffer.flip(); String msg=String.valueOf(charset.decode(buffer)); System.out.println(getPort(clientChannel)+msg); sendMessage(clientChannel,getPort(clientChannel)+msg); buffer=ByteBuffer.allocate(BUFFER);
if(msg.equals("quit")) removeClient(this); else clientChannel.read(buffer, buffer, this); } } }
@Override public void failed(Throwable exc, ByteBuffer attachment) { System.out.println("客户端读写异常:"+exc); } }
private void sendMessage(AsynchronousSocketChannel clientChannel,String msg){ for(ClientHandler handler:clientHandlerList){ if(!handler.clientChannel.equals(clientChannel)){ ByteBuffer buffer=charset.encode(msg); handler.clientChannel.write(buffer,null,handler); } } } private String getPort(AsynchronousSocketChannel clientChannel){ try { InetSocketAddress address=(InetSocketAddress)clientChannel.getRemoteAddress(); return "客户端["+address.getPort()+"]:"; } catch (IOException e) { e.printStackTrace(); return "客户端[Undefined]:"; } } private void removeClient(ClientHandler handler){ clientHandlerList.remove(handler); System.out.println(getPort(handler.clientChannel)+"断开连接..."); if(handler.clientChannel!=null){ try { handler.clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }
public static void main(String[] args) { new ChatServer(8888).start(); } }
|
ChatClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| public class ChatClient { private static final int BUFFER = 1024; private AsynchronousSocketChannel clientChannel; private Charset charset = Charset.forName("UTF-8");
private String host; private int port; public ChatClient(String host, int port) { this.host = host; this.port = port; }
public void start() { try { clientChannel = AsynchronousSocketChannel.open(); Future<Void> future = clientChannel.connect(new InetSocketAddress(host, port)); future.get(); new Thread(new UserInputHandler(this)).start(); ByteBuffer buffer=ByteBuffer.allocate(BUFFER); while (true){ Future<Integer> read=clientChannel.read(buffer); if(read.get()>0){ buffer.flip(); String msg=String.valueOf(charset.decode(buffer)); System.out.println(msg); buffer.clear(); }else { System.out.println("服务器断开连接"); if(clientChannel!=null){ clientChannel.close(); } System.exit(-1); } } } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } }
public void send(String msg) { if (msg.isEmpty()) return; ByteBuffer buffer = charset.encode(msg); Future<Integer> write=clientChannel.write(buffer); try { write.get(); } catch (ExecutionException|InterruptedException e) { System.out.println("消息发送失败"); e.printStackTrace(); } }
public static void main(String[] args) { new ChatClient("127.0.0.1",8888).start(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class UserInputHandler implements Runnable { ChatClient client; public UserInputHandler(ChatClient chatClient) { this.client=chatClient; } @Override public void run() { BufferedReader read=new BufferedReader( new InputStreamReader(System.in) ); while (true){ try { String input=read.readLine(); client.send(input); if(input.equals("quit")) break; } catch (IOException e) { e.printStackTrace(); } } } }
|
运行测试:


