SpringBoot整合实现RabbitMQ


1. pom.xml中添加相关的依赖

<!--添加AMQP的启动器-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--web-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

2. 配置application.yml

server:
  port: 8080
spring:
  application:
    name: mq-rabbitmq-producer
  #rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    #注意:guest用户只能连接本地服务器(例如 localhost),不可以连接远程服务器
    username: guest
    password: guest
    #虚拟主机 一台机器可能有很多虚拟主机 这里选择默认配置 / 即可
    virtual-host: /
    #支持发布返回
    publisher-returns: true
    listener:
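      # 注意:这里的 direct 指监听容器类型(DirectMessageListenerContainer),并不是交换机类型(direct exchange);
      # 需同时设置 spring.rabbitmq.listener.type: direct 才会生效。另外 concurrency / max-concurrency 是 simple 容器的属性,direct 容器对应的是 consumers-per-queue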
      direct:
        #消息确认:手动签收
        acknowledge-mode: manual
        #当前监听容器数
        concurrency: 1
        #最大数
        max-concurrency: 10
        #是否支持重试
        retry:
          enabled: true
          #重试次数5,超过5次抛出异常
          max-attempts: 5
          #最大重试间隔 3s(仅为间隔上限;实际间隔由 initial-interval 与 multiplier 决定,默认 initial-interval 为 1s)
          max-interval: 3000

3. 配置RabbitMQ常量类,配置直连交换机名称、消息队列名称、routingkey,这里把消息队列名称和routingkey设置为同名。

package com.example.mqrabbitmqproducer.util.rabbitmq;

/**
 * RabbitMQ naming constants for the producer application.
 *
 * <p>Each order constant is used both as the queue name and as the routing key
 * that binds that queue to {@link #DIRECT_EXCHANGE}.
 *
 * @author qzz
 */
public final class RabbitMQConstantUtil {

    /**
     * Name of the direct exchange.
     */
    public static final String DIRECT_EXCHANGE = "directExchange";

    /**
     * Cancel-order queue name and routing key.
     */
    public static final String CANCEL_ORDER = "cancel-order";

    /**
     * Auto-confirm-order queue name and routing key.
     */
    public static final String CONFIRM_ORDER = "confirm-order";

    /**
     * Not instantiable: this class only holds constants.
     */
    private RabbitMQConstantUtil() {
    }
}

4. 创建RabbitMQConfig配置类:声明Exchange、Queue,并把队列绑定到交换机

package com.example.mqrabbitmqproducer.util.rabbitmq.config;

import com.example.mqrabbitmqproducer.util.rabbitmq.RabbitMQConstantUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration for the producer application.
 *
 * <p>Declares the direct exchange, the cancel-order and confirm-order queues and
 * their bindings (routing key equals queue name). It also registers a listener
 * container factory with manual acknowledgement and customises the injected
 * {@link RabbitTemplate} so that returned messages are logged.
 *
 * @author qzz
 */
@Configuration
@EnableRabbit
public class RabbitMQConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds the listener container factory used by {@code @RabbitListener} methods.
     *
     * <p>NOTE(review): this factory is built by hand, so the
     * {@code spring.rabbitmq.listener.*} settings in application.yml are presumably
     * not applied to it - confirm.
     *
     * @param connectionFactory connection factory the listener containers use
     * @return a factory with a Jackson JSON converter and manual acknowledgement
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // JSON converter for listener methods that declare a typed payload parameter.
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        // Manual ACK: every listener must ack/nack each delivery itself.
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /**
     * Customises the injected {@link RabbitTemplate} and exposes it as {@code amqpTemplate}.
     *
     * <p>The returns callback runs only for messages the broker hands back because
     * they could not be routed to a queue. Per the Spring AMQP reference this needs
     * the template's {@code mandatory} flag (set here) and publisher returns enabled
     * on the connection factory ({@code publisher-returns: true} in application.yml).
     *
     * @return the same template instance that was injected
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
            // Decode explicitly as UTF-8 (the encoding set on the template above)
            // instead of relying on the JVM's platform default charset.
            String body = new String(returnedMessage.getMessage().getBody(),
                    java.nio.charset.StandardCharsets.UTF_8);
            int replyCode = returnedMessage.getReplyCode();
            String replyText = returnedMessage.getReplyText();
            String exchange = returnedMessage.getExchange();
            String routingKey = returnedMessage.getRoutingKey();

            // A returned message is a delivery failure, so log it as a warning.
            log.warn("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}",
                    body, messageId, replyCode, replyText, exchange, routingKey);
        });
        return rabbitTemplate;
    }

    /**
     * Declares the durable direct exchange.
     *
     * @return the exchange named {@link RabbitMQConstantUtil#DIRECT_EXCHANGE}
     */
    @Bean(RabbitMQConstantUtil.DIRECT_EXCHANGE)
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstantUtil.DIRECT_EXCHANGE).durable(true).build();
    }

    /**
     * Declares the cancel-order queue (durable, non-exclusive, auto-delete).
     *
     * <p>NOTE(review): an auto-delete queue is removed by the broker once its last
     * consumer unsubscribes, which largely defeats {@code durable=true} - confirm this
     * is intended. Every application declaring this queue must use the same arguments.
     *
     * @return the cancel-order queue
     */
    @Bean(RabbitMQConstantUtil.CANCEL_ORDER)
    public Queue cancelOrderQueue() {
        return new Queue(RabbitMQConstantUtil.CANCEL_ORDER, true, false, true);
    }

    /**
     * Binds the cancel-order queue to the direct exchange.
     *
     * @param queue          the cancel-order queue
     * @param directExchange the direct exchange
     * @return the binding, with the routing key equal to the queue name
     */
    @Bean
    public Binding cancelOrderBinding(@Qualifier(RabbitMQConstantUtil.CANCEL_ORDER) Queue queue,
                                      @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange) {
        // Routing key: RabbitMQConstantUtil.CANCEL_ORDER, same as the queue name.
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CANCEL_ORDER).noargs();
    }

    /**
     * Declares the auto-confirm-order queue (durable, non-exclusive, auto-delete).
     *
     * <p>NOTE(review): the auto-delete concern described on the cancel-order queue
     * applies here too - confirm this is intended.
     *
     * @return the confirm-order queue
     */
    @Bean(RabbitMQConstantUtil.CONFIRM_ORDER)
    public Queue confirmOrderQueue() {
        return new Queue(RabbitMQConstantUtil.CONFIRM_ORDER, true, false, true);
    }

    /**
     * Binds the auto-confirm-order queue to the direct exchange.
     *
     * @param queue          the confirm-order queue
     * @param directExchange the direct exchange
     * @return the binding, with the routing key equal to the queue name
     */
    @Bean
    public Binding confirmOrderBinding(@Qualifier(RabbitMQConstantUtil.CONFIRM_ORDER) Queue queue,
                                       @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange) {
        // Routing key: RabbitMQConstantUtil.CONFIRM_ORDER, same as the queue name.
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CONFIRM_ORDER).noargs();
    }
}

5. 创建生产者用于发送消息

package com.example.mqrabbitmqproducer.util.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;

/**
 * Message producer for the routing model: publishes to the direct exchange
 * {@link RabbitMQConstantUtil#DIRECT_EXCHANGE}.
 *
 * @author qzz
 */
@Component
public class DirectSender {

    private static final Logger log = LoggerFactory.getLogger(DirectSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Wraps {@code msg} in a message with a fresh random message ID and publishes
     * it to the direct exchange.
     *
     * @param routingKey routing key that selects the target queue
     * @param msg        message payload; must not be {@code null}
     */
    public void send (String routingKey,String msg){
        String messageId = UUID.randomUUID().toString();
        // Encode explicitly as UTF-8 so the bytes match the declared content
        // encoding regardless of the JVM's platform default charset.
        byte[] body = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = MessageBuilder.withBody(body)
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8")
                .setMessageId(messageId)
                .build();

        log.info("【发送者】消息内容【{}】 交换机【{}】 路由【{}】 消息ID【{}】",msg,RabbitMQConstantUtil.DIRECT_EXCHANGE
                ,routingKey,messageId);
        // The payload is already a Message, so send it as-is without conversion.
        rabbitTemplate.send(RabbitMQConstantUtil.DIRECT_EXCHANGE,routingKey,message);
    }
}

6. 创建一个控制器类,用于模拟测试消息发送(示例中使用了 fastjson 的 JSONObject,需要在 pom.xml 中额外引入 com.alibaba:fastjson 依赖)

package com.example.mqrabbitmqproducer.controller;

import com.alibaba.fastjson.JSONObject;
import com.example.mqrabbitmqproducer.util.rabbitmq.DirectSender;
import com.example.mqrabbitmqproducer.util.rabbitmq.RabbitMQConstantUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * Demo REST endpoints under {@code /order} that publish test messages through
 * {@link DirectSender}.
 *
 * @author qzz
 */
@RestController
@RequestMapping("/order")
public class TestRabbitMQSendMsg {

    /** Fixed order number used by both demo endpoints. */
    private static final String DEMO_ORDER_NUMBER = "4364756867987989";

    /**
     * Sender used to publish to RabbitMQ.
     */
    @Autowired
    private DirectSender directSender;

    /**
     * Simulates cancelling an order: publishes a JSON object with the order number
     * and product ID using the cancel-order routing key.
     */
    @GetMapping("/cancel")
    public void cancel() {
        // The actual cancel-order business logic is omitted in this demo.
        Map<String, Object> payload = new HashMap<>();
        payload.put("order_number", DEMO_ORDER_NUMBER);
        payload.put("product_id", "1");

        String json = JSONObject.toJSONString(payload);
        directSender.send(RabbitMQConstantUtil.CANCEL_ORDER, json);
    }

    /**
     * Simulates auto-confirming an order: publishes the bare order number using
     * the confirm-order routing key.
     */
    @GetMapping("/confirm")
    public void confirm() {
        directSender.send(RabbitMQConstantUtil.CONFIRM_ORDER, DEMO_ORDER_NUMBER);
    }
}

7. 创建mq-rabbitmq-consumer(消费者)消费消息

引入依赖

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <!--添加AMQP的启动器-->
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

配置application.yml

server:
  port: 8083
spring:
  application:
    name: mq-rabbitmq-consumer
  #rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    #注意:guest用户只能连接本地服务器(例如 localhost),不可以连接远程服务器
    username: guest
    password: guest
    #虚拟主机 一台机器可能有很多虚拟主机 这里选择默认配置 / 即可
    virtual-host: /

配置RabbitMQ常量类,配置直连交换机名称、消息队列名称、routingkey

package com.example.mqrabbitmqconsumer.util.rabbitmq;

/**
 * RabbitMQ naming constants for the consumer application.
 *
 * <p>Each order constant is used both as the queue name and as the routing key
 * that binds that queue to {@link #DIRECT_EXCHANGE}.
 *
 * @author qzz
 */
public final class RabbitMQConstantUtil {

    /**
     * Name of the direct exchange.
     */
    public static final String DIRECT_EXCHANGE = "directExchange";

    /**
     * Cancel-order queue name and routing key.
     */
    public static final String CANCEL_ORDER = "cancel-order";

    /**
     * Auto-confirm-order queue name and routing key.
     */
    public static final String CONFIRM_ORDER = "confirm-order";

    /**
     * Not instantiable: this class only holds constants.
     */
    private RabbitMQConstantUtil() {
    }
}

创建RabbitMQConfig配置类:声明Exchange、Queue,并把队列绑定到交换机

package com.example.mqrabbitmqconsumer.util.rabbitmq.config;

import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration for the consumer application.
 *
 * <p>Declares the direct exchange, the cancel-order and confirm-order queues and
 * their bindings (routing key equals queue name). It also registers the listener
 * container factory with manual acknowledgement and customises the injected
 * {@link RabbitTemplate} so that returned messages are logged.
 *
 * @author qzz
 */
@Configuration
@EnableRabbit
public class RabbitMQConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds the listener container factory used by {@code @RabbitListener} methods.
     *
     * @param connectionFactory connection factory the listener containers use
     * @return a factory with a Jackson JSON converter and manual acknowledgement
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // JSON converter for listener methods that declare a typed payload parameter.
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        // Manual ACK: every listener must ack/nack each delivery itself.
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /**
     * Customises the injected {@link RabbitTemplate} and exposes it as {@code amqpTemplate}.
     *
     * <p>The returns callback runs only for messages the broker hands back because
     * they could not be routed to a queue. Per the Spring AMQP reference this needs
     * the template's {@code mandatory} flag (set here) and publisher returns enabled
     * on the connection factory.
     *
     * <p>NOTE(review): if this application's application.yml does not set
     * {@code spring.rabbitmq.publisher-returns: true}, the callback presumably never
     * fires - confirm.
     *
     * @return the same template instance that was injected
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
            // Decode explicitly as UTF-8 (the encoding set on the template above)
            // instead of relying on the JVM's platform default charset.
            String body = new String(returnedMessage.getMessage().getBody(),
                    java.nio.charset.StandardCharsets.UTF_8);
            int replyCode = returnedMessage.getReplyCode();
            String replyText = returnedMessage.getReplyText();
            String exchange = returnedMessage.getExchange();
            String routingKey = returnedMessage.getRoutingKey();

            // A returned message is a delivery failure, so log it as a warning.
            log.warn("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}",
                    body, messageId, replyCode, replyText, exchange, routingKey);
        });
        return rabbitTemplate;
    }

    /**
     * Declares the durable direct exchange.
     *
     * @return the exchange named {@link RabbitMQConstantUtil#DIRECT_EXCHANGE}
     */
    @Bean(RabbitMQConstantUtil.DIRECT_EXCHANGE)
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstantUtil.DIRECT_EXCHANGE).durable(true).build();
    }

    /**
     * Declares the cancel-order queue (durable, non-exclusive, auto-delete).
     *
     * <p>NOTE(review): an auto-delete queue is removed by the broker once its last
     * consumer unsubscribes, which largely defeats {@code durable=true} - confirm this
     * is intended. Every application declaring this queue must use the same arguments.
     *
     * @return the cancel-order queue
     */
    @Bean(RabbitMQConstantUtil.CANCEL_ORDER)
    public Queue cancelOrderQueue() {
        return new Queue(RabbitMQConstantUtil.CANCEL_ORDER, true, false, true);
    }

    /**
     * Binds the cancel-order queue to the direct exchange.
     *
     * @param queue          the cancel-order queue
     * @param directExchange the direct exchange
     * @return the binding, with the routing key equal to the queue name
     */
    @Bean
    public Binding cancelOrderBinding(@Qualifier(RabbitMQConstantUtil.CANCEL_ORDER) Queue queue,
                                      @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange) {
        // Routing key: RabbitMQConstantUtil.CANCEL_ORDER, same as the queue name.
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CANCEL_ORDER).noargs();
    }

    /**
     * Declares the auto-confirm-order queue (durable, non-exclusive, auto-delete).
     *
     * <p>NOTE(review): the auto-delete concern described on the cancel-order queue
     * applies here too - confirm this is intended.
     *
     * @return the confirm-order queue
     */
    @Bean(RabbitMQConstantUtil.CONFIRM_ORDER)
    public Queue confirmOrderQueue() {
        return new Queue(RabbitMQConstantUtil.CONFIRM_ORDER, true, false, true);
    }

    /**
     * Binds the auto-confirm-order queue to the direct exchange.
     *
     * @param queue          the confirm-order queue
     * @param directExchange the direct exchange
     * @return the binding, with the routing key equal to the queue name
     */
    @Bean
    public Binding confirmOrderBinding(@Qualifier(RabbitMQConstantUtil.CONFIRM_ORDER) Queue queue,
                                       @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange) {
        // Routing key: RabbitMQConstantUtil.CONFIRM_ORDER, same as the queue name.
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CONFIRM_ORDER).noargs();
    }
}

创建消费者消息监听,监听取消订单

package com.example.mqrabbitmqconsumer.listener;

import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * Listens on the cancel-order queue and acknowledges each delivery manually.
 *
 * @author qzz
 */
@Component
public class RabbitMQCancelOrderListener {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQCancelOrderListener.class);

    /**
     * Handles one message from the cancel-order queue.
     *
     * <p>On success the delivery is acked. On failure it is nacked: requeued the
     * first time, and rejected without requeue if the broker had already marked it
     * as redelivered. This stops a message that always fails from looping forever.
     *
     * <p>{@code @RabbitHandler} is not used here; it only applies to methods of a
     * class annotated with a class-level {@code @RabbitListener}.
     *
     * @param channel channel the message was delivered on, used for ack/nack
     * @param message the raw delivered message
     * @throws Exception if the ack or nack itself fails
     */
    @RabbitListener(queues = RabbitMQConstantUtil.CANCEL_ORDER)
    public void receiverMsg(Channel channel, Message message) throws Exception {
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        String messageId = message.getMessageProperties().getMessageId();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("【消费者】 消息内容:【{}】。messageId 【{}】",msg, messageId);

        try{
            // Business logic, if any, goes here.

            // Ack so the broker removes the message and does not deliver it again.
            channel.basicAck(deliveryTag, false);
        }catch (Exception e){
            // Pass the exception itself so the stack trace is logged.
            log.error("消息处理出现异常:{}",e.getMessage(), e);
            // Requeue only on the first failure to avoid an endless redelivery loop.
            boolean requeue = !Boolean.TRUE.equals(message.getMessageProperties().getRedelivered());
            channel.basicNack(deliveryTag, false, requeue);
        }
    }
}

监听自动确认订单

package com.example.mqrabbitmqconsumer.listener;

import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * Listens on the auto-confirm-order queue and acknowledges each delivery manually.
 *
 * @author qzz
 */
@Component
public class RabbitMQConfirmOrderListener {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfirmOrderListener.class);

    /**
     * Handles one message from the confirm-order queue.
     *
     * <p>On success the delivery is acked. On failure it is nacked: requeued the
     * first time, and rejected without requeue if the broker had already marked it
     * as redelivered. This stops a message that always fails from looping forever.
     *
     * <p>{@code @RabbitHandler} is not used here; it only applies to methods of a
     * class annotated with a class-level {@code @RabbitListener}.
     *
     * @param channel channel the message was delivered on, used for ack/nack
     * @param message the raw delivered message
     * @throws Exception if the ack or nack itself fails
     */
    @RabbitListener(queues = RabbitMQConstantUtil.CONFIRM_ORDER)
    public void receiverMsg(Channel channel, Message message) throws Exception {
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        String messageId = message.getMessageProperties().getMessageId();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("【消费者】 消息内容:【{}】。messageId 【{}】",msg, messageId);

        try{
            // Business logic, if any, goes here.

            // Ack so the broker removes the message and does not deliver it again.
            channel.basicAck(deliveryTag, false);
        }catch (Exception e){
            // Pass the exception itself so the stack trace is logged.
            log.error("消息处理出现异常:{}",e.getMessage(), e);
            // Requeue only on the first failure to avoid an endless redelivery loop.
            boolean requeue = !Boolean.TRUE.equals(message.getMessageProperties().getRedelivered());
            channel.basicNack(deliveryTag, false, requeue);
        }
    }
}

java
  • 作者:黄伟明
  • 发表时间:2023-12-28 15:14
  • 版权声明:非商业自由转载

评论



留言