0%

Rabbitmq高级特性-生产端特性讲解_确认机制和返回机制(20)

[TOC]

rabbitmq_20_1

rabbitmq_20_2

rabbitmq_20_3

rabbitmq_20_4

rabbitmq_20_5

rabbitmq_20_6

rabbitmq_20_7

Sender4ConfirmListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Sender4ConfirmListener {

public static void main(String[] args) throws Exception {

//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();

//4 声明
String exchangeName = "test_confirmlistener_exchange";
String routingKey1 = "confirm.save";

//5 发送
String msg = "Hello World RabbitMQ 4 Confirm Listener Message ...";

channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("------- error ---------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("------- ok ---------");
}
});

channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());


}

}

Receiver4ConfirmListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Receiver4ConfirmListener {

public static void main(String[] args) throws Exception {


ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_confirmlistener_exchange";
String exchangeType = "topic";
String queueName = "test_confirmlistener_queue";
//String routingKey = "user.*";
String routingKey = "confirm.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
//手工签收消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

Sender4ReturnListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class Sender4ReturnListener {

public static void main(String[] args) throws Exception {

//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();

//4 声明
String exchangeName = "test_returnlistener_exchange";
String routingKey1 = "abcd.save";
String routingKey2 = "return.save";
String routingKey3 = "return.delete.abc";

//5 监听
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
System.out.println("**************handleReturn**********");
System.out.println("replyCode: " + replyCode);
System.out.println("replyText: " + replyText);
System.out.println("exchange: " + exchange);
System.out.println("routingKey: " + routingKey);
System.out.println("body: " + new String(body));
}
});

//6 发送
String msg = "Hello World RabbitMQ 4 Return Listener Message ...";

boolean mandatory = true;
channel.basicPublish(exchangeName, routingKey1 , mandatory, null , msg.getBytes());
// channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
/// channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());


}

}

Receiver4ReturnListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Receiver4ReturnListener {

public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_returnlistener_exchange";
String exchangeType = "topic";
String queueName = "test_returnlistener_queue";
//String routingKey = "user.*";
String routingKey = "return.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}