0%

什么是MyCat

MyCat 是什么?从定义和分类来看,它是一个开源的分布式数据库系统,前端的用户可以把它看成一个数据库代理,用MySql客户端和命令行工具都可以访问,而其后端则是用MySql原生的协议与多个MySql服务之间进行通信。MyCat的核心功能是分库分表,即将一个大表水平切分成N个小表,然后存放在后端的MySql数据当中。

MyCat发展到目前的版本,已经不是一个单纯的MySql代理了,它的后端支持MySql,Oracle,SqlServer,DB2等主流的数据库,也支持MongoDB这种NoSql数据库。而对于前端的用户来说,无论后端采用哪一种数据库,在MyCat里都是一个传统的数据库,支持标准的SQL语句,对于前端的开发人员来说,可以大大地降低开发难度,提升开发速度。

阅读全文 »

背景

在应用系统发展的初期,我们并不知道以后会发展成什么样的规模,所以一开始不会考虑复杂的系统架构,复杂的系统架构费时费力,开发周期长,与系统发展初期这样的一个定位是不吻合的。所以,我们都会采用简单的架构,随着业务不断的发展,访问量不断升高,我们再对系统进行架构方面的优化。

阅读全文 »

Java NIO之Selector(选择器)

转载自https://www.cnblogs.com/snailclimb/p/9086334.html

一 Selector(选择器)介绍

Selector 一般称 为选择器 ,当然你也可以翻译为 多路复用器 。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。

https://user-gold-cdn.xitu.io/2018/5/15/16363f5338f36c54?w=636&h=260&f=png&s=23373

使用Selector的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。

二 Selector(选择器)的使用方法介绍

1. Selector的创建

通过调用Selector.open()方法创建一个Selector对象,如下:

1
Selector selector = Selector.open();

这里需要说明一下

2. 注册Channel到Selector

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);

Channel必须是非阻塞的。 所以FileChannel不适用Selector,因为FileChannel不能切换为非阻塞模式,更准确的来说是因为FileChannel没有继承SelectableChannel。Socket channel可以正常使用。

SelectableChannel抽象类 有一个 configureBlocking() 方法用于使通道处于阻塞模式或非阻塞模式。

1
abstract SelectableChannel configureBlocking(boolean block)

注意:

SelectableChannel抽象类configureBlocking() 方法是由 AbstractSelectableChannel抽象类实现的,SocketChannel、ServerSocketChannel、DatagramChannel都是直接继承了 AbstractSelectableChannel抽象类 。 

register() 方法的第二个参数。这是一个“ interest集合 ”,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:

  • Connect
  • Accept
  • Read
  • Write

通道触发了一个事件意思是该事件已经就绪。比如某个Channel成功连接到另一个服务器称为“ 连接就绪 ”。一个Server Socket Channel准备好接收新进入的连接称为“ 接收就绪 ”。一个有数据可读的通道可以说是“ 读就绪 ”。等待写数据的通道可以说是“ 写就绪 ”。

这四种事件用SelectionKey的四个常量来表示:

1
2
3
4
SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE

如果你对不止一种事件感兴趣,使用或运算符即可,如下:

1
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

3. SelectionKey介绍

一个SelectionKey键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。

1
2
3
4
5
key.attachment(); //返回SelectionKey的attachment,attachment可以在注册channel的时候指定。
key.channel(); // 返回该SelectionKey对应的channel。
key.selector(); // 返回该SelectionKey对应的Selector。
key.interestOps(); //返回代表需要Selector监控的IO操作的bit mask
key.readyOps(); // 返回一个bit mask,代表在相应channel上可以进行的IO操作。

key.interestOps():

我们可以通过以下方法来判断Selector是否对Channel的某种事件感兴趣

1
2
3
4
5
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

key.readyOps()

ready 集合是通道已经准备就绪的操作的集合。JAVA中定义以下几个方法用来检查这些操作是否就绪.

1
2
3
//创建ready集合的方法int readySet = selectionKey.readyOps();
//检查这些操作是否就绪的方法
key.isAcceptable();//是否可读,是返回 trueboolean isWritable()://是否可写,是返回 trueboolean isConnectable()://是否可连接,是返回 trueboolean isAcceptable()://是否可接收,是返回 true

从SelectionKey访问Channel和Selector很简单。如下:

1
2
3
Channel channel = key.channel();
Selector selector = key.selector();
key.attachment();

可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加 与通道一起使用的Buffer,或是包含聚集数据的某个对象。使用方法如下:

1
2
key.attach(theObject);
Object attachedObj = key.attachment();

还可以在用register()方法向Selector注册Channel的时候附加对象。如:

1
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

4. 从Selector中选择channel(Selecting Channels via a Selector)

选择器维护注册过的通道的集合,并且这种注册关系都被封装在SelectionKey当中.

Selector维护的三种类型SelectionKey集合:

  • 已注册的键的集合(Registered key set)

    所有与选择器关联的通道所生成的键的集合称为已经注册的键的集合。并不是所有注册过的键都仍然有效。这个集合通过 keys() 方法返回,并且可能是空的。这个已注册的键的集合不是可以直接修改的;试图这么做的话将引发java.lang.UnsupportedOperationException。

  • 已选择的键的集合(Selected key set)

    所有与选择器关联的通道所生成的键的集合称为已经注册的键的集合。并不是所有注册过的键都仍然有效。这个集合通过 keys() 方法返回,并且可能是空的。这个已注册的键的集合不是可以直接修改的;试图这么做的话将引发java.lang.UnsupportedOperationException。

  • 已取消的键的集合(Cancelled key set)

    已注册的键的集合的子集,这个集合包含了 cancel() 方法被调用过的键(这个键已经被无效化),但它们还没有被注销。这个集合是选择器对象的私有成员,因而无法直接访问。

    注意: 当键被取消( 可以通过isValid( ) 方法来判断)时,它将被放在相关的选择器的已取消的键的集合里。注册不会立即被取消,但键会立即失效。当再次调用 select( ) 方法时(或者一个正在进行的select()调用结束时),已取消的键的集合中的被取消的键将被清理掉,并且相应的注销也将完成。通道会被注销,而新的SelectionKey将被返回。当通道关闭时,所有相关的键会自动取消(记住,一个通道可以被注册到多个选择器上)。当选择器关闭时,所有被注册到该选择器的通道都将被注销,并且相关的键将立即被无效化(取消)。一旦键被无效化,调用它的与选择相关的方法就将抛出CancelledKeyException。

select()方法介绍:

在刚初始化的Selector对象中,这三个集合都是空的。 通过Selector的select()方法可以选择已经准备就绪的通道 (这些通道包含你感兴趣的的事件)。比如你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。下面是Selector几个重载的select()方法:

  • int select():阻塞到至少有一个通道在你注册的事件上就绪了。
  • int select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。
  • int selectNow():非阻塞,只要有通道就绪就立刻返回。

select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。

一旦调用select()方法,并且返回值不为0时,则 可以通过调用Selector的selectedKeys()方法来访问已选择键集合 。如下:  Set selectedKeys=selector.selectedKeys();  进而可以放到和某SelectionKey关联的Selector和Channel。如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}

5. 停止选择的方法

选择器执行选择的过程,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,那么我们有以下三种方式可以唤醒在select()方法中阻塞的线程。

  • wakeup()方法 :通过调用Selector对象的wakeup()方法让处在阻塞状态的select()方法立刻返回 该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。
  • close()方法 :通过close()方法关闭Selector, 该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似wakeup()),同时使得注册到该Selector的所有Channel被注销,所有的键将被取消,但是Channel本身并不会关闭。

三 模板代码

一个服务端的模板代码:

有了模板代码我们在编写程序时,大多数时间都是在模板代码中添加相应的业务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("localhost", 8080));
ssc.configureBlocking(false);

Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);

while(true) {
int readyNum = selector.select();
if (readyNum == 0) {
continue;
}

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();

while(it.hasNext()) {
SelectionKey key = it.next();

if(key.isAcceptable()) {
// 接受连接
} else if (key.isReadable()) {
// 通道可读
} else if (key.isWritable()) {
// 通道可写
}

it.remove();
}
}

四 客户端与服务端简单交互实例

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class WebServer {
public static void main(String[] args) {
try {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8000));
ssc.configureBlocking(false);

Selector selector = Selector.open();
// 注册 channel,并且指定感兴趣的事件是 Accept
ssc.register(selector, SelectionKey.OP_ACCEPT);

ByteBuffer readBuff = ByteBuffer.allocate(1024);
ByteBuffer writeBuff = ByteBuffer.allocate(128);
writeBuff.put("received".getBytes());
writeBuff.flip();

while (true) {
int nReady = selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();

while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();

if (key.isAcceptable()) {
// 创建新的连接,并且把连接注册到selector上,而且,// 声明这个channel只对读操作感兴趣。
SocketChannel socketChannel = ssc.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
readBuff.clear();
socketChannel.read(readBuff);

readBuff.flip();
System.out.println("received : " + new String(readBuff.array()));
key.interestOps(SelectionKey.OP_WRITE);
}
else if (key.isWritable()) {
writeBuff.rewind();
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(writeBuff);
key.interestOps(SelectionKey.OP_READ);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class WebClient {
public static void main(String[] args) throws IOException {
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));

ByteBuffer writeBuffer = ByteBuffer.allocate(32);
ByteBuffer readBuffer = ByteBuffer.allocate(32);

writeBuffer.put("hello".getBytes());
writeBuffer.flip();

while (true) {
writeBuffer.rewind();
socketChannel.write(writeBuffer);
readBuffer.clear();
socketChannel.read(readBuffer);
}
} catch (IOException e) {
}
}
}

运行结果:

先运行服务端,再运行客户端,服务端会不断收到客户端发送过来的消息。

https://user-gold-cdn.xitu.io/2018/5/16/1636720b53ff3a72?w=1090&h=217&f=png&s=15376

手动搭建I/O网络通信框架1:Socket和ServerSocket入门实战,实现单聊

转载自https://www.cnblogs.com/lbhym/p/12673470.html

前言

这个基础项目会作为BIO、NIO、AIO的一个前提,后面会有数篇博客会基于这个小项目利用BIO、NIO、AIO进行改造升级。

简单的说一下io,了解的直接跳过看代码吧:IO常见的使用场景就是网络通信或读取文件等方面。IO流分为字节流和字符流。字节即Byte,包含八位二进制数,一个二进制数就是1bit,中文名称叫位。字符即一个字母或者一个汉字。一个字母由一个字节组成,而汉字根据编码不同由2个或者3个组成。Java.io包如下:详细的API可自行查阅资料

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200410142107890-242008210.png

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200410142126015-790268014.png

Socket定义:套接字(socket)是一个抽象层,应用程序可以通过它发送或接收数据,可对其进行像对文件一样的打开、读写和关闭等操作。套接字允许应用程序将I/O插入到网络中,并与网络中的其他应用程序进行通信。网络套接字是IP地址与端口的组合。

可以理解为两台机器或进程间进行网络通信的端点,这个端点包含IP地址和端口号。

Socket和ServerSocket区别就如其名字一样,简单地说ServerSocket作用在服务端,用以监听客户端的请求。Socket作用在客户端和服务端,用以发送接收消息。但是就像上面说的,它们都要包含一个IP地址和端口号。

Socket和ServerSocket实战:

首先创建一个最普通的Java项目。然后创建两个类,Server和Client。其代码和注释如下,仔细看下注释和代码,还是比较简单的

服务器只能为一个客户端服务,一旦监听到客户端的请求,就会一直被这个客户端占用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Client {
public static void main(String[] args) {
//这是服务端的IP和端口
final String DEFAULT_SERVER_HOST = "127.0.0.1";
final int DEFAULT_SERVER_PORT = 8888;
//创建Socket
try (Socket socket = new Socket(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT)) {
//接收消息
BufferedReader reader = new BufferedReader(
new InputStreamReader(socket.getInputStream())
);
//发送消息
BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream())
);
//获取用户输入的消息
BufferedReader userReader = new BufferedReader(
new InputStreamReader(System.in)
);
String msg = null;
//循环的话客户端就可以一直输入消息,不然执行完try catch会自动释放资源,也就是断开连接
while (true) {
String input = userReader.readLine();
//写入客户端要发送的消息。因为服务端用readLine获取消息,其以\n为终点,所以要在消息最后加上\n
writer.write(input + "\n");
writer.flush();
msg = reader.readLine();
System.out.println(msg);
//如果客户端输入quit就可以跳出循环、断开连接了
if(input.equals("quit")){
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Server {
public static void main(String[] args) {
final int DEFAULT_PORT = 8888;
//创建ServerSocket监听8888端口
try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
System.out.println("ServerSocket Start,The Port is:" + DEFAULT_PORT);
while (true) {//不停地监听该端口
//阻塞式的监听,如果没有客户端请求就一直停留在这里
Socket socket = serverSocket.accept();
System.out.println("Client[" + socket.getPort() + "]Online");
//接收消息
BufferedReader reader = new BufferedReader(
new InputStreamReader(socket.getInputStream())
);
//发送消息
BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream())
);

String msg = null;
while ((msg = reader.readLine()) != null) {
System.out.println("Client[" + socket.getPort() + "]:" + msg);
//写入服务端要发送的消息
writer.write("Server:" + msg + "\n");
writer.flush();
//如果客户端的消息是quit代表他退出了,并跳出循环,不用再接收他的消息了。如果客户端再次连接就会重新上线
if (msg.equals("quit")) {
System.out.println("Client[" + socket.getPort() + "]:Offline");
break;
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

然后打开两个命令终端,通过javac编译后,一个运行Server代表服务器,一个运行Client代表客户端。

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200410144802569-1725038127.png

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200410144811290-2056832827.png

下一篇 手动搭建I/O网络通信框架2:BIO编程模型实现群聊

手动搭建I/O网络通信框架2:BIO编程模型实现群聊

转载自https://www.cnblogs.com/lbhym/p/12681787.html

手动搭建I/O网络通信框架2:BIO编程模型实现群聊

在第一章中运用Socket和ServerSocket简单的实现了网络通信。这一章,利用BIO编程模型进行升级改造,实现群聊聊天室。

所谓BIO,就是Block IO,阻塞式的IO。这个阻塞主要发生在:ServerSocket接收请求时(accept()方法)、InputStream、OutputStream(输入输出流的读和写)都是阻塞的。这个可以在下面代码的调试中发现,比如在客户端接收服务器消息的输入流处打上断点,除非服务器发来消息,不然断点是一直停在这个地方的。也就是说这个线程在这时间是被阻塞的。

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200411190359629-713089288.png

如图:当一个客户端请求进来时,接收器会为这个客户端分配一个工作线程,这个工作线程专职处理客户端的操作。在上一章中,服务器接收到客户端请求后就跑去专门服务这个客户端了,所以当其他请求进来时,是处理不到的。

看到这个图,很容易就会想到线程池,BIO是一个相对简单的模型,实现它的关键之处也在于线程池。

在上代码之前,先大概说清楚每个类的作用,以免弄混淆。更详细的说明,都写在注释当中。

服务器端:

ChatServer:这个类的作用就像图中的Acceptor。它有两个比较关键的全局变量,一个就是存储在线用户信息的Map,一个就是线程池。这个类会监听端口,接收客户端的请求,然后为客户端分配工作线程。还会提供一些常用的工具方法给每个工作线程调用,比如:发送消息、添加在线用户等。我之前简单用过Netty和WebSocket,这个类看上去就已经和这些框架有点相似了。学习IO编程模型也是为了接下来深入学习Netty做准备。

ChatHandler:这个类就是工作线程的类。在这个项目中,它的工作很简单:把接收到的消息转发给其他客户端,当然还有一些小功能,比如添加\移除在线用户。

客户端:

相较于服务器,客户端的改动较小,主要是把等待用户输入信息这个功能分到其他线程做,不然这个功能会一直阻塞主线程,导致无法接收其他客户端的消息。

ChatClient:客户端启动类,也就是主线程,会通过Socket和服务器连接。也提供了两个工具方法:发送消息和接收消息。

UserInputHandler:专门负责等待用户输入信息的线程,一旦有信息键入,就马上发送给服务器。

首先创建两个包区分一下客户端和服务器,client和server

服务器端ChatServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class ChatServer {
private int DEFAULT_PORT = 8888;
/**
* 创建一个Map存储在线用户的信息。这个map可以统计在线用户、针对这些用户可以转发其他用户发送的消息
* 因为会有多个线程操作这个map,所以为了安全起见用ConcurrentHashMap
* 在这里key就是客户端的端口号,但在实际中肯定不会用端口号区分用户,如果是web的话一般用session。
* value是IO的Writer,用以存储客户端发送的消息
*/
private Map<Integer, Writer> map=new ConcurrentHashMap<>();
/**
* 创建线程池,线程上限为10个,如果第11个客户端请求进来,服务器会接收但是不会去分配线程处理它。
* 前10个客户端的聊天记录,它看不见。当有一个客户端下线时,这第11个客户端就会被分配线程,服务器显示在线
* 大家可以把10再设置小一点,测试看看
* */
private ExecutorService executorService= Executors.newFixedThreadPool(10);
//客户端连接时往map添加客户端
public void addClient(Socket socket) throws IOException {
if (socket != null) {
BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream())
);
map.put(socket.getPort(), writer);
System.out.println("Client["+socket.getPort()+"]:Online");
}
}

//断开连接时map里移除客户端
public void removeClient(Socket socket) throws Exception {
if (socket != null) {
if (map.containsKey(socket.getPort())) {
map.get(socket.getPort()).close();
map.remove(socket.getPort());
}
System.out.println("Client[" + socket.getPort() + "]Offline");
}
}

//转发客户端消息,这个方法就是把消息发送给在线的其他的所有客户端
public void sendMessage(Socket socket, String msg) throws IOException {
//遍历在线客户端
for (Integer port : map.keySet()) {
//发送给在线的其他客户端
if (port != socket.getPort()) {
Writer writer = map.get(port);
writer.write(msg);
writer.flush();
}
}
}

//接收客户端请求,并分配Handler去处理请求
public void start() {
try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
System.out.println("Server Start,The Port is:"+DEFAULT_PORT);
while (true){
//等待客户端连接
Socket socket=serverSocket.accept();
//为客户端分配一个ChatHandler线程
executorService.execute(new ChatHandler(this,socket));
}
} catch (IOException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
ChatServer server=new ChatServer();
server.start();
}
}

服务器端ChatHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ChatHandler implements Runnable {
private ChatServer server;
private Socket socket;

//构造函数,ChatServer通过这个分配Handler线程
public ChatHandler(ChatServer server, Socket socket) {
this.server = server;
this.socket = socket;
}

@Override
public void run() {
try {
//往map里添加这个客户端
server.addClient(socket);
//读取这个客户端发送的消息
BufferedReader reader = new BufferedReader(
new InputStreamReader(socket.getInputStream())
);
String msg = null;
while ((msg = reader.readLine()) != null) {
//这样拼接是为了让其他客户端也能看清是谁发送的消息
String sendmsg = "Client[" + socket.getPort() + "]:" + msg;
//服务器打印这个消息
System.out.println(sendmsg);
//将收到的消息转发给其他在线客户端
server.sendMessage(socket, sendmsg + "\n");
if (msg.equals("quit")) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//如果用户退出或者发生异常,就在map中移除该客户端
try {
server.removeClient(socket);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

客户端ChatClient:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class ChatClient {
private BufferedReader reader;
private BufferedWriter writer;
private Socket socket;
//发送消息给服务器
public void sendToServer(String msg) throws IOException {
//发送之前,判断socket的输出流是否关闭
if (!socket.isOutputShutdown()) {
//如果没有关闭就把用户键入的消息放到writer里面
writer.write(msg + "\n");
writer.flush();
}
}
//从服务器接收消息
public String receive() throws IOException {
String msg = null;
//判断socket的输入流是否关闭
if (!socket.isInputShutdown()) {
//没有关闭的话就可以通过reader读取服务器发送来的消息。注意:如果没有读取到消息线程会阻塞在这里
msg = reader.readLine();
}
return msg;
}

public void start() {
//和服务创建连接
try {
socket = new Socket("127.0.0.1", 8888);
reader=new BufferedReader(
new InputStreamReader(socket.getInputStream())
);
writer=new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream())
);
//新建一个线程去监听用户输入的消息
new Thread(new UserInputHandler(this)).start();
/**
* 不停的读取服务器转发的其他客户端的信息
* 记录一下之前踩过的小坑:
* 这里一定要创建一个msg接收信息,如果直接用receive()方法判断和输出receive()的话会造成有的消息不会显示
* 因为receive()获取时,在返回之前是阻塞的,一旦接收到消息才会返回,也就是while这里是阻塞的,一旦有消息就会进入到while里面
* 这时候如果输出的是receive(),那么上次获取的信息就会丢失,然后阻塞在System.out.println
* */
String msg=null;
while ((msg=receive())!=null){
System.out.println(msg);
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if(writer!=null){
writer.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
new ChatClient().start();
}
}

 客户端UserInputHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class UserInputHandler implements Runnable {
private ChatClient client;

public UserInputHandler(ChatClient client) {
this.client = client;
}

@Override
public void run() {
try {
//接收用户输入的消息
BufferedReader reader = new BufferedReader(
new InputStreamReader(System.in)
);
//不停的获取reader中的System.in,实现了等待用户输入的效果
while (true) {
String input = reader.readLine();
//向服务器发送消息
client.sendToServer(input);
if (input.equals("quit"))
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

运行测试:

通过打开终端,通过javac编译。如果大家是在IDEA上编码的话可能会报编码错误,在javac后面加上-encoding utf-8再接java文件就好了。

编译后运行,通过java运行时,又遇到了一个坑。会报找不到主类的错误,原来是因为加上两个包,要在class文件名前面加上包名。比如当前在src目录,下面有client和server两个包,要这么运行:java client.XXXX。可我之前明明在client文件夹下运行的java,也是不行,不知道为什么。

接着测试:

1.首先在一个终端里运行ChatServer,打开服务器

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200411193234531-59946096.png

2.在第二个终端里打开ChatClient,暂且叫A,此时服务器的终端显示:

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200411193327017-1431424368.png

3.类似的,在第三个终端里打开ChatClient,暂且叫B,此时服务器显示:

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200411193358577-1779741867.png

4.A中输入hi,除了服务器会打印hi外,B中也会显示,图片中的端口号和前面的不一样,是因为中间出了点小问题,前三张截图和后面的不是同时运行的。实际中同一个客户端会显示一样的端口号:

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200411193747903-851197924.png

5.当客户端输入quit时就会断开连接,最后,服务器的显示为:

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200411193923381-1365643546.png

手动搭建I/O网络通信框架3:NIO编程模型,升级改造聊天室

转载自 https://www.cnblogs.com/lbhym/p/12698309.html

手动搭建I/O网络通信框架3:NIO编程模型,升级改造聊天室

在“手动搭建I/O网络通信框架2:BIO编程模型实现群聊”中用BIO编程模型,简单的实现了一个聊天室。但是其最大的问题在解释BIO时就已经说了:ServerSocket接收请求时(accept()方法)、InputStream、OutputStream(输入输出流的读和写)都是阻塞的。还有一个问题就是线程池,线程多了,服务器性能耗不起。线程少了,在聊天室这种场景下,让用户等待连接肯定不可取。今天要说到的NIO编程模型就很好的解决了这几个问题。有两个主要的替换地方:

  1. 用Channel代替Stream。
  2. 使用Selector监控多条Channel,起到类似线程池的作用,但是它只需一条线程。

既然要用NIO编程模型,那就要说说它的三个主要核心:Selector、Channel、Buffer。它们的关系是:一个Selector管理多个Channel,一个Channel可以往Buffer中写入和读取数据。Buffer名叫缓冲区,底层其实是一个数组,会提供一些方法往数组写入读取数据。

Buffer:

不太了解Buffer的可以看看这个:https://blog.csdn.net/czx2018/article/details/89502699

常用API:

  • allocate() - 初始化一块缓冲区
  • put() - 向缓冲区写入数据
  • get() - 向缓冲区读数据
  • filp() - 将缓冲区的读写模式转换
  • clear() - 这个并不是把缓冲区里的数据清除,而是利用后来写入的数据来覆盖原来写入的数据,以达到类似清除了老的数据的效果
  • compact() - 从读数据切换到写模式,数据不会被清空,会将所有未读的数据copy到缓冲区头部,后续写数据不会覆盖,而是在这些数据之后写数据
  • mark() - 对position做出标记,配合reset使用
  • reset() - 将position置为标记值

简单地说:Buffer实质上是个数组,有两个关键的指针,一个position代表当前数据写入到哪了、一个limit代表限制。初始化时设置了数组长度,这limit就是数组的长度。如:设置intBuffer.allocate(10),最大存储10个int数据,写入5五个数据后,需要读取数据了。用filp()转换读写模式后,limit=position,position=0。也就是说从0开始读,只能读到第五个。读完后这个缓冲区就需要clear()了,实际上并没有真的去清空数据,而是position和limit两个指针又回到了初始化的位置,接着又可以写入数据了,反正数组下标相同重新写入数据会覆盖,就没必要真的去清空了。

Channel:

Channel(通道)主要用于传输数据,然后从Buffer中写入或读取。它们两个结合起来虽然和流有些相似,但主要有以下几点区别:  

  1. 流是单向的,可以发现Stream的输入流和输出流是独立的,它们只能输入或输出。而通道既可以读也可以写。  
  2. 通道本身不能存放数据,只能借助Buffer。  
  3. Channel支持异步。  

Channel有如下三个常用的类:FileChannel、SocketChannel、ServerSocketChannel。从名字也可以看出区别,第一个是对文件数据的读写,后面两个则是针对Socket和ServerSocket,这里我们只是用后面两个。更详细的用法可以看:https://www.cnblogs.com/snailclimb/p/9086335.html,下面的代码中也会用到,会有详细的注释。

Selector

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200414150122707-1475260423.png

多个Channel可以注册到Selector,就可以直接通过一个Selector管理多个通道。Channel在不同的时间或者不同的事件下有不同的状态,Selector会通过轮询来达到监视的效果,如果查到Channel的状态正好是我们注册时声明的所要监视的状态,我们就可以查出这些通道,然后做相应的处理。这些状态如下:  1.客户端的SocketChannel和服务器端建立连接,SocketChannel状态就是Connect。  2.服务器端的ServerSocketChannel接收了客户端的请求,ServerSocketChannel状态就是Accept。  3.当SocketChannel有数据可读,那么它们的状态就是Read。  4.当我们需要向Channel中写数据时,那么它们的状态就是Write。  

具体的使用见下面代码注释或看 Java NIO之Selector(选择器)

NIO编程模型

NIO编程模型工作流程:  

  1. 首先会创建一个Selector,用来监视管理各个不同的Channel,也就是不同的客户端。相当于取代了原来BIO的线程池,但是它只需一个线程就可以处理多个Channel,没有了线程上下文切换带来的消耗,很好的优化了性能。  
  2. 创建一个ServerSocketChannel监听通信端口,并注册到Selector,让Seletor监视这个通道的Accept状态,也就是接收客户端请求的状态。
  3. 此时客户端ClientA请求服务器,那么Selector就知道了有客户端请求进来。这时候我们可以得到客户端的SocketChannel,并为这个通道注册Read状态,也就是Selector会监听ClientA发来的消息。  
  4. 一旦接收到ClientA的消息,就会用其他客户端的SocketChannel的Write状态,向它们转发ClientA的消息。

上代码之前,还是先说说各个类的作用:

相比较BIO的代码,NIO的代码还少了一个类,那就是服务器端的工作线程类。没了线程池,自然也不需要一个单独的线程去服务客户端。客户端还是需要一个单独的线程去等待用户输入,因为用户随时都可能输入信息,这个没法预见,只能阻塞式的等待。

  • ChatServer:服务器端的唯一的类,作用就是通过Selector监听Read和Accept事件,并针对这些事件的类型,进行不同的处理,如连接、转发。
  • ChatClient:客户端,通过Selector监听Read和Connect事件。Read事件就是获取服务器转发的消息然后显示出来;Connect事件就是和服务器建立连接,建立成功后就可以发送消息。
  • UserInputHandler:专门等待用户输入的线程,和BIO没区别。

ChatServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public class ChatServer {
//设置缓冲区的大小,这里设置为1024个字节
private static final int BUFFER = 1024;

//Channel都要配合缓冲区进行读写,所以这里创建一个读缓冲区和一个写缓冲区
//allocate()静态方法就是设置缓存区大小的方法
private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER);
private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);

//为了监听端口更灵活,再不写死了,用一个构造函数设置需要监听的端口号
private int port;

public ChatServer(int port) {
this.port = port;
}

private void start() {
//创建ServerSocketChannel和Selector并打开
try (ServerSocketChannel server = ServerSocketChannel.open(); Selector selector = Selector.open()) {
//【重点,实现NIO编程模型的关键】configureBlocking设置ServerSocketChannel为非阻塞式调用,Channel默认的是阻塞的调用方式
server.configureBlocking(false);
//绑定监听端口,这里不是给ServerSocketChannel绑定,而是给ServerSocket绑定,socket()就是获取通道原生的ServerSocket或Socket
server.socket().bind(new InetSocketAddress(port));

//把server注册到Selector并监听Accept事件
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("启动服务器,监听端口:" + port);

while (true) {
//select()会返回此时触发了多少个Selector监听的事件
if(selector.select()>0) {
//获取这些已经触发的事件,selectedKeys()返回的是触发事件的所有信息
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//循环处理这些事件
for (SelectionKey key : selectionKeys) {
handles(key, selector);
}
//处理完后清空selectedKeys,避免重复处理
selectionKeys.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

//处理事件的方法
private void handles(SelectionKey key, Selector selector) throws IOException {
//当触发了Accept事件,也就是有客户端请求进来
if (key.isAcceptable()) {
//获取ServerSocketChannel
ServerSocketChannel server = (ServerSocketChannel) key.channel();
//然后通过accept()方法接收客户端的请求,这个方法会返回客户端的SocketChannel,这一步和原生的ServerSocket类似
SocketChannel client = server.accept();
client.configureBlocking(false);

//把客户端的SocketChannel注册到Selector,并监听Read事件
client.register(selector, SelectionKey.OP_READ);
System.out.println("客户端[" + client.socket().getPort() + "]上线啦!");
}
//当触发了Read事件,也就是客户端发来了消息
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
//获取消息
String msg = receive(client);
System.out.println("客户端[" + client.socket().getPort() + "]:" + msg);
//把消息转发给其他客户端
sendMessage(client, msg, selector);
//判断用户是否退出
if (msg.equals("quit")) {
//解除该事件的监听
key.cancel();
//更新Selector
selector.wakeup();
System.out.println("客户端[" + client.socket().getPort() + "]下线了!");
}
}
}

//编码方式设置为utf-8,下面字符和字符串互转时用得到
private Charset charset = Charset.forName("UTF-8");

//接收消息的方法
private String receive(SocketChannel client) throws IOException {
//用缓冲区之前先清空一下,避免之前的信息残留
read_buffer.clear();
//把通道里的信息读取到缓冲区,用while循环一直读取,直到读完所有消息。因为没有明确的类似\n这样的结尾,所以要一直读到没有字节为止
while (client.read(read_buffer) > 0) ;
//把消息读取到缓冲区后,需要转换buffer的读写状态,不明白的看看前面的Buffer的讲解
read_buffer.flip();
return String.valueOf(charset.decode(read_buffer));
}

//转发消息的方法
private void sendMessage(SocketChannel client, String msg, Selector selector) throws IOException {
msg = "客户端[" + client.socket().getPort() + "]:" + msg;
//获取所有客户端,keys()与前面的selectedKeys不同,这个是获取所有已经注册的信息,而selectedKeys获取的是触发了的事件的信息
for (SelectionKey key : selector.keys()) {
//排除服务器和本客户端并且保证key是有效的,isValid()会判断Selector监听是否正常、对应的通道是保持连接的状态等
if (!(key.channel() instanceof ServerSocketChannel) && !client.equals(key.channel()) && key.isValid()) {
SocketChannel otherClient = (SocketChannel) key.channel();
write_buffer.clear();
write_buffer.put(charset.encode(msg));
write_buffer.flip();
//把消息写入到缓冲区后,再把缓冲区的内容写到客户端对应的通道中
while (write_buffer.hasRemaining()) {
otherClient.write(write_buffer);
}
}
}
}

public static void main(String[] args) {
new ChatServer(8888).start();
}
}

ChatClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class ChatClient {
private static final int BUFFER = 1024;
private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER);
private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);
//声明成全局变量是为了方便下面一些工具方法的调用,就不用try with resource了
private SocketChannel client;
private Selector selector;

private Charset charset = Charset.forName("UTF-8");

private void start() {
try {
client=SocketChannel.open();
selector=Selector.open();
client.configureBlocking(false);
//注册channel,并监听SocketChannel的Connect事件
client.register(selector, SelectionKey.OP_CONNECT);
//请求服务器建立连接
client.connect(new InetSocketAddress("127.0.0.1", 8888));
//和服务器一样,不停的获取触发事件,并做相应的处理
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey key : selectionKeys) {
handle(key);
}
selectionKeys.clear();
}
} catch (IOException e) {
e.printStackTrace();
}catch (ClosedSelectorException e){
//当用户输入quit时,在send()方法中,selector会被关闭,而在上面的无限while循环中,可能会使用到已经关闭了的selector。
//所以这里捕捉一下异常,做正常退出处理就行了。不会对服务器造成影响
}
}

private void handle(SelectionKey key) throws IOException {
//当触发connect事件,也就是服务器和客户端建立连接
if (key.isConnectable()) {
SocketChannel client = (SocketChannel) key.channel();
//finishConnect()返回true,说明和服务器已经建立连接。如果是false,说明还在连接中,还没完全连接完成
if(client.finishConnect()){
//新建一个新线程去等待用户输入
new Thread(new UserInputHandler(this)).start();
}
//连接建立完成后,注册read事件,开始监听服务器转发的消息
client.register(selector,SelectionKey.OP_READ);
}
//当触发read事件,也就是获取到服务器的转发消息
if(key.isReadable()){
SocketChannel client = (SocketChannel) key.channel();
//获取消息
String msg = receive(client);
System.out.println(msg);
//判断用户是否退出
if (msg.equals("quit")) {
//解除该事件的监听
key.cancel();
//更新Selector
selector.wakeup();
}
}
}
//获取消息
private String receive(SocketChannel client) throws IOException{
read_buffer.clear();
while (client.read(read_buffer)>0);
read_buffer.flip();
return String.valueOf(charset.decode(read_buffer));
}

//发送消息
public void send(String msg) throws IOException{
if(!msg.isEmpty()){
write_buffer.clear();
write_buffer.put(charset.encode(msg));
write_buffer.flip();
while (write_buffer.hasRemaining()){
client.write(write_buffer);
}
if(msg.equals("quit")){
selector.close();
}
}
}

public static void main(String[] args) {
new ChatClient().start();
}
}

UserInputHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class UserInputHandler implements Runnable {
ChatClient client;
public UserInputHandler(ChatClient chatClient) {
this.client=chatClient;
}
@Override
public void run() {
BufferedReader read=new BufferedReader(
new InputStreamReader(System.in)
);
while (true){
try {
String input=read.readLine();
client.send(input);
if(input.equals("quit"))
break;
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

测试运行:之前用的是win10的终端运行的,以后直接用IDEA运行,方便些。不过一个类同时运行多个,以实现多个客户端的场景,需要先做以下设置

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200414152051636-927905062.png

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200414152100412-1057042302.png

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200414152130678-206345317.png

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200414152204091-2069873206.png

设置完后,就可以同时运行两个ChatClient了,上图中得Unnamed就是第二个ChatClient,选中后点击右边运行按钮就行了。效果如下:

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200414152412715-1986061599.png

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200414152438453-1030565064.png

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200414152450583-1807958534.png

手动搭建I/O网络通信框架4:AIO编程模型,聊天室终极改造

转载自 https://www.cnblogs.com/lbhym/p/12720944.html

前言

上一章讲到的NIO编程模型比较主流,非常著名的Netty就是基于NIO编程模型的。这一章说的是AIO编程模型,是异步非阻塞的。虽然同样实现的是聊天室功能,但是实现逻辑上稍微要比NIO和BIO复杂一点。不过理好整体脉络,会好理解一些。首先还是讲讲概念:

BIO和NIO的区别是阻塞和非阻塞,而AIO代表的是异步IO。在此之前只提到了阻塞和非阻塞,没有提到异步还是同步。可以用我在知乎上看到的一句话表示:【在处理 IO 的时候,阻塞和非阻塞都是同步 IO,只有使用了特殊的 API 才是异步 IO】。这些“特殊的API”下面会讲到。在说AIO之前,先总结一下阻塞非阻塞、异步同步的概念。

阻塞和非阻塞,描述的是结果的请求阻塞:在得到结果之前就一直呆在那,啥也不干,此时线程挂起,就如其名,线程被阻塞了。非阻塞:如果没得到结果就返回,等一会再去请求,直到得到结果为止。异步和同步,描述的是结果的发出,当调用方的请求进来。同步:在没获取到结果前就不返回给调用方,如果调用方是阻塞的,那么调用方就会一直等着。如果调用方是非阻塞的,调用方就会先回去,等一会再来问问得到结果没。异步:调用方一来,会直接返回,等执行完实际的逻辑后在通过回调函数把结果返回给调用方。

AIO中的异步操作

CompletionHandler

在AIO编程模型中,常用的API,如connect、accept、read、write都是支持异步操作的。当调用这些方法时,可以携带一个CompletionHandler参数,它会提供一些回调函数。这些回调函数包括:

  1. 当这些操作成功时你需要怎么做;
  2. 如果这些操作失败了你要这么做。

关于这个CompletionHandler参数,你只需要写一个类实现CompletionHandler口,并实现里面两个方法就行了。

那如何在调用connect、accept、read、write这四个方法时,传入CompletionHandler参数从而实现异步呢?下面分别举例这四个方法的使用。

先说说SocketServerSocket,在NIO中,它们变成了通道,配合缓冲区,从而实现了非阻塞。而在AIO中它们变成了异步通道。也就是AsynchronousServerSocketChannelAsynchronousSocketChannel,下面例子中对象名分别是serverSocket和socket.

  • accept:serverSocket.accept(attachment,handler)。

    handler就是实现了CompletionHandler接口并实现两个回调函数的类,它具体怎么写可以看下面的实战代码。attachment为handler里面可能需要用到的辅助数据,如果没有就填null。

  • read:socket.read(buffer,attachment,handler)。

    buffer是缓冲区,用以存放读取到的信息。后面两个参数和accept一样。

  • write:socket.write(buffer,attachment,handler)。

    和read参数一样。

  • connect:socket.connect(address,attachment,handler)。

    address为服务器的IP和端口,后面两个参数与前几个一样。

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200417161130725-1186191947.png

Future

既然说到了异步操作,除了使用实现CompletionHandler接口的方式,不得不想到Future。客户端逻辑较为简单,如果使用CompletionHandler的话代码反而更复杂,所以下面的实战客户端代码就会使用Future的方式。简单来说,Future表示的是异步操作未来的结果,怎么理解未来。比如,客户端调用read方法获取服务器发来得消息:

1
Future<Integer> readResult=clientChannel.read(buffer)

Integer是read()的返回类型,此时变量readResult实际上并不一定有数据,而是表示read()方法未来的结果,这时候readResult有两个方法,isDone():返回boolean,查看程序是否完成处理,如果返回true,有结果了,这时候可以通过get()获取结果。如果你不事先判断isDone()直接调用get()也行,只不过它是阻塞的。如果你不想阻塞,想在这期间做点什么,就用isDone()。

还有一个问题:这些handler的方法是在哪个线程执行的?serverSocket.accept这个方法肯定是在主线程里面调用的,而传入的这些回调方法其实是在其他线程执行的。在AIO中,会有一个AsynchronousChannelGroup,它和AsynchronousServerSocketChannel是绑定在一起的,它会为这些异步通道提供系统资源,线程就算其中一种系统资源,所以为了方便理解,我们暂时可以把他看作一个线程池,它会为这些handler分配线程,而不是在主线程中去执行。

AIO编程模型

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200417161442138-580814692.png

上面只说了些零碎的概念,为了更好的理解,下面讲一讲大概的工作流程(主要针对服务器,客户端逻辑较为简单,代码注释也比较少,可以看前面几章):

  1. 首先做准备工作。跟NIO一样,先要创建好通道,只不过AIO是异步通道。然后创建好AsyncChannelGroup,可以选择自定义线程池。最后把AsyncServerSocket和AsyncChannelGroup绑定在一起,这样处于同一个AsyncChannelGroup里的通道就可以共享系统资源。
  2. 最后一步准备工作,创建好handler类,并实现接口和里面两个回调方法。(如图:客户端1对应的handler,里面的回调方法会实现读取消息和转发消息的功能;serverSocket的handler里的回调方法会实现accept功能。)
  3. 准备工作完成,当客户端1连接请求进来,客户端会马上回去,ServerSocket的异步方法会在连接成功后把客户端的SocketChannel存进在线用户列表,并利用客户端1的handler开始异步监听客户端1发送的消息。
  4. 当客户端1发送消息时,如果上一步中的handler成功监听到,就会回调成功后的回调方法,这个方法里会把这个消息转发给其他客户端。转发完成后,接着利用handler监听客户端1发送的消息。

代码一共有三个类:

  • ChatServer:功能基本上和上面讲的工作流程差不多,还会有一些工具方法,都比较简单,就不多说了,如:转发消息,客户端下线后从在线列表移除客户端等。
  • ChatClient:基本和前两章的BIO、NIO没什么区别,一个线程监听用户输入信息并发送,主线程异步的读取服务器信息。
  • UserInputHandler:监听用户输入信息的线程。

ChatServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
public class ChatServer {
//设置缓冲区字节大小
private static final int BUFFER = 1024;

//声明AsynchronousServerSocketChannel和AsynchronousChannelGroup
private AsynchronousServerSocketChannel serverSocketChannel;
private AsynchronousChannelGroup channelGroup;

//在线用户列表。为了并发下的线程安全,所以使用CopyOnWriteArrayList
//CopyOnWriteArrayList在写时加锁,读时不加锁,而本项目正好在转发消息时需要频繁读取.
//ClientHandler包含每个客户端的通道,类型选择为ClientHandler是为了在write的时候调用每个客户端的handler
private CopyOnWriteArrayList<ClientHandler> clientHandlerList;
//字符和字符串互转需要用到,规定编码方式,避免中文乱码
private Charset charset = Charset.forName("UTF-8");

//通过构造函数设置监听端口
private int port;
public ChatServer(int port) {
this.port = port;
clientHandlerList=new CopyOnWriteArrayList<>();
}

public void start() {
try {
/**
*创建一个线程池并把线程池和AsynchronousChannelGroup绑定,前面提到了AsynchronousChannelGroup包括一些系统资源,而线程就是其中一种。
*为了方便理解我们就暂且把它当作线程池,实际上并不止包含线程池。如果你需要自己选定线程池类型和数量,就可以如下操作
*如果不需要自定义线程池类型和数量,可以不用写下面两行代码。
* */
ExecutorService executorService = Executors.newFixedThreadPool(10);
channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
serverSocketChannel=AsynchronousServerSocketChannel.open(channelGroup);
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1",port));
System.out.println("服务器启动:端口【"+port+"】");
/**
* AIO中accept可以异步调用,就用上面说到的CompletionHandler方式
* 第一个参数是辅助参数,回调函数中可能会用上的,如果没有就填null;第二个参数为CompletionHandler接口的实现
* 这里使用while和System.in.read()的原因:
* while是为了让服务器保持运行状态,前面的NIO,BIO都有用到while无限循环来保持服务器运行,但是它们用的地方可能更好理解
* System.in.read()是阻塞式的调用,只是单纯的避免无限循环而让accept频繁被调用,无实际业务功能。
*/
while (true) {
serverSocketChannel.accept(null, new AcceptHandler());
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if(serverSocketChannel!=null){
try {
serverSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

//AsynchronousSocketChannel为accept返回的类型,Object为辅助参数类型,没有就填Object
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object>{
//如果成功,执行的回调方法
@Override
public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
//如果服务器没关闭,在接收完当前客户端的请求后,再次调用,以接着接收其他客户端的请求
if(serverSocketChannel.isOpen()){
serverSocketChannel.accept(null,this);
}
//如果客户端的channel没有关闭
if(clientChannel!=null&&clientChannel.isOpen()){
//这个就是异步read和write要用到的handler,并传入当前客户端的channel
ClientHandler handler=new ClientHandler(clientChannel);
//把新用户添加到在线用户列表里
clientHandlerList.add(handler);
System.out.println(getPort(clientChannel)+"上线啦!");
ByteBuffer buffer=ByteBuffer.allocate(BUFFER);
//异步调用read,第一个buffer是存放读到数据的容器,第二个是辅助参数。
//因为真正的处理是在handler里的回调函数进行的,辅助参数会直接传进回调函数,所以为了方便使用,buffer就当作辅助参数
clientChannel.read(buffer,buffer,handler);
}
}
//如果失败,执行的回调方法
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接失败"+exc);
}
}

private class ClientHandler implements CompletionHandler<Integer, ByteBuffer>{
private AsynchronousSocketChannel clientChannel;
public ClientHandler(AsynchronousSocketChannel clientChannel) {
this.clientChannel = clientChannel;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
if(buffer!=null){
//如果read返回的结果小于等于0,而buffer不为空,说明客户端通道出现异常,做下线操作
if(result<=0){
removeClient(this);
}else {
//转换buffer读写模式并获取消息
buffer.flip();
String msg=String.valueOf(charset.decode(buffer));
//在服务器上打印客户端发来的消息
System.out.println(getPort(clientChannel)+msg);
//把消息转发给其他客户端
sendMessage(clientChannel,getPort(clientChannel)+msg);
buffer=ByteBuffer.allocate(BUFFER);

//如果用户输入的是退出,就从在线列表里移除。否则接着监听这个用户发送消息
if(msg.equals("quit"))
removeClient(this);
else
clientChannel.read(buffer, buffer, this);
}
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("客户端读写异常:"+exc);
}
}

//转发消息的方法
private void sendMessage(AsynchronousSocketChannel clientChannel,String msg){
for(ClientHandler handler:clientHandlerList){
if(!handler.clientChannel.equals(clientChannel)){
ByteBuffer buffer=charset.encode(msg);
//write不需要buffer当辅助参数,因为写到客户端的通道就完事了,而读还需要回调函数转发给其他客户端。
handler.clientChannel.write(buffer,null,handler);
}
}
}
//根据客户端channel获取对应端口号的方法
private String getPort(AsynchronousSocketChannel clientChannel){
try {
InetSocketAddress address=(InetSocketAddress)clientChannel.getRemoteAddress();
return "客户端["+address.getPort()+"]:";
} catch (IOException e) {
e.printStackTrace();
return "客户端[Undefined]:";
}
}
//移除客户端
private void removeClient(ClientHandler handler){
clientHandlerList.remove(handler);
System.out.println(getPort(handler.clientChannel)+"断开连接...");
if(handler.clientChannel!=null){
try {
handler.clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
new ChatServer(8888).start();
}
}

ChatClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class ChatClient {
private static final int BUFFER = 1024;
private AsynchronousSocketChannel clientChannel;
private Charset charset = Charset.forName("UTF-8");

private String host;
private int port;
//设置服务器IP和端口
public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}

public void start() {
try {
clientChannel = AsynchronousSocketChannel.open();
//连接服务器
Future<Void> future = clientChannel.connect(new InetSocketAddress(host, port));
future.get();
//新建一个线程去等待用户输入
new Thread(new UserInputHandler(this)).start();
ByteBuffer buffer=ByteBuffer.allocate(BUFFER);
//无限循环让客户端保持运行状态
while (true){
//获取服务器发来的消息并存入到buffer
Future<Integer> read=clientChannel.read(buffer);
if(read.get()>0){
buffer.flip();
String msg=String.valueOf(charset.decode(buffer));
System.out.println(msg);
buffer.clear();
}else {
//如果read的结果小于等于0说明和服务器连接出现异常
System.out.println("服务器断开连接");
if(clientChannel!=null){
clientChannel.close();
}
System.exit(-1);
}
}
} catch (IOException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

public void send(String msg) {
if (msg.isEmpty())
return;
ByteBuffer buffer = charset.encode(msg);
Future<Integer> write=clientChannel.write(buffer);
try {
//获取发送结果,如果get方法发生异常说明发送失败
write.get();
} catch (ExecutionException|InterruptedException e) {
System.out.println("消息发送失败");
e.printStackTrace();
}
}

public static void main(String[] args) {
new ChatClient("127.0.0.1",8888).start();
}
}

UserInputHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class UserInputHandler implements Runnable {
ChatClient client;
public UserInputHandler(ChatClient chatClient) {
this.client=chatClient;
}
@Override
public void run() {
BufferedReader read=new BufferedReader(
new InputStreamReader(System.in)
);
while (true){
try {
String input=read.readLine();
client.send(input);
if(input.equals("quit"))
break;
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

运行测试:

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200417164605075-508533365.png

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200417164620441-2122640649.png

https://img2020.cnblogs.com/blog/1383122/202004/1383122-20200417164642458-1627643597.png

在用netty作为底层网络通信的时候关于ChannelOption的参数让我一直模糊不清楚,于是去看一下linux网络编程,发现ChannelOption的各种属性在套接字选项中都有对应

下面简单的总结一下ChannelOption的含义已及使用的场景

阅读全文 »

背景

在当今这个时代,人们对互联网的依赖程度非常高,也因此产生了大量的数据,企业视这些数据为瑰宝。而这些被视为瑰宝的数据为我们的系统带来了很大的烦恼。这些海量数据的存储与访问成为了系统设计与使用的瓶颈,而这些数据往往存储在数据库中,传统的数据库存在着先天的不足,即单机(单库)性能瓶颈,并且扩展起来非常的困难。在当今的这个大数据时代,我们急需解决这个问题。如果单机数据库易于扩展,数据可切分,就可以避免这些问题,但是当前的这些数据库厂商,包括开源的数据库MySQL在内,提供这些服务都是需要收费的,所以我们转向一些第三方的软件,使用这些软件做数据的切分,将原本在一台数据库上的数据,分散到多台数据库当中,降低每一个单体数据库的负载。那么我们如何做数据切分呢?

阅读全文 »