消息生产rabbitmq-provider
pom.xml添加
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml 添加
spring:
#给项目来个名字
application:
name: rabbitmq-provider
#配置rabbitMq 服务器
rabbitmq:
host: 192.168.3.137
port: 5672
username: root
password: root
#虚拟host 可以不设置,使用server默认host
virtual-host: /
#消息确认配置项
#确认消息已发送到交换机(Exchange)
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
Service代码
@Slf4j
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("**ConfirmCallback: "+"相关数据:"+correlationData);
log.info("**ConfirmCallback: "+"确认情况:"+ack);
log.info("**ConfirmCallback: "+"原因:"+cause);
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage message) {
log.info("^^ReturnCallback: "+"消息:"+message);
}
});
return rabbitTemplate;
}
}
/**
* 直连型交换机 - 生产者
*/
@Slf4j
@Configuration
public class AlertDirectRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue AlertDirectQueue() {
log.info("Alert直连型队列AlertDirectQueue");
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("AlertDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("AlertDirectQueue",true);
}
//Direct交换机 起名:AlertDirectExchange
@Bean
DirectExchange AlertDirectExchange() {
log.info("Alert直连型交换机AlertDirectExchange");
// return new DirectExchange("AlertDirectExchange",true,true);
return new DirectExchange("AlertDirectExchange",true,false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:AlertDirectRouting
@Bean
Binding bindingDirect() {
log.info("Alert直连型队列绑定到上交换机,主键为AlertDirectRouting");
return BindingBuilder.bind(AlertDirectQueue()).to(AlertDirectExchange()).with("AlertDirectRouting");
}
}
Controller 代码
@RestController
@Slf4j
public class SendAlertController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendAlertMessage")
public String sendAlertMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "alert message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//将消息携带绑定键值:AlertDirectRouting 发送到交换机AlertDirectExchange
rabbitTemplate.convertAndSend("AlertDirectExchange", "AlertDirectRouting", map);
return "ok";
}
}
消费消息rabbitmq-consumer
pom.xml添加
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml 添加
spring:
#给项目来个名字
application:
name: rabbitmq-consumer
#配置rabbitMq 服务器
rabbitmq:
host: 192.168.3.137
port: 5672
username: root
password: root
#虚拟host 可以不设置,使用server默认host
virtual-host: /
#消息确认配置项
#确认消息已发送到交换机(Exchange)
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
Service代码
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private AlertAckReceiver alertAckReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置一个队列
container.setQueueNames("AlertDirectQueue");
//如果同时设置多个如下: 前提是队列都是必须已经创建存在的
// container.setQueueNames("TestDirectQueue","fanout.A");
//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
//container.setQueues(new Queue("TestDirectQueue",true));
//container.addQueues(new Queue("TestDirectQueue2",true));
//container.addQueues(new Queue("TestDirectQueue3",true));
container.setMessageListener(alertAckReceiver);
return container;
}
}
@Slf4j
@Component
public class AlertAckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//因为传递消息的时候用的map传递,所以将Map从Message内取出需要做些处理
String msg = message.toString();
//可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据
String[] msgArray = msg.split("'");
Map<String, String> msgMap = mapStringToMap(msgArray[1].trim(),3);
String messageId=msgMap.get("messageId");
String messageData=msgMap.get("messageData");
String createTime=msgMap.get("createTime");
log.info(" AlertAckReceiver messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
log.info("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());
//第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(deliveryTag, true);
//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
// channel.basicReject(deliveryTag, true);
if ("AlertDirectQueue".equals(message.getMessageProperties().getConsumerQueue())){
log.info("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
log.info("消息成功消费到 messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
log.info("执行AlertDirectQueue中的消息的业务处理流程......");
}
if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){
log.info("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
log.info("消息成功消费到 messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
log.info("执行fanout.A中的消息的业务处理流程......");
}
log.info("..................................................................");
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
private Map<String, String> mapStringToMap(String str, int entryNum) {
log.info(str);
str = str.substring(1, str.length() - 1);
String[] strs = str.split(",",entryNum);
Map<String, String> map = new HashMap<String, String>();
for (String string : strs) {
String key = string.split("=")[0].trim();
String value = string.split("=")[1];
map.put(key, value);
}
return map;
}
}