1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| import java.util.Map; import java.util.UUID;
import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;
@Component public class RabbitSender {
@Autowired private RabbitTemplate rabbitTemplate;
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.err.println("消息ACK结果:" + ack + ", correlationData: " + correlationData.getId()); } };
public void send(Object message, Map<String, Object> properties) throws Exception { MessageHeaders mhs = new MessageHeaders(properties); Message<?> msg = MessageBuilder.createMessage(message, mhs); rabbitTemplate.setConfirmCallback(confirmCallback); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); MessagePostProcessor mpp = new MessagePostProcessor() { @Override public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException { System.err.println("---> post to do: " + message); return message; } }; rabbitTemplate.convertAndSend("exchange-1", "springboot.rabbit", msg, mpp, correlationData); } }
|