目录
消息协议
AMQP
MQTT
STOMP
XMPP
JMS
接口规范,与平台无关的API
是对AMQP、MQTT、STOMP、XMPP更高一层抽象
类似JDBC
点对点模型
发布/订阅模型
ActiveMQ完全支持JMS
如果大家对消息协议和JMS感兴趣,更多详情可以翻阅相关书籍和百度,这两块我也是跳着看的–
认识RabbitMQ
RabbitMQ 是Erlang开发的 AMQP协议 的消息队列,是目前主流的消息队列产品之一
在《分布式消息中间件实践》一书中,介绍了RabbitMQ、ActiveMQ、Kafka、RocketMQ,打算把这本书看完,目前只看了RabbitMQ部分
据我了解ActiveMQ的使用应该少了,但毕竟也是Apache的,目前比较活跃的是RabbitMQ、RocketMQ
Kafka主要是用于大数据相关
Kafka的性能要远远高于其他三,支持集群,基于发布订阅,零拷贝技术,批处理,高性能io,缓存等
当时在网易,新闻推送就是用的2层kafka
RabbitMQ有非常灵活的路由协议,客户端也支持很多种语言
饿了么是根据RabbitMQ使用 go开发的类似RabbitMQ MaxQ,毕竟Erlang太小众
京东也开发的JMQ
阿里用的RocketMQ
主流MQ对比
本篇我们主要近距离了解RabbitMQ
如果有一些概念性的东西看不大懂,或者由于我的水平垃圾写的不清晰
请耐心的接着往下看,会有例子等着你,可以把这些例子都敲一边,
我还是觉得这篇文章可以帮你一天入门RabbitMQ,毕竟是我整合了很多的网上的文章和书中的内容,再加上我自己的实操和理解
如有错误,敬请指正
名词解释
Broker: 接收消息和发送消息的 mq服务器实例。
Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中
的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,
每个用户在自己的vhost创建exchange/queue等。避免命名冲突。
可以把Virtual host理解成mini版rabbitMQ
Connection: publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker
不会断开连接,除非出现网络故障或broker服务出现问题。
Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开
销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接(虚拟的),如果应用程序支持多线程,通
常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker
识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立
TCP connection的开销。
Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue
中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中(消息复制)。
Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到
exchange中的查询表中,用于message的分发依据
架构
交换器类型
主要关注下面三种就行
direct
这种类型的交换机的路由规则是根据一个routingKey的标识,交换机通过一个routingKey与队列绑定 ,在生
产者生产消息的时候 指定一个routingKey 当绑定的队列的routingKey 与生产者发送的一样 那么交换机会吧
这个消息发送给对应的队列。
可以理解成点对点模型
fanout
理解成消息广播即可,一对多,只要绑定了交换机的队列就都可以收到消息
注意:fanout不涉及routingKey
topic
主题订阅
符合某一主题的queue都接收消息
通常使用到匹配符
持久化
Queue持久化
Message持久化
Exchange持久化
消息确认
如何保证消息从生产者发送到Broker,消息没有丢失,Broker没有宕机?
【生产者 –> Broker】
方式:
1,事务机制(不建议)
2,确认模式(异步)
确认模式:
把信道设置为确认模式时,在该信道发布的消息都会被分配一个唯一ID,当消息被成功发送到匹配的队列时,
信道就会向生产者发送确认消息,该消息包含了刚才说的唯一ID,这时生产者对比该ID,就知道消息已经成功发送到队列
确认模式细分:
1,普通确认
2,批量确认
3,异步确认
消费者应答
如何确认消费者收到了服务端发送的消息?消息没有丢失?
【Broker –> 消费者】
消费者向Broker的应答机制,消费者回执(Consumer Acknowledgement)
两种ACK方式:
手动
自动
autoAck
拒绝消息
消费者当前也有拒绝接收消息的权利
拒绝一条消息
拒绝多条消息
可以设置拒绝后的消息Broker如何处置:丢弃 or 重新放入队列
有一个蛋疼的是如果你只有一个消费者时,你还选择决绝消息后重新放回队列,那么这条消息又来了,又拒绝,又回队列,又来了。。。
这条消息就会在同一个消费者上发生死循环,要避免这种情况
com.rabbit.client.Channel
中的2个重载的basicReject方法
消息预取
如果有100个消息需要处理,有2个消费者A、B,rabbitmq会给A、B各50条消息,
由于消费者消费消息的速度不同,最终消息处理完的时间自然是以消费速度慢的为准,这样就会影响整体的性能
如果能够让消费速度更快的消费更多消息,不就舒服了吗,就像负载均衡策略那样,不采用简单的轮询
这是由rabbitmq消息发放机制决定的,有的消费者忙,有的消费者闲,而系统整体的性能往往很受短板的影响
如何解决?
采用消息预取
可以设置预取数量 prefetchCount
如果prefetchCount = 1,每次消费者收到一条消息后再给服务端ACK时,服务端在收到ACK之前都不会再发送消息
消息一条一条处理
注意:使用消息预取的前提是设置为手动确认消息
如果设置为1可以极大的利用客户端的性能,我消费完了就可以赶紧消费下一条不会导致忙的很忙闲的很闲,
但是,我们每消费一条消息 就要通知一次rabbitmq 然后再取出新的消息,
这样对于rabbitmq的性能来讲是不合理的 所以这个参数要根据业务情况设置
一般要考虑个平衡值,prefetchCount不能太小也不能太大
可以通过com.rabbit.client.Channel
中的basicQos方法设置预取数量
流控机制
目的:保证稳定性
当下游消费速度跟不上游生产速度造成消息积压时,,触发流控机制,就会阻塞消息生产者的连接,阻止生产者继续发送消息
直到资源不足的警告解除,就算解除了,rabbitMQ的接收消息速率和发送消息速率都会减小,性能收到很大的影响,恶化。
就好比你把人打了一顿,鼻青脸肿,就算第二天好的差不多了,也还是有心里的表面的伤疤,不好好工作了
因次要平衡好接收速率和发送速率
死信交换机
对于消息处理失败的处理?
是否回到原队列,如果不回到原队列,那么这条消息就丢了
因此有了 死信交换机 处理这种情况
在创建交换机时附带创建一个死信交换机,那么这个队列作废的消息就会被重新发到附带的这个交换机,在去重新路由这条消息
Web STOMP插件
./rabbitmq-plugins enable rabbitmq_web_stomp
浏览器可以使用webSocket和rabbitmq通信
环境准备
在docker跑rabbitmq
自从有了docker,发现都不喜欢直接安装了=-=
docker pull rabbitmq:management
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
rabbitmq 后台
http://ip:15672/
简单例子
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("test");
factory.setUsername("guest");
factory.setPassword("guest");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}
简单队列
public class Send {
private final static String QUEUE_NAME = "my_test_01";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "nmsl!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
}
}
public class Recv {
private final static String QUEUE_NAME = "my_test_01";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
direct类型
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //交换机类型 direct
// 消息内容
String message = "haha!";
channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes()); //发送到交换机 routingKey 为 delete
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
public class Recv {
private final static String QUEUE_NAME = "test_queue_direct1";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_direct2";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv1] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
fanout类型
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 消息内容
String message = "haha!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); //发送到交换机
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
public class Recv {
private final static String QUEUE_NAME = "test_queue_work1";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_work2";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv1] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
topic类型
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //交换机类型 topic
// 消息内容
String message = "haha!";
channel.basicPublish(EXCHANGE_NAME, "routing.1", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
public class Recv {
private final static String QUEUE_NAME = "test_queue_topic_work_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_topic_work_2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv1] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
SpringBoot整合RabbitMQ
创建spring boot项目,引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.properties 配置rabbitmq信息
spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
我们来展示一个topic的例子
需要一个配置类,生成queue,exchange,绑定queue和exchange的关系
package pers.wmx.demo;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
final static String QUEUE_NAME1 = "q_topic_message";
final static String QUEUE_NAME2 = "q_topic_messages";
@Bean
public Queue queue1(){
return new Queue(QUEUE_NAME1);
}
@Bean
public Queue queue2(){
return new Queue(QUEUE_NAME2);
}
@Bean
TopicExchange exchange(){
return new TopicExchange("myexchange");
}
/*
* 绑定Q到交换机,并且指定routingKey
* */
@Bean
Binding bindingExchangeMessage(Queue queue1,TopicExchange exchange){
return BindingBuilder.bind(queue1).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queue2, TopicExchange exchange) {
return BindingBuilder.bind(queue2).to(exchange).with("topic.#");
}
}
发送消息
package pers.wmx.demo;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MsgSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send1(){
String context="1111222";
System.out.println("Sender : " + context);
this.amqpTemplate.convertAndSend("myexchange","topic.message",context);
}
public void send2(){
String context="22222222222222222";
System.out.println("Sender : " + context);
this.amqpTemplate.convertAndSend("myexchange","topic.6666",context);
}
}
2个消费者
package pers.wmx.demo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_topic_message")
public class Receiver1 {
@RabbitHandler
public void process(String xxx){
System.out.println("Receiver1 : " + xxx);
}
}
package pers.wmx.demo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_topic_messages")
public class Receiver2 {
@RabbitHandler
public void process(String xxx){
System.out.println("Receiver2 : " + xxx);
}
}
启动spring boot项目
再跑单元测试
package pers.wmx.demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import pers.wmx.demo.simple.Sender;
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {
@Autowired
private MsgSender msgSender;
@Test
public void send1() throws Exception {
msgSender.send1();
}
@Test
public void send2() throws Exception {
msgSender.send2();
}
}
sender 我们用的AmqpTemplate
其实也可以用RabbitTemplate
下一个列子 direct类型的交换机
@Configuration
public class DirectRabbitConfig {
final static String QUEUE_NAME1 = "directQ1";
final static String QUEUE_NAME2 = "directQ2";
@Bean
public Queue queue1(){
return new Queue(QUEUE_NAME1);
}
//true 标示持久化
@Bean
public Queue queue2(){
return new Queue(QUEUE_NAME2,true);
}
@Bean
DirectExchange exchange(){
return new DirectExchange("direct_exchange");
}
@Bean
Binding bindingExchangeMessage(Queue queue1,DirectExchange exchange){
return BindingBuilder.bind(queue1).to(exchange).with("direct.update");
}
@Bean
Binding bindingExchangeMessages(Queue queue2, DirectExchange exchange) {
return BindingBuilder.bind(queue2).to(exchange).with("direct.delete");
}
}
/**
* @author wmx
* @date 2019-09-25
*/
@Component
public class DirectSender {
@Autowired
RabbitTemplate rabbitTemplate;
public void send(){
rabbitTemplate.convertAndSend("direct_exchange","direct.update","hehe");
}
}
package pers.wmx.demo.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "directQ1")
public class DirectReceiver1 {
@RabbitHandler
public void process(String xxx){
System.out.println("Receiver1 : " + xxx);
}
}
package pers.wmx.demo.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "directQ2")
public class DirectReceiver2 {
@RabbitHandler
public void process(String xxx){
System.out.println("Receiver2 : " + xxx);
}
}
同样启动springboot 项目后,再跑单元测试
@Test
public void directSend() throws Exception {
directSender.send();
}
发送消息和接收消息的类名和之前的不能一样,不然会冲突
在config类中定义的queue也是一样,名字不能一样
Description:
The bean ‘queue1’, defined in class path resource [pers/wmx/demo/topic/TopicRabbitConfig.class], could not be registered. A bean with that name has already been defined in class path resource [pers/wmx/demo/direct/DirectRabbitConfig.class] and overriding is disabled.
和之前的例子重复了,修改下即可
@Configuration
public class DirectRabbitConfig {
final static String QUEUE_NAME1 = "directQ1";
final static String QUEUE_NAME2 = "directQ2";
@Bean
public Queue directQueue1(){
return new Queue(QUEUE_NAME1);
}
//true 标示持久化
@Bean
public Queue directQueue2(){
return new Queue(QUEUE_NAME2,true);
}
@Bean
DirectExchange directExchange(){
return new DirectExchange("direct_exchange");
}
@Bean
Binding bindingExchangeQ1(Queue directQueue1,DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("direct.update");
}
@Bean
Binding bindingExchangeQ2(Queue directQueue2, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue2).to(directExchange).with("direct.delete");
}
}
最好把所有的队列、交换机、routingKey统一起来在RabbitConfig管理
转载请注明:汪明鑫的个人博客 » 消息队列 RabbitMQ锦囊
说点什么
您将是第一位评论人!