【消息队列】RabbitMQ四种工作模式

Hi,大家好,我是千羽。
学一门技术,先了解背景,为什么会出现它,它解决了什么问题,任何一项技术都不是凭空出生,肯定是为了解决某一个问题才出现的。
消息队列的应用场景就很常见了。
异步处理:当一个系统接收到一个请求,但这个请求的处理需要花费较长时间时,可以将这个请求放入 RabbitMQ 的消息队列中,然后立即返回给客户端一个响应。后台的工作进程可以异步地从队列中获取消息并进行处理,从而提高了系统的响应速度和吞吐量。
任务调度:RabbitMQ 可以与任务调度框架(如 Celery)结合使用,用于在分布式系统中调度和执行定时任务、批处理任务等。任务可以被推送到 RabbitMQ 的队列中,然后由工作节点异步地拉取并执行。
数据同步:比如mysql同步es。在微服务架构中,不同的服务之间可能需要共享数据或进行数据的同步。RabbitMQ 可以作为一个数据总线(Data Bus),允许服务之间发布和订阅数据变更事件。当某个服务的数据发生变化时,它可以发布一个事件消息到 RabbitMQ,其他订阅了该事件的服务会收到通知并进行相应的处理。
系统解耦:通过使用 RabbitMQ,可以将不同的系统或组件解耦。各个系统或组件之间不需要直接进行通信,而是通过消息队列进行间接的通信。这降低了系统之间的耦合度,使得系统更加易于维护和扩展。
日志收集:在分布式系统中,日志的收集和分析是一个重要的任务。RabbitMQ 可以作为一个日志收集的中心点,各个服务将日志消息发送到 RabbitMQ,然后由专门的日志处理服务进行消费和处理。
流量削峰:在高并发的场景下,RabbitMQ 可以作为流量削峰的工具。当系统接收到大量的请求时,可以将这些请求放入 RabbitMQ 的队列中进行缓冲,然后由后台的工作进程逐步处理。这样可以避免系统因为瞬时的高并发而崩溃或响应过慢。
消息广播:RabbitMQ 支持发布/订阅模式,可以实现消息的广播。当一个消息被发布到 RabbitMQ 时,所有订阅了该消息的消费者都会收到这个消息。这种机制可以用于实现实时通知、消息推送等功能。
分布式事务:虽然 RabbitMQ 本身不直接支持分布式事务,但它可以与其他技术(如两阶段提交、本地消息表等)结合使用,实现分布式事务的可靠执行。通过将事务的操作放入 RabbitMQ 的消息队列中,并在消息被成功消费后提交事务,可以确保事务的原子性和一致性。
跨语言通信:RabbitMQ 支持多种编程语言的客户端库,使得不同语言编写的服务之间可以通过 RabbitMQ 进行通信。这降低了系统集成的复杂性,并提高了系统的灵活性。

一、工作队列模式
上一篇我们刚刚写的《【消息队列】RabbitMP入门实战》文章就是简单模式,只是简化到了最简单的情况:生产者只有一个发送一个消息。
消费者也只有一个,消息也只能被这个消费者消费。
如果是:生产者发送多个消息,由多个消费者来竞争,谁抢到算谁的?
其实
多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系。
Work Queues工作模式适用于任务较重或任务较多的情况,多消费者分摊任务,可以提高消息处理的效率。

工作队列模式概述:工作队列模式允许一个生产者将消息发送到队列,然后由多个消费者共享这个队列中的消息。消息一旦被消费者接收,就会从队列中删除。这种模式通常用于负载均衡和任务分发。
应用场景和示例:
订单处理:一个订单系统接收到新的订单后,将其发送到工作队列。多个订单处理服务实例从队列中取出订单并处理。
处理:用户上传后,处理服务将处理任务发送到工作队列,多个处理工作进程从队列中取出任务并执行。
RabbitMQ核心功能和优势:
通过队列实现了消息的缓冲和存储。
消费者之间可以并行处理消息,提高系统的吞吐量和响应速度。
下面运行看看demo效果:

注意:运行的时候先启动消费者1,2程序,然后再启动生产者端程序。
如果已经运行过生产者程序,则手动把work_queue队列删掉。

  1. 生产者代码

封装工具类:ConnectionUtil.class
package com.nateshao.work_queue.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    */
    public class ConnectionUtil {
    public static final String HOST_ADDRESS = “localhost”;
    public static Connection getConnection() throws Exception {
    // 定义连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 设置服务地址
    factory.setHost(HOST_ADDRESS);
    // 端口
    factory.setPort(5672);
    //设置账号信息,用户名、密码、vhost
    factory.setVirtualHost(“/”);
    factory.setUsername(“guest”);
    factory.setPassword(“123456”);
    // 通过工程获取连接
    Connection connection = factory.newConnection();
    return connection;
    } public static void main(String[] args) throws Exception {
    Connection con = ConnectionUtil.getConnection();
    // amqp://guest@localhost:5672/
    System.out.println(con);
    con.close();
    }
    }
    消费者:Producer.class
    package com.nateshao.work_queue;

import com.nateshao.work_queue.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    */
    public class Producer {
    public static final String QUEUE_NAME = “work_queue”;
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    for (int i = 1; i <= 10; i++) {
    String body = i + “hello rabbitmq~~~”;
    channel.basicPublish(“”, QUEUE_NAME, null, body.getBytes());
    }
    channel.close();
    connection.close();
    }
    }
    1.1 发送消息效果

可以看到name显示work_queue,记录着10条消息

  1. 消费者代码:Consumer.class

创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。
Consumer1.class
package com.nateshao.work_queue;

import com.nateshao.work_queue.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
/**

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    */
    public class Consumer1 {
    static final String QUEUE_NAME = “work_queue”;
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println(“Consumer1 body:” + new String(body));
    }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);
    }
    }
    Consumer2.class
    package com.nateshao.work_queue;

import com.nateshao.work_queue.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
/**

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    */
    public class Consumer2 {
    static final String QUEUE_NAME = “work_queue”;
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println(“Consumer1 body:” + new String(body));
    }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);
    }
    }
    2.1 运行效果

最终两个消费端程序竞争结果如下:

二、发布订阅模式

在进入发布订阅模式之前,可以先了解一下交换机
生产者不是把消息直接发送到队列,而是发送到交换机,交换机接收消息,而如何处理消息取决于交换机的类型
交换机有如下3种常见类型
Fanout:广播,将消息发送给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key的队列
Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
其实
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因 此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那 么消息会丢失!
Publish/Subscribe 模式说明
组件之间关系:
生产者把消息发送到交换机
队列直接和交换机绑定
工作机制:消息发送到交换机上,就会以广播的形式发送给所有已绑定队列
理解概念:
Publish:发布,这里就是把消息发送到交换机上
Subscribe:订阅,这里只要把队列和交换机绑定,事实上就形成了一种订阅关系
应用场景和示例:
实时日志系统:多个应用实例将日志消息发送到交换机,交换机将消息广播到多个日志收集队列,每个队列对应一个日志收集服务实例。
新闻推送:新闻发布系统发布新闻到交换机,交换机将新闻广播到多个用户订阅的队列,用户从各自的队列中接收新闻推送。
RabbitMQ核心功能和优势:
交换机和队列的解耦设计,使得消息可以被广播到多个队列。
消费者可以按需订阅自己关心的消息,实现消息的灵活分发。

依赖导入:pom.xml


4.0.0
org.springframework.boot spring-boot-starter-parent 3.3.0
com.nateshao
code3_publish_subscribe
0.0.1-SNAPSHOT
code3_pubsub
pubsub
17

org.springframework.boot spring-boot-starter

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.20.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

1.1 生产者代码

package com.nateshao.pubsub;

import com.nateshao.pubsub.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    */
    public class Producer {
    public static void main(String[] args) throws Exception {
    // 1、获取连接
    Connection connection = ConnectionUtil.getConnection();
    // 2、创建频道
    Channel channel = connection.createChannel();
    // 参数1. exchange:交换机名称
    // 参数2. type:交换机类型
    // DIRECT(“direct”):定向
    // FANOUT(“fanout”):扇形(广播),发送消息到每一个与之绑定队列。
    // TOPIC(“topic”):通配符的方式
    // HEADERS(“headers”):参数匹配
    // 参数3. durable:是否持久化
    // 参数4. autoDelete:自动删除
    // 参数5. internal:内部使用。一般false
    // 参数6. arguments:其它参数
    String exchangeName = “test_fanout”; // 3、创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); // 4、创建队列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 5、绑定队列和交换机 // 参数1. queue:队列名称 // 参数2. exchange:交换机名称 // 参数3. routingKey:路由键,绑定规则 // 如果交换机的类型为fanout,routingKey设置为"" channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; // 6、发送消息 channel.basicPublish(exchangeName,"",null,body.getBytes()); // 7、释放资源 channel.close(); connection.close(); }
    }
    1.2 消费者代码

消费者1号:Consumer1.class
package com.nateshao.pubsub;

import com.nateshao.pubsub.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
/**

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    */
    public class Consumer1 {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    String queue1Name = “test_fanout_queue1”;
    channel.queueDeclare(queue1Name, true, false, false, null);
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println(“body:” + new String(body));
    System.out.println(“队列 1 消费者 1 将日志信息打印到控制台…..”);
    }
    };
    channel.basicConsume(queue1Name, true, consumer);
    }
    }
    消费者2号:Consumer2.class
    package com.nateshao.pubsub;

import com.nateshao.pubsub.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
/**

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    */
    public class Consumer2 {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    String queue1Name = “test_fanout_queue2”;
    channel.queueDeclare(queue1Name, true, false, false, null);
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println(“body:” + new String(body));
    System.out.println(“队列 2 消费者 2 将日志信息打印到控制台…..”);
    }
    };
    channel.basicConsume(queue1Name, true, consumer);
    }
    }
    1.3 运行效果

先启动消费者,然后再运行生产者程序发送消息:

1.4 小结

交换机和队列的绑定关系如下图所示:

交换机需要与队列进行绑定,绑定之后,一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:
工作队列模式本质上是绑定默认交换机
发布订阅模式绑定指定交换机
监听同一个队列的消费端程序彼此之间是竞争关系
绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息
三、路由模式

通过『路由绑定』的方式,把交换机和队列关联起来
交换机和队列通过路由键进行绑定
生产者发送消息时不仅要指定交换机,还要指定路由键
交换机接收到消息会发送到路由键绑定的队列
在编码上与 Publish/Subscribe发布与订阅模式的区别:
交换机的类型为:Direct
队列绑定交换机的时候需要指定routing key。
路由模式概述:路由模式在发布/订阅模式的基础上增加了路由键(Routing Key)的概念。生产者发送消息时指定路由键,交换机根据路由键将消息发送到匹配的队列。
应用场景和示例:
邮件系统:邮件系统根据邮件类型(如工作邮件、个人邮件等)设置不同的路由键,交换机根据路由键将邮件发送到不同的处理队列。
视频监控系统:摄像头产生的视频流根据监控区域设置不同的路由键,交换机根据路由键将视频流发送到对应的处理队列。
RabbitMQ核心功能和优势:
通过路由键实现了消息的过滤和分发,使得消息能够精确地发送到目标队列。
提高了系统的灵活性和可扩展性。

  1. 生产者代码

Producer.class
package com.nateshao.routing;

import com.nateshao.routing.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    */
    public class Producer {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    String exchangeName = “test_direct”;
    // 创建交换机
    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
    // 创建队列
    String queue1Name = “test_direct_queue1”;
    String queue2Name = “test_direct_queue2”;
    // 声明(创建)队列
    channel.queueDeclare(queue1Name, true, false, false, null);
    channel.queueDeclare(queue2Name, true, false, false, null);
    // 队列绑定交换机
    // 队列1绑定error
    channel.queueBind(queue1Name, exchangeName, “error”);
    // 队列2绑定info error warning
    channel.queueBind(queue2Name, exchangeName, “info”);
    channel.queueBind(queue2Name, exchangeName, “error”);
    channel.queueBind(queue2Name, exchangeName, “warning”);
    String message = “日志信息:张三调用了delete方法.错误了,日志级别error”;
    // 发送消息
    channel.basicPublish(exchangeName, “error”, null, message.getBytes());
    System.out.println(message);
    // 释放资源
    channel.close();
    connection.close();
    }
    }
  1. 消费者代码

1、消费者1号:Consumer1.class
package com.nateshao.routing;

import com.nateshao.routing.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    */
    public class Consumer1 {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    String queue1Name = “test_direct_queue1”;
    channel.queueDeclare(queue1Name, true, false, false, null);
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println(“body:” + new String(body));
    System.out.println(“Consumer1 将日志信息打印到控制台…..”);
    }
    };
    channel.basicConsume(queue1Name, true, consumer);
    }
    }
    消费者2号:Consumer2.class
    package com.nateshao.routing;

import com.nateshao.routing.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    */
    public class Consumer2 {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    String queue2Name = “test_direct_queue2”;
    channel.queueDeclare(queue2Name, true, false, false, null);
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println(“body:” + new String(body));
    System.out.println(“Consumer2 将日志信息存储到数据库…..”);
    }
    };
    channel.basicConsume(queue2Name, true, consumer);
    }
    }
  1. 运行结果

4.绑定关系

四、主题模式

topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队 列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用 通配符
Routingkey一般都是由一个或多个单词组成,多个单词之间以“.”分割, 例如:item.insert
通配符规则:

:匹配零个或多个词 • *:匹配一个词

主题模式概述:主题模式在路由模式的基础上使用了更复杂的路由规则。路由键不再是一个简单的字符串,而是一个由点分隔的字符串。交换机根据路由键和队列绑定的模式进行匹配,将消息发送到匹配的队列。
应用场景和示例:
股票交易系统:股票交易系统根据股票的代码和类型设置路由键,如”US.STOCK.AAPL”表示美国股市的苹果公司股票。交换机根据路由键和队列绑定的模式(如”US.*”表示接收美国股市的所有股票信息)将消息发送到匹配的队列。
RabbitMQ核心功能和优势:
通过模式匹配实现了更复杂的消息过滤和分发。
使得系统能够灵活地处理大量的消息和多样化的需求。
1.生产者代码

package com.nateshao.topics;

import com.nateshao.topics.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    / public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = “test_topic”; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null); String queue1Name = “test_topic_queue1”; String queue2Name = “test_topic_queue2″; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null); // 绑定队列和交换机 // 参数1. queue:队列名称 // 参数2. exchange:交换机名称 // 参数3. routingKey:路由键,绑定规则 // 如果交换机的类型为fanout ,routingKey设置为”” // routing key 常用格式:系统的名称.日志的级别。 // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name, exchangeName, “#.error”); channel.queueBind(queue1Name, exchangeName, “order.“);
    channel.queueBind(queue2Name, exchangeName, “.“); // 分别发送消息到队列:order.info、goods.info、goods.error String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]"; // channel.basicPublish(exchangeName,”order.info”,null,body.getBytes());

// body = “[所在系统:goods][日志级别:info][日志内容:商品发布成功]”;
// channel.basicPublish(exchangeName,”goods.info”,null,body.getBytes());

    body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
    channel.basicPublish(exchangeName, "goods.error", null, body.getBytes());
    channel.close();
    connection.close();
}

}
2.消费者代码

2.1 消费者1号

消费者1监听队列1:
package com.nateshao.topics;

import com.nateshao.topics.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    */
    class Consumer1 {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    String QUEUE_NAME = “test_topic_queue1”;
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println(“body:” + new String(body));
    }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);
    }
    }
    2.2 消费者2号

消费者2监听队列2:
package com.nateshao.topics;

import com.nateshao.topics.util.ConnectionUtil;
import com.rabbitmq.client.; import java.io.IOException; /*

  • @Author 千羽
  • @公众号 程序员千羽
  • @Date 2024/5/29 16:00
  • @Version 1.0
    */
    public class Consumer2 {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    String QUEUE_NAME = “test_topic_queue2”;
    channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    Consumer consumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println(“body:”+new String(body));
    }
    };
    channel.basicConsume(QUEUE_NAME,true,consumer);
    }
    }
    3.运行效果

关键代码如下
channel.queueBind(queue1Name, exchangeName, “#.error”);
channel.queueBind(queue1Name, exchangeName, “order.“); channel.queueBind(queue2Name, exchangeName, “.*”);

// 分别发送消息到队列:order.info、goods.info、goods.error
String body = “[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]”;
channel.basicPublish(exchangeName,”order.info”,null,body.getBytes());
路由匹配队列1,2
队列1:

队列2:

创作不易,感谢大家支持!
参考链接:https://www.bilibili.com/video/BV1sw4m1U7Qe/

声明:文中观点不代表本站立场。本文传送门:http://eyangzhen.com/416803.html

(0)
联系我们
联系我们
分享本页
返回顶部