消息队列-RabbitMQ


一、RabbitMQ入门

1、MQ的相关概念

1.1.1. 什么是MQ

MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是

message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常

见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不

用依赖其他服务。

1.1.2.为什么要用MQ

1.流量消峰

​ 举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正

常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限

制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分

散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体

验要好。

2.应用解耦

​ 以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合

调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于

消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在

这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流

系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。

3.异步处理

​ 有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可

以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api,

B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,

A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消

息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样B 服务也不用

做这些操作。A 服务还能及时的得到异步处理成功的消息。

1.2. RabbitMQ
1.2.1 RabbitMQ的概念

​ RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包

裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是

一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,

存储和转发消息数据。

1.2.2 四大核心概念
  • 生产者

​ 产生数据发送消息的程序是生产者

  • 交换机

​ 交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息

推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推

送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

  • 队列

​ 队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存

储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可

以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

  • 消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费

者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

1.2.3. RabbitMQ 核心部分

RabbitMQ六种模式simple简单模式、work工作模式、publish/subscribe订阅模式、routing路由模式、topic 主题模式、RPC模式。

  1. simple简单模式为一个队列中一条消息,只能被一个消费者消费。

  2. Work工作模式为一个生产者,多个消费者,每个消费者获取到的消息唯一。

  3. publish/subscribe订阅模式为一个生产者发送的消息被多个消费者获取。

  4. routing路由模式为生产者发送的消息主要根据定义的路由规则决定往哪个队列发送。

  5. topic 主题模式为生产者,一个交换机(topicExchange),模糊匹配路由规则,多个队列,多个消费者。

  6. RPC模式为客户端 Client 先发送消息到消息队列,远程服务端 Server 获取消息,然后再写入另一个消息队列,向原始客户端 Client 响应消息处理结果。

1.2.4 名称介绍

RabbitMQ工作原理,如下图

Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似

​ 于网络中的namespace 概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出

​ 多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP

​ Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程

​ 序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客

​ 户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立TCP connection 的开销。

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保

​ 存到 exchange 中的查询表中,用于 message 的分发依据。

2、Hello World

  • 在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代

表使用者保留的消息缓冲区

2.1 simple简单模式
  • 生产者
 public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("***.**.***.***");
        factory.setPort(5673);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //3.创建连接connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建队列 QUEUE
        /**
         * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         *
         * 1.queue 队列名称
         * 2.durable 是否持久化,当mq重启后,它还在
         * 3.exclusive :1.是否独占:只能有一个消费者监听这个队列,2:当connection关闭时,是否删除队列
         * 4.autoDelete: 是否自动删除,当没有consumer时,自动删除
         * 5.arguments:参数
         */
        //如何没有一个hello_world的队列,则会创建一个。
        channel.queueDeclare("hello_world",true,false,false,null);
        //6.发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 1.exchange:  交换机名称。简单模式下会使用默认的“”
         * 2.routingKey: 路由名称
         * 3.props:配置信息
         * 4.body:发送的消息数据
         */
            String body = "hello   rabbitMq!";
            channel.basicPublish("","hello_world",null,body.getBytes());
       
        //7.关闭资源
//        channel.close();
//        connection.close();
    }
  • 消费者
public static void main(String[] args) throws Exception{

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("***.**.***.***");
        factory.setPort(5673);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //3.创建连接connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建队列 QUEUE
        /**
         * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         *
         * 1.queue 队列名称
         * 2.durable 是否持久化,当mq重启后,它还在
         * 3.exclusive :1.是否独占:只能有一个消费者监听这个队列,2:当connection关闭时,是否删除队列
         * 4.autoDelete: 是否自动删除,当没有consumer时,自动删除
         * 5.arguments:参数
         */
        //如何没有一个hello_world的队列,则会创建一个。
        channel.queueDeclare("hello_world",true,false,false,null);
        //6.接收消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer  callback)
         * 1.queue:队列名称
         * 2.autoAck:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后会自动执行该方法
             * @param consumerTag:消息唯一标识
             * @param envelope:获取一些信息,交换机,路由key。。。
             * @param properties:配置信息
             * @param body:数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope.getExchange());
                System.out.println("RoutingKey():"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));

            }
        };
        channel.basicConsume("hello_world",true,consumer);

    }

3、work queues 工作队列模式

  • Work Queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

  • 应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

  • 在work模式中可以分为两种模式,一种是两个消费者平均消费队列中的消息,即使他们的消费能力是不一样的,这种似乎不太符合实际的情况。另一种是能者多劳模式,处理消息能力强的消费者会获取更多的 消息,这种模式更符合实际需求。

  • 生产者

 public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("***.**.***.***");
        factory.setPort(5673);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //3.创建连接connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建队列 QUEUE
        /**
         * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         *
         * 1.queue 队列名称
         * 2.durable 是否持久化,当mq重启后,它还在
         * 3.exclusive :1.是否独占:只能有一个消费者监听这个队列,2:当connection关闭时,是否删除队列
         * 4.autoDelete: 是否自动删除,当没有consumer时,自动删除
         * 5.arguments:参数
         */
        //如何没有一个hello_world的队列,则会创建一个。
        channel.queueDeclare("work_queues",true,false,false,null);
        //6.发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 1.exchange:  交换机名称。简单模式下会使用默认的“”
         * 2.routingKey: 路由名称
         * 3.props:配置信息
         * 4.body:发送的消息数据
         */
        for (int i = 0; i < 10; i++) {
            String body = i+ "hello   rabbitMq!";
            channel.basicPublish("","work_queues",null,body.getBytes());
        }
        //7.关闭资源
//        channel.close();
//        connection.close();
    }
  • 消费者1
public static void main(String[] args) throws Exception{

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("***.**.***.***");
        factory.setPort(5673);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //3.创建连接connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建队列 QUEUE
        /**
         * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         *
         * 1.queue 队列名称
         * 2.durable 是否持久化,当mq重启后,它还在
         * 3.exclusive :1.是否独占:只能有一个消费者监听这个队列,2:当connection关闭时,是否删除队列
         * 4.autoDelete: 是否自动删除,当没有consumer时,自动删除
         * 5.arguments:参数
         */
        //如何没有一个hello_world的队列,则会创建一个。
        channel.queueDeclare("work_queues",true,false,false,null);
        //6.接收消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer  callback)
         * 1.queue:队列名称
         * 2.autoAck:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后会自动执行该方法
             * @param consumerTag:消息唯一标识
             * @param envelope:获取一些信息,交换机,路由key。。。
             * @param properties:配置信息
             * @param body:数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("envelope:"+envelope.getExchange());
//                System.out.println("RoutingKey():"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));

            }
        };
        channel.basicConsume("work_queues",true,consumer);

    }
  • 消费者2
public static void main(String[] args) throws Exception{

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("***.**.***.***");
        factory.setPort(5673);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //3.创建连接connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建队列 QUEUE
        /**
         * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         *
         * 1.queue 队列名称
         * 2.durable 是否持久化,当mq重启后,它还在
         * 3.exclusive :1.是否独占:只能有一个消费者监听这个队列,2:当connection关闭时,是否删除队列
         * 4.autoDelete: 是否自动删除,当没有consumer时,自动删除
         * 5.arguments:参数
         */
        //如何没有一个hello_world的队列,则会创建一个。
        channel.queueDeclare("work_queues",true,false,false,null);
        //6.接收消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer  callback)
         * 1.queue:队列名称
         * 2.autoAck:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后会自动执行该方法
             * @param consumerTag:消息唯一标识
             * @param envelope:获取一些信息,交换机,路由key。。。
             * @param properties:配置信息
             * @param body:数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("envelope:"+envelope.getExchange());
//                System.out.println("RoutingKey():"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));

            }
        };
        channel.basicConsume("work_queues",true,consumer);

    }
  • 输出结果
//消费者1 
body:0hello   rabbitMq!
body:2hello   rabbitMq!
body:4hello   rabbitMq!
body:6hello   rabbitMq!
body:8hello   rabbitMq!
    
    
//消费者2
body:1hello   rabbitMq!
body:3hello   rabbitMq!
body:5hello   rabbitMq!
body:7hello   rabbitMq!
body:9hello   rabbitMq!

4.Pub/Sub订阅模式


在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • C:消费者,消息的接收者,会一直等待消息到来

  • Queue:消息队列,接收消息、缓存消息

  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

代码演示

  • 生产者
public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("***.**.***.***");
        factory.setPort(5673);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //3.创建连接connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建交换机
        /**
         * exchangeDeclare(String exchange,
         *         BuiltinExchangeType type,
         *         boolean durable,
         *         boolean autoDelete,
         *         boolean internal,
         *         Map<String, Object> arguments);
         *
         *         exchange:交换机名称
         *         type:交换机类型 DIRECT("direct"):定向; FANOUT("fanout"):扇形,发送消息到每个与交换机绑定的队列
         *                 TOPIC("topic"),通配符的方式
         *         durable:是否持久换
         *         autoDelete:自动删除
         *         internal:内部使用。一般为false
         *         arguments:参数
         */
        String exchange  = "test_fanout";
        channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6.创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        /**
         * queueBind(String queue, String exchange, String routingKey);
         * queue: 队列名称
         * exchange: 交换机名称
         * routingKey;路由键,绑定规则
         * 如果交换机的类型为fanout,routingKey设为“”
         */
        channel.queueBind(queue1Name,exchange,"");
        channel.queueBind(queue2Name,exchange,"");
        //8.发消息
        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        channel.basicPublish(exchange,"",null,body.getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
  • 消费者1
 public static void main(String[] args) throws Exception{

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("***.**.***.***");
        factory.setPort(5673);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //3.创建连接connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        String queue1Name = "test_fanout_queue1";

        //6.接收消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer  callback)
         * 1.queue:队列名称
         * 2.autoAck:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后会自动执行该方法
             * @param consumerTag:消息唯一标识
             * @param envelope:获取一些信息,交换机,路由key。。。
             * @param properties:配置信息
             * @param body:数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("envelope:"+envelope.getExchange());
//                System.out.println("RoutingKey():"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台....");

            }
        };
        channel.basicConsume(queue1Name,true,consumer);

    }
  • 消费者2
 //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("***.**.***.***");
        factory.setPort(5673);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //3.创建连接connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        String queue2Name = "test_fanout_queue2";

        //6.接收消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer  callback)
         * 1.queue:队列名称
         * 2.autoAck:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后会自动执行该方法
             * @param consumerTag:消息唯一标识
             * @param envelope:获取一些信息,交换机,路由key。。。
             * @param properties:配置信息
             * @param body:数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("envelope:"+envelope.getExchange());
//                System.out.println("RoutingKey():"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息保存数据库....");

            }
        };
        channel.basicConsume(queue2Name,true,consumer);

    }
  • 执行结果
//消费者1
body:日志信息:张三调用了findAll方法...日志级别:info...
将日志信息打印到控制台....
//消费者2
body:日志信息:张三调用了findAll方法...日志级别:info...
将日志信息保存数据库....

总结:

  • 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

  • 发布订阅模式与工作队列模式的区别:

    • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
    • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
    • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机

5、Routing 路由模式

5.1 模式说明
  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的

    Routingkey 与消息的 Routing key 完全一致,才会接收到消息

图解:

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key

  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列

  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息

  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

代码演示

-

//1.创建连接工厂
      ConnectionFactory factory = new ConnectionFactory();
      //2.设置参数
      factory.setHost("***.**.***.***");
      factory.setPort(5673);
      factory.setUsername("guest");
      factory.setPassword("guest");
      //3.创建连接connection
      Connection connection = factory.newConnection();
      //4.创建channel
      Channel channel = connection.createChannel();
      //5.创建交换机
      /**
       * exchangeDeclare(String exchange,
       *         BuiltinExchangeType type,
       *         boolean durable,
       *         boolean autoDelete,
       *         boolean internal,
       *         Map<String, Object> arguments);
       *
       *         exchange:交换机名称
       *         type:交换机类型 DIRECT("direct"):定向; FANOUT("fanout"):扇形,发送消息到每个与交换机绑定的队列
       *                 TOPIC("topic"),通配符的方式
       *         durable:是否持久换
       *         autoDelete:自动删除
       *         internal:内部使用。一般为false
       *         arguments:参数
       */
      String exchange  = "test_direct";
      channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT,true,false,false,null);
      //6.创建队列
      String queue1Name = "test_direct_queue1";
      String queue2Name = "test_direct_queue2";
      channel.queueDeclare(queue1Name,true,false,false,null);
      channel.queueDeclare(queue2Name,true,false,false,null);
      //7. 绑定队列和交换机
      /**
       * queueBind(String queue, String exchange, String routingKey);
       * queue: 队列名称
       * exchange: 交换机名称
       * routingKey;路由键,绑定规则
       * 如果交换机的类型为fanout,routingKey设为“”
       */
      //队列1绑定
      channel.queueBind(queue1Name,exchange,"error");
      //队列2绑定
      channel.queueBind(queue2Name,exchange,"info");
      channel.queueBind(queue2Name,exchange,"error");
      channel.queueBind(queue2Name,exchange,"warning");
      //8.发消息
      String body = "日志信息:张三调用了findAll方法...日志级别:info...";
      channel.basicPublish(exchange,"info",null,body.getBytes());
      //9.释放资源
      channel.close();
      connection.close();
  • 消费者1
 //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("***.**.***.***");
        factory.setPort(5673);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //3.创建连接connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        String queue1Name = "test_direct_queue1";

        //6.接收消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer  callback)
         * 1.queue:队列名称
         * 2.autoAck:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后会自动执行该方法
             * @param consumerTag:消息唯一标识
             * @param envelope:获取一些信息,交换机,路由key。。。
             * @param properties:配置信息
             * @param body:数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("envelope:"+envelope.getExchange());
//                System.out.println("RoutingKey():"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息保存到数据库....");

            }
        };
        channel.basicConsume(queue1Name,true,consumer);
  • 消费者2
 //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("***.**.***.***");
        factory.setPort(5673);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //3.创建连接connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        String queue2Name = "test_direct_queue2";

        //6.接收消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer  callback)
         * 1.queue:队列名称
         * 2.autoAck:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后会自动执行该方法
             * @param consumerTag:消息唯一标识
             * @param envelope:获取一些信息,交换机,路由key。。。
             * @param properties:配置信息
             * @param body:数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("envelope:"+envelope.getExchange());
//                System.out.println("RoutingKey():"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台....");
            }
        };
        channel.basicConsume(queue2Name,true,consumer);

总结:Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

6、Topics通配符模式

6.1 模式说明

图解:

  • 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到

  • 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

代码演示

  • 生产者
//1.创建连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       //2.设置参数
       factory.setHost("***.**.***.***");
       factory.setPort(5673);
       factory.setUsername("guest");
       factory.setPassword("guest");
       //3.创建连接connection
       Connection connection = factory.newConnection();
       //4.创建channel
       Channel channel = connection.createChannel();
       //5.创建交换机
       /**
        * exchangeDeclare(String exchange,
        *         BuiltinExchangeType type,
        *         boolean durable,
        *         boolean autoDelete,
        *         boolean internal,
        *         Map<String, Object> arguments);
        *
        *         exchange:交换机名称
        *         type:交换机类型 DIRECT("direct"):定向; FANOUT("fanout"):扇形,发送消息到每个与交换机绑定的队列
        *                 TOPIC("topic"),通配符的方式
        *         durable:是否持久换
        *         autoDelete:自动删除
        *         internal:内部使用。一般为false
        *         arguments:参数
        */
       String exchange  = "test_topic";
       channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,true,false,false,null);
       //6.创建队列
       String queue1Name = "test_topics_queue1";
       String queue2Name = "test_topics_queue2";
       channel.queueDeclare(queue1Name,true,false,false,null);
       channel.queueDeclare(queue2Name,true,false,false,null);
       //7. 绑定队列和交换机
       /**
        * queueBind(String queue, String exchange, String routingKey);
        * queue: 队列名称
        * exchange: 交换机名称
        * routingKey;路由键,绑定规则
        * 如果交换机的类型为fanout,routingKey设为“”
        */
       //队列1绑定
       channel.queueBind(queue1Name,exchange,"#.error");
       channel.queueBind(queue1Name,exchange,"order.*");
       //队列2绑定
       channel.queueBind(queue2Name,exchange,"*.*");

       //8.发消息
       String body = "日志信息:张三调用了findAll方法...日志级别:info...";
       channel.basicPublish(exchange,"order.info",null,body.getBytes());
       //9.释放资源
       channel.close();
       connection.close();
  • 消费者1
//1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("***.**.***.***");
        factory.setPort(5673);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //3.创建连接connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        String queue1Name = "test_topics_queue1";

        //6.接收消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer  callback)
         * 1.queue:队列名称
         * 2.autoAck:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后会自动执行该方法
             * @param consumerTag:消息唯一标识
             * @param envelope:获取一些信息,交换机,路由key。。。
             * @param properties:配置信息
             * @param body:数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("envelope:"+envelope.getExchange());
//                System.out.println("RoutingKey():"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息保存到数据库....");

            }
        };
        channel.basicConsume(queue1Name,true,consumer);
  • 消费者2
//1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("***.**.***.***");
        factory.setPort(5673);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //3.创建连接connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        String queue1Name = "test_topics_queue2";

        //6.接收消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer  callback)
         * 1.queue:队列名称
         * 2.autoAck:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后会自动执行该方法
             * @param consumerTag:消息唯一标识
             * @param envelope:获取一些信息,交换机,路由key。。。
             * @param properties:配置信息
             * @param body:数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("envelope:"+envelope.getExchange());
//                System.out.println("RoutingKey():"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台....");

            }
        };
        channel.basicConsume(queue1Name,true,consumer);

7、工作模式总结

  1. 简单模式 HelloWorld

​ 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。

  1. 工作队列模式 Work Queue

​ 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。

  1. 发布订阅模式 Publish/subscribe

​ 需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消

​ 息发送到绑定的队列。

  1. 路由模式 Routing

​ 需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机

​ 后,交换机会根据 routing key 将消息发送到对应的队列。

  1. 通配符模式 Topic

​ 需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送

​ 消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

7、Springboot整合RabbitMQ(生产者)

  • 添加RabbitMQ依赖:在pom.xml文件中添加以下依赖:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 配置RabbitMQ连接属性:在application.yml文件中添加以下属性:
#配置RabbitMq的基本信息
spring:
  rabbitmq:
    host: ***.**.***.***
    port: 5673
    username: guest
    password: guest
    virtual-host: /
  • 创建配置类
/**
 * @Title: RabbitConnfig
 * @Author: wfr
 * @Package: com.itheima.rabbitmq.config
 * @Date: 2023/3/20 20:08
 * @Description: RabbitMq配置类
 */
@Configuration
public class RabbitConnfig {

    public static final String EXCHANGE_NAME = "boot_topic_exchange";

    public static final String QUEUE_NAME = "boot_queue";

    //1.交换机
    @Bean("bootExchange")
    public Exchange bootExchange(){
         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    //2.Queue队列
    @Bean("bootQueue")
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    //3.队列和交换机绑定关系
    /**
     * 1.知道哪个队列
     * 2.知道哪个交换机
     * 3.routing key
     */
    @Bean
    public Binding  bindingQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
  • 发送消息

//1.注入RabbitTemplate
@Autowired
public RabbitTemplate rabbitTemplate;

@Test
public void testSend(){
    rabbitTemplate.convertAndSend(RabbitConnfig.EXCHANGE_NAME,"boot.haha","boot mq hello~~~~ ");
  • 接受消息
@Component
public class RabbitMQListener {

    @RabbitListener(queues = "boot_queue")
    public void Listener(Message message){
        System.out.println(new String(message.getBody()));
    }
}

二、RabbitMQ高级特性

1、消息可靠性投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提

供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式

rabbitmq 整个消息投递的路径为:

producer—>rabbitmq broker—>exchange—>queue—>consumer

  • 消息从 producerexchange 则会返回一个 confirmCallback

  • 消息从 exchange–>queue 投递失败则会返回一个 returnCallback

我们将利用这两个 callback 控制消息的可靠性投递。

回调模式

//1.注入RabbitTemplate
   @Autowired
   public RabbitTemplate rabbitTemplate;

  /**
    * 确认模式
    * 步骤
    * 1:确认模式开启:publisher-confirms: true
    * 2: 在rabbitTemplate定义ConfirmCallback回调函数
    */
   @Test
   public void testConfirmSend() throws InterruptedException {
       //定义回调函数
       rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
           /**
            *
            * @param correlationData 相关配置信息
            * @param ack exchange交换机是否收到了消息
            * @param cause 失败原因
            */
           @Override
           public void confirm(CorrelationData correlationData, boolean ack, String cause) {
               System.out.println("confirm回调执行了~~~");
               if(ack){
                   //接受成功
                   System.out.println("接受消息成功"+ cause);
               }else {
                   System.out.println("接受消息失败"+ cause);
               }
           }
       });
       //发送消息
       rabbitTemplate.convertAndSend(RabbitConnfig.EXCHANGE_NAME,"boot.confirm","message confirm send~~~");
       //测试方法中消息发送完后rabbitmq的信道就会关闭,导致回调信息无法正常返回。所以需要阻塞等待使其完成回调函数
       Thread.sleep(2000);
   }

回退模式

/**
     * 回退模式 当消息发送给Exchange后,Exchange路由到Queue失败,才会执行ReturnCallBack
     * 步骤
     * 1:确认模式开启:publisher-returns: true
     * 2: 在rabbitTemplate定义ReturnCallBack回调函数
     * 3: 设置Exchange处理消息的模式:
     *     1. 如果没有路由到queue,则丢弃消息(默认)
     *     2. 如果没有路由到queue,则返还给消息发送方ReturnCallBack
     */
    @Test
    public void testReturn() throws InterruptedException {

        rabbitTemplate.setMandatory(true);
        //定义回调函数
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * @param message 返回的消息
             * @param replyCode 回复代码
             * @param replyText 错误信息
             * @param exchange 交换机
             * @param routingKey routingKey
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return执行了");
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
                //相关处理。。。。。
            }
        });
        //发送消息
        rabbitTemplate.convertAndSend(RabbitConnfig.EXCHANGE_NAME,"111boot.confirm","message confirm send~~~");
        //测试方法中消息发送完后rabbitmq的信道就会关闭,导致收不到回调信息无法正常返回。所以需要阻塞等待使其完成回调函数
        Thread.sleep(2000);
    }

RabbitMQ事务

在RabbitMQ中也提供了事务机制,但是性能较差

使用channel下列方法,完成事务控制:

txSelect(), 用于将当前channel设置成transaction模式

txCommit(),用于提交事务

txRollback(),用于回滚事务

2、消费端手动签收

  • 配置手动签收
#配置RabbitMq的基本信息
spring:
  rabbitmq:
    host: ***.**.***.***
    port: 5673
    username: guest
    password: guest
    virtual-host: /
#    设置手动签收
    listener:
      simple:
        acknowledge-mode: manual
@Component
public class RabbitMQListener implements ChannelAwareMessageListener {

    /**
     * Ack 机制
     * 1.设置手动签收
     * 2.实现监听器类实现ChannelAwareMessageListener
     * 3.如果消息成功处理,则调用channel的basicAck()签收
     * 4.如果消息处理失败,则调用channel的basicNAck()拒绝签收,broker重新发送给consumer
     * @param message
     */
    @Override
    @RabbitListener(queues = "confirm_queue")
    public void onMessage(Message message, Channel channel) throws Exception {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            Thread.sleep(1000);
            //1.接受转换消息
            System.out.println(new String(message.getBody()));

            //2.处理业务逻辑
            System.out.println("处理业务逻辑...");
            int i = 8/0;
            //3.手动签收
            channel.basicAck(tag,false);
        } catch (Exception e) {
           // e.printStackTrace();
            //4.拒绝签收
            /**
             * 第三个参数:重回队列。如果设置为true,则消息重回queue,broker重新发送消息给消费端
             */
            System.out.println("重发");
            channel.basicNack(tag,false,true);
        }
    }
}

3、消费端限流

请求瞬间增多,每秒5000个请求

  • 配置
spring:
  rabbitmq:
    host: ***.**.***.***
    port: 5673
    username: guest
    password: guest
    virtual-host: /
#    设置手动签收 manual代表手动签收
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1 #限制每次发送1条数据。
  • 代码演示
@Component
public class QosMQListener implements ChannelAwareMessageListener {


    /**
     * Consumer限流 机制
     * 1.确保Ack手动签收
     * 2.listener-container配置属性 perfetch = 1
     *   perfetch = 1 表示消费端每次从mq拉取2条消息消费,直到手动确认消费完毕后才会继续拉取下一条消息
     * @param message
     */

    /**
     * @param message
     * @param channel
     * @throws Exception
     */
    @Override
    @RabbitListener(queues = "confirm_queue")
    public void onMessage(Message message, Channel channel) throws Exception {
        long tag = message.getMessageProperties().getDeliveryTag();
        Thread.sleep(1000);
        //1.接受转换消息
        System.out.println(new String(message.getBody()));

        //2.处理业务逻辑
        System.out.println("处理业务逻辑...");
        //3.手动签收
        channel.basicAck(tag,false);

    }

总结

  • 在配置 prefetch属性设置消费端一次拉取多少消息

  • 消费端的确认模式一定为手动确认。acknowledge=”manual

4、TTL

  • TTL 全称 Time To Live(存活时间/过期时间)。
  • 当消息到达存活时间后,还没有被消费,会被自动清除。
  • RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

代码演示

  • 声明队列和交换机,并设置参数
/**
 * @Title: RabbitConnfig
 * @Author: wfr
 * @Package: com.itheima.rabbitmq.config
 * @Date: 2023/3/20 20:08
 * @Description: RabbitMq配置类
 */
@Configuration
public class RabbitConnfig {

    public static final String EXCHANGE_NAME = "test_exchange_ttl";

    public static final String QUEUE_NAME = "test_queue_ttl";

    //1.交换机
    @Bean("test_exchange_ttl")
    public Exchange bootExchange(){
         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    //2.Queue队列
    @Bean("test_queue_ttl")
    public Queue bootQueue(){
    	//声明参数,过期时间x-message-ttl
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_NAME).withArguments(arguments).build();
    }
    //3.队列和交换机绑定关系
    /**
     * 1.知道哪个队列
     * 2.知道哪个交换机
     * 3.routing key
     */
    @Bean
    public Binding  bindingQueueExchange(@Qualifier("test_queue_ttl") Queue queue, @Qualifier("test_exchange_ttl")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
    }
  • 发送10条消息
/**
    * ttl:过期时间
    *  1.队列统一过期
    *  2.消息单独过期
    */
   @Test
   public void testTTl(){
       //1.队列统一过期
       //for (int i = 0; i < 10; i++) {
       //    rabbitTemplate.convertAndSend(RabbitConnfig.EXCHANGE_NAME,"ttl.message","mq ttl hello~~~~ ");
       //}
       //10秒后消息统一过期


       //2.消息单独过期
       MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
           @Override
           public Message postProcessMessage(Message message) throws AmqpException {
               //1.设置Message信息
               message.getMessageProperties().setExpiration("5000");
               //2.返回该消息
               return message;
           }
       };
       rabbitTemplate.convertAndSend(RabbitConnfig.EXCHANGE_NAME, "ttl.message", "mq ttl hello~~~~ ", messagePostProcessor);
   }

总结

  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

  • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断

​ 这一消息是否过期。

  • 如果两者都进行了设置,以时间短的为准。
  • 队列过期后,会将全队列所有消息全部移除
  • ==消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)==

5、死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以

被重新发送到另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况:

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机:

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

代码演示

  • 配置
@Configuration
public class RabbitConnfig {

    /**
     * 正常交换机
     */
    public static final String EXCHANGE_NAME = "test_exchange_dlx";
    /**
     * 正常队列
     */
    public static final String QUEUE_NAME = "test_queue_dlx";

    /**
     * 死信交换机
     */
    public static final String DLX_EXCHANGE_NAME = "exchange_dlx";

    /**
     * 死信队列
     */
    public static final String DLX_QUEUE_NAME = "queue_dlx";

    /**
     * 死信队列
     * 1.声明正常队列和交换机
     * 2.声明私信队列和死信交换机
     * 3.正常队列绑定死信交换机
     *    x-dead-letter-exchange:死信交换机名称
     *    x-dead-letter-routing-key: 发送给死信交换机的routingKey
     * @return
     */

    //1.声明正常交换机
    @Bean("test_exchange_dlx")
    public Exchange bootExchange(){
         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    //2.声明正常Queue
    @Bean("test_queue_dlx")
    public Queue bootQueue(){
        Map<String, Object> arguments = new HashMap<>();
        //1绑定死信队列名称
        arguments.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
        //2设置发送给死信交换机的routingKey
        arguments.put("x-dead-letter-routing-key" ,"dlx.test");
        //3设置队列的过期时间
        arguments.put("x-message-ttl",10000);
        //4设置队列的最大长度
        arguments.put("x-max-length",10);
        return QueueBuilder.durable(QUEUE_NAME).withArguments(arguments).build();
    }


    //3.队列和交换机绑定关系
    @Bean
    public Binding bindingQueueExchange(@Qualifier("test_queue_dlx") Queue queue, @Qualifier("test_exchange_dlx")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();
    }

    //2.声明私信队列和死信交换机
    @Bean("exchange_dlx")
    public Exchange dlxExchange(){
        return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).durable(true).build();
    }
    //2.Queue队列
    @Bean("queue_dlx")
    public Queue dlxQueue(){
//        Map<String, Object> arguments = new HashMap<>();
//        arguments.put("x-message-ttl",1000000);
        return QueueBuilder.durable(DLX_QUEUE_NAME).build();
    }


    //3.死信队列和交换机绑定关系
    @Bean
    public Binding bindingQueueExchangeDlx(@Qualifier("queue_dlx") Queue queue, @Qualifier("exchange_dlx")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
    }

   
}
  • 发送消息
/**
    * 发送测试死信消息
    * 1.过期时间
    * 2.长度显示
    * 3.消息拒收
    */
   @Test
   public void testDlx(){
       //1.过期时间的死信时间
      // rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","消息会成为死信队列吗?");
        //2.长度限制后,消息死信
      // for (int i = 0; i < 20; i++) {
      //     rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","消息会成为死信队列吗?");
      // }
       //3.消息拒收
       rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","消息会成为死信队列吗?");
   }
  • 消费者拒收消息代码
@Component
public class DlxListener implements ChannelAwareMessageListener {


    /**
     * Consumer限流 机制
     * 1.确保Ack手动签收
     * 2.listener-container配置属性 perfetch = 1
     *   perfetch = 1 表示消费端每次从mq拉取2条消息消费,直到手动确认消费完毕后才会继续拉取下一条消息
     * @param message
     */

    /**
     * @param message
     * @param channel
     * @throws Exception
     */
    @Override
    @RabbitListener(queues = "test_queue_dlx")
    public void onMessage(Message message, Channel channel) throws Exception {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            //1.接受转换消息
            System.out.println(new String(message.getBody()));

            //2.处理业务逻辑
            System.out.println("处理业务逻辑...");
            int i = 3/0;
            //3.手动签收
            channel.basicAck(tag, false);
        } catch (Exception e) {
            //e.printStackTrace();
            //4.出现异常,拒绝接受,注意设置重回队列为false
            System.out.println("出现异常,拒绝接受");
            channel.basicNack(tag, true,false);
        }

    }
}

总结:

  1. 死信交换机和死信队列和普通的没有区别

  2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

  3. 消息成为死信的三种情况:

    • 队列消息长度到达限制;
    • 消费者拒接消费消息,并且不重回队列;
    • 原队列存在消息过期设置,消息到达超时时间未被消费;

6 、延迟队列

  • 延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

  • 需求:

    1. 下单后,30分钟未支付,取消订单,回滚库存。

    2. 新用户注册成功7天后,发送短信问候。

  • 实现方式:

    1. 定时器

    2. 延迟队列

在RabbitMQ中并未提供延迟队列功能,但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

  • 代码演示
@Configuration
public class RabbitConnfig {

    /**
     * 正常交换机
     */
    public static final String EXCHANGE_NAME = "order_exchange";
    /**
     * 正常队列
     */
    public static final String QUEUE_NAME = "order_queue";

    /**
     * 死信交换机
     */
    public static final String DLX_EXCHANGE_NAME = "order_exchange_dlx";

    /**
     * 死信队列
     */
    public static final String DLX_QUEUE_NAME = "order_queue_dlx";

    /**
     * 死信队列
     * 1.声明正常队列和交换机
     * 2.声明私信队列和死信交换机
     * 3.正常队列绑定死信交换机
     *    x-dead-letter-exchange:死信交换机名称
     *    x-dead-letter-routing-key: 发送给死信交换机的routingKey
     * @return
     */

    //1.声明正常的订单交换机
    @Bean("order_exchange")
    public Exchange bootExchange(){
         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    //2.声明正常的订单queue
    @Bean("order_queue")
    public Queue bootQueue(){
        Map<String, Object> arguments = new HashMap<>();
        //绑定死信队列名称
        arguments.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
        //设置发送给死信交换机的routingKey
        arguments.put("x-dead-letter-routing-key" ,"dlx.order.cancel");
        //设置队列的过期时间
        arguments.put("x-message-ttl",10000);

        return QueueBuilder.durable(QUEUE_NAME).withArguments(arguments).build();
    }


    //3.队列和交换机绑定关系
    @Bean
    public Binding bindingQueueExchange(@Qualifier("order_queue") Queue queue, @Qualifier("order_exchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }

    //2.声明私信队列和死信交换机
    @Bean("order_exchange_dlx")
    public Exchange dlxExchange(){
        return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).durable(true).build();
    }
    //2.Queue队列
    @Bean("order_queue_dlx")
    public Queue dlxQueue(){
//        Map<String, Object> arguments = new HashMap<>();
//        arguments.put("x-message-ttl",1000000);
        return QueueBuilder.durable(DLX_QUEUE_NAME).build();
    }


    //3.死信队列和交换机绑定关系
    @Bean
    public Binding bindingQueueExchangeDlx(@Qualifier("order_queue_dlx") Queue queue, @Qualifier("order_exchange_dlx")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
    }
}
  • 生产者
@Test
public void testDelay() throws InterruptedException {
    //发送订单消息
    rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id =1,time = 2022-09-09");
    //循环结束后就可以看到消费者接收到消息
    for (int i = 0; i < 10; i++) {
        System.out.println(i+"...");
        Thread.sleep(1000);
    }
}
  • 消费者
@Component
public class OrderListener implements ChannelAwareMessageListener {



    /**
     * 延迟队列实现,注意此处监听的是延迟队列
     * @param message
     * @param channel
     * @throws Exception
     */
    @Override
    @RabbitListener(queues = "order_queue_dlx")
    public void onMessage(Message message, Channel channel) throws Exception {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            //1.接受转换消息
            System.out.println(new String(message.getBody()));

            //2.处理业务逻辑
            System.out.println("处理业务逻辑...");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否支付成功...");
            System.out.println("取消订单,回滚库存...");
            //3.手动签收
            channel.basicAck(tag, false);
        } catch (Exception e) {
            //e.printStackTrace();
            //4.出现异常,拒绝接受
            System.out.println("出现异常,拒绝接受");
            channel.basicNack(tag, true,false);
        }

    }
}

总结

  • 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
  • RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。

7、RabbitMQ相关命令

#查看队列
rabbitmqctl list_queues

#查看exchanges
rabbitmqctl list_exchanges

#查看用户
rabbitmqctl list_users

#查看连接
rabbitmqctl list_connections

#查看消费者信息
rabbitmqctl list_consumers

#查看环境变量
rabbitmqctl environment

#查看未被确认的队列
rabbitmqctl list_queues name messages_unacknowledged

#查看单个队列的内存使用
rabbitmqctl list_queues name memory

#查看准备就绪的队列
rabbitmqctl list_queues name messages_ready

8、消息追踪

在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能

是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也

有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者

又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时

候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。

在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。

三、RabbitMQ应用问题

1、消息可靠性保障

  • 消息补偿机制

2、消息幂等性保障

  • 乐观锁解决方案

​ 幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任

​ 意多次执行对资源本身所产生的影响均与一次执行的影响相同。

​ 在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。


文章作者: Fansboom
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Fansboom !
评论