<!-- Spring Boot starter for AMQP messaging (used here for RabbitMQ) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Boot web starter (needed for the REST endpoints that trigger message sending) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
server:
  port: 8080
spring:
  application:
    name: mq-rabbitmq-producer
  # RabbitMQ connection settings
  rabbitmq:
    host: localhost
    port: 5672
    # Note: the guest user can only connect to a local broker (e.g. localhost), not to a remote one
    username: guest
    password: guest
    # Virtual host; a broker may host several, the default "/" is used here
    virtual-host: /
    # Enable publisher returns so unroutable messages are handed back to the sender
    publisher-returns: true
    listener:
      # Settings for the "simple" listener container type.
      # This key selects the container implementation, not the exchange type; concurrency and
      # max-concurrency are only defined for the simple container, so the block lives here
      # rather than under "direct".
      # NOTE(review): a hand-built rabbitListenerContainerFactory bean does not pick these values
      # up unless it is run through Spring Boot's factory configurer - confirm they take effect.
      simple:
        # Acknowledgement mode: manual ack by the listener
        acknowledge-mode: manual
        # Minimum number of concurrent consumers
        concurrency: 1
        # Maximum number of concurrent consumers
        max-concurrency: 10
        # Listener retry settings
        retry:
          enabled: true
          # Maximum number of delivery attempts
          max-attempts: 5
          # Upper bound (ms) on the delay between attempts; the starting delay is a separate
          # property (initial-interval), which is not set here
          max-interval: 3000
package com.example.mqrabbitmqproducer.util.rabbitmq;
/**
 * Shared RabbitMQ names: the direct exchange and the queue names, which are also
 * used as the routing keys when the queues are bound to the exchange.
 *
 * <p>Constants holder only; it is not meant to be instantiated or subclassed.
 *
 * @author qzz
 */
public final class RabbitMQConstantUtil {

    /** Name of the direct exchange. */
    public static final String DIRECT_EXCHANGE = "directExchange";

    /** "Cancel order" queue name, also used as its routing key. */
    public static final String CANCEL_ORDER = "cancel-order";

    /** "Auto-confirm order" queue name, also used as its routing key. */
    public static final String CONFIRM_ORDER = "confirm-order";

    private RabbitMQConstantUtil() {
        // constants holder - no instances
    }
}
package com.example.mqrabbitmqproducer.util.rabbitmq.config;
import com.example.mqrabbitmqproducer.util.rabbitmq.RabbitMQConstantUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ configuration: declares the direct exchange, the two order queues and their
 * bindings, and customises the listener container factory and the {@link RabbitTemplate}.
 *
 * @author qzz
 */
@Configuration
@EnableRabbit
public class RabbitMQConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds the listener container factory with a JSON message converter and manual
     * acknowledgement.
     *
     * @param connectionFactory connection factory the listener containers will use
     * @return the configured factory
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Jackson2JsonMessageConverter maps between JSON message bodies and Java objects.
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        // Manual ACK: listeners must ack/nack each delivery on the channel themselves.
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /**
     * Customises the injected {@link RabbitTemplate} (UTF-8 encoding, mandatory flag and a
     * returns callback) and exposes that same instance as an {@link AmqpTemplate} bean.
     *
     * <p>The returns callback fires only when a message could not be routed to a queue; it is
     * not invoked for messages that reach a queue. Returns need {@code publisher-returns: true}
     * in the application configuration together with the mandatory flag set below.
     *
     * @return the customised template
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            Message returned = returnedMessage.getMessage();
            String messageId = returned.getMessageProperties().getMessageId();
            // Decode explicitly as UTF-8 (the template encoding) instead of the platform default.
            String body = new String(returned.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            int replyCode = returnedMessage.getReplyCode();
            String replyText = returnedMessage.getReplyText();
            String exchange = returnedMessage.getExchange();
            String routingKey = returnedMessage.getRoutingKey();
            // A returned message is a delivery failure, so report it above INFO level.
            log.warn("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}",
                    body, messageId, replyCode, replyText, exchange, routingKey);
        });
        return rabbitTemplate;
    }

    /**
     * Declares the durable direct exchange. The bean name equals the exchange name so the
     * bindings can select it with {@code @Qualifier}.
     *
     * @return the direct exchange
     */
    @Bean(RabbitMQConstantUtil.DIRECT_EXCHANGE)
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstantUtil.DIRECT_EXCHANGE).durable(true).build();
    }

    /**
     * Declares the "cancel order" queue.
     *
     * @return the queue (durable = true, exclusive = false, autoDelete = true)
     */
    @Bean(RabbitMQConstantUtil.CANCEL_ORDER)
    public Queue cancelOrderQueue() {
        // NOTE(review): autoDelete = true lets the broker drop the queue once its last consumer
        // goes away - confirm this is intended for a durable order queue.
        return new Queue(RabbitMQConstantUtil.CANCEL_ORDER, true, false, true);
    }

    /**
     * Binds the "cancel order" queue to the direct exchange.
     *
     * @param queue          the "cancel order" queue
     * @param directExchange the direct exchange
     * @return the binding; the routing key is the same as the queue name
     */
    @Bean
    public Binding cancelOrderBinding(@Qualifier(RabbitMQConstantUtil.CANCEL_ORDER) Queue queue,
                                      @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CANCEL_ORDER).noargs();
    }

    /**
     * Declares the "auto-confirm order" queue.
     *
     * @return the queue (durable = true, exclusive = false, autoDelete = true)
     */
    @Bean(RabbitMQConstantUtil.CONFIRM_ORDER)
    public Queue confirmOrderQueue() {
        // NOTE(review): autoDelete = true lets the broker drop the queue once its last consumer
        // goes away - confirm this is intended for a durable order queue.
        return new Queue(RabbitMQConstantUtil.CONFIRM_ORDER, true, false, true);
    }

    /**
     * Binds the "auto-confirm order" queue to the direct exchange.
     *
     * @param queue          the "auto-confirm order" queue
     * @param directExchange the direct exchange
     * @return the binding; the routing key is the same as the queue name
     */
    @Bean
    public Binding confirmOrderBinding(@Qualifier(RabbitMQConstantUtil.CONFIRM_ORDER) Queue queue,
                                       @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CONFIRM_ORDER).noargs();
    }
}
package com.example.mqrabbitmqproducer.util.rabbitmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * Message producer for the routing model: publishes messages to the direct exchange named by
 * {@link RabbitMQConstantUtil#DIRECT_EXCHANGE}.
 *
 * @author qzz
 */
@Component
public class DirectSender {

    private static final Logger log = LoggerFactory.getLogger(DirectSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code msg} to the direct exchange under the given routing key. The message is
     * tagged with the JSON content type, a {@code utf-8} content encoding and a random UUID as
     * its message id.
     *
     * @param routingKey routing key that selects the target queue
     * @param msg        message body text; must not be {@code null}
     */
    public void send(String routingKey, String msg) {
        String messageId = UUID.randomUUID().toString();
        // Encode explicitly as UTF-8 so the bytes match the declared content encoding even when
        // the JVM default charset differs.
        byte[] body = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = MessageBuilder.withBody(body)
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8")
                .setMessageId(messageId)
                .build();
        log.info("【发送者】消息内容【{}】 交换机【{}】 路由【{}】 消息ID【{}】",
                msg, RabbitMQConstantUtil.DIRECT_EXCHANGE, routingKey, messageId);
        rabbitTemplate.convertAndSend(RabbitMQConstantUtil.DIRECT_EXCHANGE, routingKey, message);
    }
}
package com.example.mqrabbitmqproducer.controller;
import com.alibaba.fastjson.JSONObject;
import com.example.mqrabbitmqproducer.util.rabbitmq.DirectSender;
import com.example.mqrabbitmqproducer.util.rabbitmq.RabbitMQConstantUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
 * Demo REST endpoints that publish sample order messages through {@link DirectSender}.
 *
 * @author qzz
 */
@RestController
@RequestMapping("/order")
public class TestRabbitMQSendMsg {

    /** Sample order number used by both demo endpoints. */
    private static final String SAMPLE_ORDER_NUMBER = "4364756867987989";

    /** Sender used to publish to RabbitMQ. */
    @Autowired
    private DirectSender directSender;

    /**
     * Simulates an order cancellation: publishes a JSON payload holding the order number and
     * product id under the cancel-order routing key. The cancellation logic itself is omitted.
     */
    @GetMapping("/cancel")
    public void cancel() {
        Map<String, Object> payload = new HashMap<>();
        payload.put("order_number", SAMPLE_ORDER_NUMBER);
        payload.put("product_id", "1");
        String json = JSONObject.toJSONString(payload);
        directSender.send(RabbitMQConstantUtil.CANCEL_ORDER, json);
    }

    /**
     * Simulates an automatic order confirmation: publishes the bare order number under the
     * confirm-order routing key.
     */
    @GetMapping("/confirm")
    public void confirm() {
        directSender.send(RabbitMQConstantUtil.CONFIRM_ORDER, SAMPLE_ORDER_NUMBER);
    }
}
引入依赖
<!-- Spring Boot web starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot starter for AMQP messaging (used here for RabbitMQ) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置application.yml
server:
  port: 8083
spring:
  application:
    name: mq-rabbitmq-consumer
  # RabbitMQ connection settings
  rabbitmq:
    host: localhost
    port: 5672
    # Note: the guest user can only connect to a local broker (e.g. localhost), not to a remote one
    username: guest
    password: guest
    # Virtual host; a broker may host several, the default "/" is used here
    virtual-host: /
配置RabbitMQ常量类,配置直连交换机名称、消息队列名称、routingkey
package com.example.mqrabbitmqconsumer.util.rabbitmq;
/**
 * Shared RabbitMQ names: the direct exchange and the queue names, which are also
 * used as the routing keys when the queues are bound to the exchange.
 *
 * <p>Constants holder only; it is not meant to be instantiated or subclassed.
 *
 * @author qzz
 */
public final class RabbitMQConstantUtil {

    /** Name of the direct exchange. */
    public static final String DIRECT_EXCHANGE = "directExchange";

    /** "Cancel order" queue name, also used as its routing key. */
    public static final String CANCEL_ORDER = "cancel-order";

    /** "Auto-confirm order" queue name, also used as its routing key. */
    public static final String CONFIRM_ORDER = "confirm-order";

    private RabbitMQConstantUtil() {
        // constants holder - no instances
    }
}
创建 RabbitMQConfig 配置类:声明 Exchange、Queue,并将队列绑定到交换机
package com.example.mqrabbitmqconsumer.util.rabbitmq.config;
import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ configuration: declares the direct exchange, the two order queues and their
 * bindings, and customises the listener container factory and the {@link RabbitTemplate}.
 *
 * @author qzz
 */
@Configuration
@EnableRabbit
public class RabbitMQConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds the listener container factory with a JSON message converter and manual
     * acknowledgement.
     *
     * @param connectionFactory connection factory the listener containers will use
     * @return the configured factory
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Jackson2JsonMessageConverter maps between JSON message bodies and Java objects.
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        // Manual ACK: listeners must ack/nack each delivery on the channel themselves.
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /**
     * Customises the injected {@link RabbitTemplate} (UTF-8 encoding, mandatory flag and a
     * returns callback) and exposes that same instance as an {@link AmqpTemplate} bean.
     *
     * <p>The returns callback fires only when a message could not be routed to a queue; it is
     * not invoked for messages that reach a queue. The original notes say returns require
     * {@code publisher-returns: true} in the application configuration or the mandatory flag
     * set below.
     * NOTE(review): the visible application.yml for this module does not set
     * {@code publisher-returns} - confirm whether returns are expected to work here.
     *
     * @return the customised template
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            Message returned = returnedMessage.getMessage();
            String messageId = returned.getMessageProperties().getMessageId();
            // Decode explicitly as UTF-8 (the template encoding) instead of the platform default.
            String body = new String(returned.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            int replyCode = returnedMessage.getReplyCode();
            String replyText = returnedMessage.getReplyText();
            String exchange = returnedMessage.getExchange();
            String routingKey = returnedMessage.getRoutingKey();
            // A returned message is a delivery failure, so report it above INFO level.
            log.warn("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}",
                    body, messageId, replyCode, replyText, exchange, routingKey);
        });
        return rabbitTemplate;
    }

    /**
     * Declares the durable direct exchange. The bean name equals the exchange name so the
     * bindings can select it with {@code @Qualifier}.
     *
     * @return the direct exchange
     */
    @Bean(RabbitMQConstantUtil.DIRECT_EXCHANGE)
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstantUtil.DIRECT_EXCHANGE).durable(true).build();
    }

    /**
     * Declares the "cancel order" queue.
     *
     * @return the queue (durable = true, exclusive = false, autoDelete = true)
     */
    @Bean(RabbitMQConstantUtil.CANCEL_ORDER)
    public Queue cancelOrderQueue() {
        // NOTE(review): autoDelete = true lets the broker drop the queue once its last consumer
        // goes away - confirm this is intended for a durable order queue.
        return new Queue(RabbitMQConstantUtil.CANCEL_ORDER, true, false, true);
    }

    /**
     * Binds the "cancel order" queue to the direct exchange.
     *
     * @param queue          the "cancel order" queue
     * @param directExchange the direct exchange
     * @return the binding; the routing key is the same as the queue name
     */
    @Bean
    public Binding cancelOrderBinding(@Qualifier(RabbitMQConstantUtil.CANCEL_ORDER) Queue queue,
                                      @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CANCEL_ORDER).noargs();
    }

    /**
     * Declares the "auto-confirm order" queue.
     *
     * @return the queue (durable = true, exclusive = false, autoDelete = true)
     */
    @Bean(RabbitMQConstantUtil.CONFIRM_ORDER)
    public Queue confirmOrderQueue() {
        // NOTE(review): autoDelete = true lets the broker drop the queue once its last consumer
        // goes away - confirm this is intended for a durable order queue.
        return new Queue(RabbitMQConstantUtil.CONFIRM_ORDER, true, false, true);
    }

    /**
     * Binds the "auto-confirm order" queue to the direct exchange.
     *
     * @param queue          the "auto-confirm order" queue
     * @param directExchange the direct exchange
     * @return the binding; the routing key is the same as the queue name
     */
    @Bean
    public Binding confirmOrderBinding(@Qualifier(RabbitMQConstantUtil.CONFIRM_ORDER) Queue queue,
                                       @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CONFIRM_ORDER).noargs();
    }
}
创建消费者消息监听,监听取消订单
package com.example.mqrabbitmqconsumer.listener;
import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on the "cancel order" queue and manually acknowledges each delivery.
 *
 * @author qzz
 */
@Component
public class RabbitMQCancelOrderListener {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQCancelOrderListener.class);

    /**
     * Receives one message from the "cancel order" queue, logs it and acknowledges it. If
     * processing fails, the delivery is negatively acknowledged with requeue so the broker
     * delivers it again.
     *
     * @param channel channel the message was delivered on; used for ack/nack
     * @param message the raw AMQP message
     * @throws Exception if the negative acknowledgement itself fails
     */
    @RabbitListener(queues = RabbitMQConstantUtil.CANCEL_ORDER)
    public void receiverMsg(Channel channel, Message message) throws Exception {
        // Decode the body explicitly as UTF-8 rather than with the platform default charset.
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        String messageId = message.getMessageProperties().getMessageId();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("【消费者】 消息内容:【{}】。messageId 【{}】", msg, messageId);
        try {
            // Business logic, if any, goes here.
            // Tell the broker this single delivery was handled so it is not sent again.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged too.
            log.error("消息处理出现异常:{}", e.getMessage(), e);
            // Reject this delivery and ask the broker to requeue it.
            // NOTE(review): requeue = true with no attempt limit can redeliver a message that
            // always fails indefinitely - consider a dead-letter queue or a retry cap.
            channel.basicNack(deliveryTag, false, true);
        }
    }
}
监听自动确认订单
package com.example.mqrabbitmqconsumer.listener;
import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on the "auto-confirm order" queue and manually acknowledges each delivery.
 *
 * @author qzz
 */
@Component
public class RabbitMQConfirmOrderListener {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfirmOrderListener.class);

    /**
     * Receives one message from the "auto-confirm order" queue, logs it and acknowledges it. If
     * processing fails, the delivery is negatively acknowledged with requeue so the broker
     * delivers it again.
     *
     * @param channel channel the message was delivered on; used for ack/nack
     * @param message the raw AMQP message
     * @throws Exception if the negative acknowledgement itself fails
     */
    @RabbitListener(queues = RabbitMQConstantUtil.CONFIRM_ORDER)
    public void receiverMsg(Channel channel, Message message) throws Exception {
        // Decode the body explicitly as UTF-8 rather than with the platform default charset.
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        String messageId = message.getMessageProperties().getMessageId();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("【消费者】 消息内容:【{}】。messageId 【{}】", msg, messageId);
        try {
            // Business logic, if any, goes here.
            // Tell the broker this single delivery was handled so it is not sent again.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged too.
            log.error("消息处理出现异常:{}", e.getMessage(), e);
            // Reject this delivery and ask the broker to requeue it.
            // NOTE(review): requeue = true with no attempt limit can redeliver a message that
            // always fails indefinitely - consider a dead-letter queue or a retry cap.
            channel.basicNack(deliveryTag, false, true);
        }
    }
}
评论