0%

[TOC]

rabbitmq_20_1

rabbitmq_20_2

rabbitmq_20_3

rabbitmq_20_4

rabbitmq_20_5

rabbitmq_20_6

rabbitmq_20_7

Sender4ConfirmListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Sender4ConfirmListener {

public static void main(String[] args) throws Exception {

//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();

//4 声明
String exchangeName = "test_confirmlistener_exchange";
String routingKey1 = "confirm.save";

//5 发送
String msg = "Hello World RabbitMQ 4 Confirm Listener Message ...";

channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("------- error ---------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("------- ok ---------");
}
});

channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());


}

}

Receiver4ConfirmListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Receiver4ConfirmListener {

public static void main(String[] args) throws Exception {


ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_confirmlistener_exchange";
String exchangeType = "topic";
String queueName = "test_confirmlistener_queue";
//String routingKey = "user.*";
String routingKey = "confirm.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
//手工签收消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

Sender4ReturnListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class Sender4ReturnListener {

public static void main(String[] args) throws Exception {

//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();

//4 声明
String exchangeName = "test_returnlistener_exchange";
String routingKey1 = "abcd.save";
String routingKey2 = "return.save";
String routingKey3 = "return.delete.abc";

//5 监听
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
System.out.println("**************handleReturn**********");
System.out.println("replyCode: " + replyCode);
System.out.println("replyText: " + replyText);
System.out.println("exchange: " + exchange);
System.out.println("routingKey: " + routingKey);
System.out.println("body: " + new String(body));
}
});

//6 发送
String msg = "Hello World RabbitMQ 4 Return Listener Message ...";

boolean mandatory = true;
channel.basicPublish(exchangeName, routingKey1 , mandatory, null , msg.getBytes());
// channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
/// channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());


}

}

Receiver4ReturnListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Receiver4ReturnListener {

public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_returnlistener_exchange";
String exchangeType = "topic";
String queueName = "test_returnlistener_queue";
//String routingKey = "user.*";
String routingKey = "return.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_17_1.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_17_2.png

Sender4FanoutExchange

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Sender4FanoutExchange {

public static void main(String[] args) throws Exception {

//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_fanout_exchange";
//5 发送
for(int i = 0; i < 10; i ++) {
String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
channel.basicPublish(exchangeName, "11" , null , msg.getBytes());
}
channel.close();
connection.close();
}

}

Receiver4FanoutExchange1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Receiver4FanoutExchange1 {

public static void main(String[] args) throws Exception {


ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = ""; // 不设置路由键
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
System.err.println("--------------- consumer 1 --------------");
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}

Receiver4FanoutExchange2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Receiver4FanoutExchange2 {

public static void main(String[] args) throws Exception {


ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = ""; // 不设置路由键
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
System.err.println("--------------- consumer 2 --------------");
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_16_1.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_16_2.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_16_3.png

Sender4TopicExchange

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Sender4TopicExchange {

public static void main(String[] args) throws Exception {

//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 发送

String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}

}

Receiver4TopicExchange1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Receiver4TopicExchange1 {

public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
//String routingKey = "user.*";
String routingKey = "user.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
System.err.println("consumer1 start.. ");
// 循环获取消息
while(true){
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
}
}
}

Receiver4TopicExchange2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Receiver4TopicExchange2 {

public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.*";
// String routingKey = "user.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
System.err.println("consumer2 start .. ");
// 循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
}
}
}

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_14_3.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_14_5.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_14_6.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_14_7.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_14_8.png

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Sender4DirectExchange {

public static void main(String[] args) throws Exception {

//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_direct_exchange";
String routingKey = "test_direct_routingKey";
//5 发送

String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());

}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Receiver4DirectExchange {

public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test_direct_routingKey";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_14_1.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_14_2.png

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Sender {


public static void main(String[] args) throws Exception {

// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2 创建Connection
Connection connection = connectionFactory.newConnection();
// 3 创建Channel
Channel channel = connection.createChannel();
// 4 声明
String queueName = "test001";
// 参数: queue名字,是否持久化,独占的queue(仅供此连接),不使用时是否自动删除, 其他参数
channel.queueDeclare(queueName, false, false, false, null);

Map<String, Object> headers = new HashMap<String, Object>();

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();


for(int i = 0; i < 5;i++) {
String msg = "Hello World RabbitMQ " + i;
channel.basicPublish("", queueName , props , msg.getBytes());
}
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Receiver {

public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

String queueName = "test001";
// durable 是否持久化消息
channel.queueDeclare(queueName, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 循环获取消息
while(true){
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_13_1.png

RabbitMQ急速入门

急速入门,在这里我们使用RabbitMQ 3.6.5 版本进行操作:

1. 首先在Linux上进行一些软件的准备工作,yum下来一些基础的软件包

1
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
    • 配置好主机名称**:/etc/hosts /etc/hostname

2. 下载RabbitMQ所需软件包(我在在这里使用的是 RabbitMQ3.6.5 稳定版本)

1
2
3
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget <http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-1.1.el7.lux.x86_64.rpm>
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

3. 安装服务命令

1
2
3
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm 
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

4. 修改用户登录与连接心跳检测,注意修改

1
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

修改点1:loopback_users 中的 <<”guest”>>,只保留guest (用于用户登录) 修改点2:heartbeat 为10(用于心跳连接)

5. 安装管理插件

5.1 首先启动服务(后面 | 包含了停止、查看状态以及重启的命令)

1
/etc/init.d/rabbitmq-server start | stop | status | restart

5.2 查看服务有没有启动: lsof -i:5672 (5672是Rabbit的默认端口)

1
rabbitmq-plugins enable rabbitmq_management

5.3 可查看管理端口有没有启动:

1
lsof -i:15672 或者 netstat -tnlp | grep 15672

6. 一切OK 我们访问地址,输入用户名密码均为 guest :

http://你的ip地址:15672/

7. 如果一切顺利,那么到此为止,我们的环境已经安装完啦

1 前言

RabbitMQ(一):RabbitMQ快速入门

RabbitMQ基础概念详细介绍

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_11_1.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_11_2.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_11_3.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_11_4.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_11_5.png

2 AMQP协议中间的几个重要概念:

下图是AMQP的协议模型:

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_11_6.png

正如图中所看到的,AMQP协议模型有三部分组成:生产者、消费者和服务端。

生产者是投递消息的一方,首先连接到Server,建立一个连接,开启一个信道;然后生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。同理,消费者也需要进行建立连接,开启信道等操作,便于接收消息。

接着生产者就可以发送消息,发送到服务端中的虚拟主机,虚拟主机中的交换器根据路由键选择路由规则,然后发送到不同的消息队列中,这样订阅了消息队列的消费者就可以获取到消息,进行消费。

最后还要关闭信道和连接。

  • Server:又称Broker。接收客户端的连接,实现AMQP实体服务。
  • Connection:连接,应用程序与Server的网络连接,TCP连接。
  • Channel:信道,消息读写(几乎所有操作)等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
  • Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
  • Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
  • Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
  • Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
  • RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
  • Queue:消息队列,用来保存消息,供消费者消费。

我们完全可以直接使用 Connection 就能完成信道的工作,为什么还要引入信道呢?

试想这样一个场景, 一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是许多个 TCP 连接。然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用 TCP 连接复用的方式,不仅可以减少性能开销,同时也便于管理 。

RabbitMQ是基于AMQP协议实现的,其结构如下图所示,和AMQP协议简直就是一模一样。

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_11_7.png

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_11_11.png

3 常用交换器

RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种。

3.1 Direct Exchange

该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中。

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_11_8.png

3.2 Topic Exchange

该类型的交换器将所有发送到Topic Exchange的消息被转发到所有RoutingKey中指定的Topic的队列上面。

Exchange将RoutingKey和某Topic进行模糊匹配,其中“”用来匹配一个词,“#”用于匹配一个或者多个词。例如“com.#”能匹配到“com.rabbitmq.oa”和“com.rabbitmq”;而”login.“只能匹配到“com.rabbitmq”。

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_11_9.png

3.3 Fanout Exchange

该类型不处理路由键,会把所有发送到交换器的消息路由到所有绑定的队列中。优点是转发消息最快,性能最好。

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/rabbitmq_11_10.png

3.4 Headers Exchange

该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用。