Welcome everyone

消息队列 RabbitMQ锦囊

消息队列 汪明鑫 32浏览 0评论

消息协议

AMQP

MQTT

STOMP

XMPP

 

JMS

接口规范,与平台无关的API

是对AMQP、MQTT、STOMP、XMPP更高一层抽象

类似JDBC

 

点对点模型

发布/订阅模型

 

ActiveMQ完全支持JMS

 

如果大家对消息协议和JMS感兴趣,更多详情可以翻阅相关书籍和百度,这两块我也是跳着看的–

 

认识RabbitMQ

RabbitMQ 是Erlang开发的 AMQP协议 的消息队列,是目前主流的消息队列产品之一

在《分布式消息中间件实践》一书中,介绍了RabbitMQ、ActiveMQ、Kafka、RocketMQ,打算把这本书看完,目前只看了RabbitMQ部分

据我了解ActiveMQ的使用应该少了,但毕竟也是Apache的,目前比较活跃的是RabbitMQ、RocketMQ

Kafka主要是用于大数据相关

Kafka的性能要远远高于其他三,支持集群,基于发布订阅,零拷贝技术,批处理,高性能io,缓存等

当时在网易,新闻推送就是用的2层kafka

RabbitMQ有非常灵活的路由协议,客户端也支持很多种语言

饿了么是根据RabbitMQ使用 go开发的类似RabbitMQ  MaxQ,毕竟Erlang太小众

京东也开发的JMQ

阿里用的RocketMQ

 

主流MQ对比

 

本篇我们主要近距离了解RabbitMQ

如果有一些概念性的东西看不大懂,或者由于我的水平垃圾写的不清晰

请耐心的接着往下看,会有例子等着你,可以把这些例子都敲一边,

我还是觉得这篇文章可以帮你一天入门RabbitMQ,毕竟是我整合了很多的网上的文章和书中的内容,再加上我自己的实操和理解

如有错误,敬请指正

 

名词解释

Broker: 接收消息和发送消息的 mq服务器实例。

Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中
的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,
每个用户在自己的vhost创建exchange/queue等。避免命名冲突。

可以把Virtual host理解成mini版rabbitMQ

Connection: publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker
不会断开连接,除非出现网络故障或broker服务出现问题。

Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开
销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接(虚拟的),如果应用程序支持多线程,通
常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker
识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立
TCP connection的开销。

Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue
中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。

Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中(消息复制)。

Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到
exchange中的查询表中,用于message的分发依据

 

架构

 

 

交换器类型

 

主要关注下面三种就行

 

direct

这种类型的交换机的路由规则是根据一个routingKey的标识,交换机通过一个routingKey与队列绑定 ,在生
产者生产消息的时候 指定一个routingKey 当绑定的队列的routingKey 与生产者发送的一样 那么交换机会吧
这个消息发送给对应的队列。

可以理解成点对点模型

 

 

fanout

理解成消息广播即可,一对多,只要绑定了交换机的队列就都可以收到消息

 

注意:fanout不涉及routingKey

 

topic

主题订阅

符合某一主题的queue都接收消息

通常使用到匹配符

 

持久化

Queue持久化

Message持久化

Exchange持久化

 

消息确认

如何保证消息从生产者发送到Broker,消息没有丢失,Broker没有宕机?

【生产者 –> Broker】

 

方式:

1,事务机制(不建议)

2,确认模式(异步)

 

确认模式:

把信道设置为确认模式时,在该信道发布的消息都会被分配一个唯一ID,当消息被成功发送到匹配的队列时,

信道就会向生产者发送确认消息,该消息包含了刚才说的唯一ID,这时生产者对比该ID,就知道消息已经成功发送到队列

 

确认模式细分:

1,普通确认

2,批量确认

3,异步确认

 

消费者应答

如何确认消费者收到了服务端发送的消息?消息没有丢失?

【Broker –> 消费者】

 

消费者向Broker的应答机制,消费者回执(Consumer Acknowledgement)

两种ACK方式:

手动

自动

autoAck

 

拒绝消息

消费者当前也有拒绝接收消息的权利

 

拒绝一条消息

拒绝多条消息

 

可以设置拒绝后的消息Broker如何处置:丢弃 or 重新放入队列

有一个蛋疼的是如果你只有一个消费者时,你还选择决绝消息后重新放回队列,那么这条消息又来了,又拒绝,又回队列,又来了。。。

这条消息就会在同一个消费者上发生死循环,要避免这种情况

 

com.rabbit.client.Channel中的2个重载的basicReject方法

 

消息预取

如果有100个消息需要处理,有2个消费者A、B,rabbitmq会给A、B各50条消息,

由于消费者消费消息的速度不同,最终消息处理完的时间自然是以消费速度慢的为准,这样就会影响整体的性能

如果能够让消费速度更快的消费更多消息,不就舒服了吗,就像负载均衡策略那样,不采用简单的轮询

这是由rabbitmq消息发放机制决定的,有的消费者忙,有的消费者闲,而系统整体的性能往往很受短板的影响

如何解决?

采用消息预取

可以设置预取数量 prefetchCount

如果prefetchCount = 1,每次消费者收到一条消息后再给服务端ACK时,服务端在收到ACK之前都不会再发送消息

消息一条一条处理

注意:使用消息预取的前提是设置为手动确认消息

 

如果设置为1可以极大的利用客户端的性能,我消费完了就可以赶紧消费下一条不会导致忙的很忙闲的很闲,

但是,我们每消费一条消息 就要通知一次rabbitmq 然后再取出新的消息,

这样对于rabbitmq的性能来讲是不合理的 所以这个参数要根据业务情况设置

 

一般要考虑个平衡值,prefetchCount不能太小也不能太大

可以通过com.rabbit.client.Channel中的basicQos方法设置预取数量

 

流控机制

目的:保证稳定性

当下游消费速度跟不上游生产速度造成消息积压时,,触发流控机制,就会阻塞消息生产者的连接,阻止生产者继续发送消息

直到资源不足的警告解除,就算解除了,rabbitMQ的接收消息速率和发送消息速率都会减小,性能收到很大的影响,恶化。

就好比你把人打了一顿,鼻青脸肿,就算第二天好的差不多了,也还是有心里的表面的伤疤,不好好工作了

因次要平衡好接收速率和发送速率

 

死信交换机

对于消息处理失败的处理?

是否回到原队列,如果不回到原队列,那么这条消息就丢了

因此有了 死信交换机 处理这种情况

在创建交换机时附带创建一个死信交换机,那么这个队列作废的消息就会被重新发到附带的这个交换机,在去重新路由这条消息

 

Web STOMP插件

./rabbitmq-plugins enable rabbitmq_web_stomp

浏览器可以使用webSocket和rabbitmq通信

 

环境准备

在docker跑rabbitmq

自从有了docker,发现都不喜欢直接安装了=-=

 

docker pull rabbitmq:management

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
15672:控制台端口号
5672:应用访问端口号

 

rabbitmq 后台

http://ip:15672/

 

 

简单例子

<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.4.1</version>
    </dependency>
public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("test");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }


}

 

简单队列

public class Send {

    private final static String QUEUE_NAME = "my_test_01";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();

        // 从连接中创建通道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 消息内容
        String message = "nmsl!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道和连接
        channel.close();
        connection.close();
    }

}

 

public class Recv {

    private final static String QUEUE_NAME = "my_test_01";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }


}

 

direct类型

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");  //交换机类型 direct

        // 消息内容
        String message = "haha!";
        channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());   //发送到交换机  routingKey 为 delete
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }

}
public class Recv {
    private final static String QUEUE_NAME = "test_queue_direct1";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}
public class Recv1 {
    private final static String QUEUE_NAME = "test_queue_direct2";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv1] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}

 

 

fanout类型

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 消息内容
        String message = "haha!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());   //发送到交换机
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }

}
public class Recv {
    private final static String QUEUE_NAME = "test_queue_work1";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}
public class Recv1 {
    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv1] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}

 

 

topic类型

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");  //交换机类型 topic

        // 消息内容
        String message = "haha!";
        channel.basicPublish(EXCHANGE_NAME, "routing.1", null, message.getBytes());   
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }

}
public class Recv {
    private final static String QUEUE_NAME = "test_queue_topic_work_1";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}

 

public class Recv1 {
    private final static String QUEUE_NAME = "test_queue_topic_work_2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv1] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}

 

SpringBoot整合RabbitMQ

创建spring boot项目,引入依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 

application.properties 配置rabbitmq信息

spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

 

我们来展示一个topic的例子

需要一个配置类,生成queue,exchange,绑定queue和exchange的关系

package pers.wmx.demo;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicRabbitConfig {

    final static String QUEUE_NAME1 = "q_topic_message";
    final static String QUEUE_NAME2 = "q_topic_messages";

    @Bean
    public Queue queue1(){
        return new Queue(QUEUE_NAME1);
    }

    @Bean
    public Queue queue2(){
        return new Queue(QUEUE_NAME2);
    }

    @Bean
    TopicExchange exchange(){
        return new TopicExchange("myexchange");
    }

    /*
    * 绑定Q到交换机,并且指定routingKey
    * */
    @Bean
    Binding bindingExchangeMessage(Queue queue1,TopicExchange exchange){
        return BindingBuilder.bind(queue1).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queue2, TopicExchange exchange) {
        return BindingBuilder.bind(queue2).to(exchange).with("topic.#");
    }
}

 

发送消息

package pers.wmx.demo;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MsgSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send1(){
        String context="1111222";
        System.out.println("Sender : " + context);

        this.amqpTemplate.convertAndSend("myexchange","topic.message",context);
    }

    public void send2(){
        String context="22222222222222222";
        System.out.println("Sender : " + context);

        this.amqpTemplate.convertAndSend("myexchange","topic.6666",context);
    }

}

 

2个消费者

package pers.wmx.demo;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_topic_message")
public class Receiver1 {

    @RabbitHandler
    public void process(String xxx){
        System.out.println("Receiver1  : " + xxx);
    }

}

 

package pers.wmx.demo;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_topic_messages")
public class Receiver2 {

    @RabbitHandler
    public void process(String xxx){
        System.out.println("Receiver2  : " + xxx);
    }

}

 

启动spring boot项目

再跑单元测试

package pers.wmx.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import pers.wmx.demo.simple.Sender;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {

    @Autowired
    private MsgSender msgSender;

    @Test
    public void send1() throws Exception {
        msgSender.send1();
    }

    @Test
    public void send2() throws Exception {
        msgSender.send2();
    }
    
}

 

sender 我们用的AmqpTemplate

其实也可以用RabbitTemplate

 

下一个列子 direct类型的交换机

@Configuration
public class DirectRabbitConfig {

    final static String QUEUE_NAME1 = "directQ1";
    final static String QUEUE_NAME2 = "directQ2";

    @Bean
    public Queue queue1(){
        return new Queue(QUEUE_NAME1);
    }

    //true  标示持久化
    @Bean
    public Queue queue2(){
        return new Queue(QUEUE_NAME2,true);
    }

    @Bean
    DirectExchange exchange(){
        return new DirectExchange("direct_exchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queue1,DirectExchange exchange){
        return BindingBuilder.bind(queue1).to(exchange).with("direct.update");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queue2, DirectExchange exchange) {
        return BindingBuilder.bind(queue2).to(exchange).with("direct.delete");
    }
}
/**
 * @author wmx
 * @date 2019-09-25
 */
@Component
public class DirectSender {
    @Autowired
    RabbitTemplate rabbitTemplate;

    public void send(){
        rabbitTemplate.convertAndSend("direct_exchange","direct.update","hehe");

    }
}
package pers.wmx.demo.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "directQ1")
public class DirectReceiver1 {

    @RabbitHandler
    public void process(String xxx){
        System.out.println("Receiver1  : " + xxx);
    }

}
package pers.wmx.demo.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "directQ2")
public class DirectReceiver2 {

    @RabbitHandler
    public void process(String xxx){
        System.out.println("Receiver2  : " + xxx);
    }

}

 

同样启动springboot 项目后,再跑单元测试

@Test
    public void directSend() throws Exception {
        directSender.send();
    }

 

发送消息和接收消息的类名和之前的不能一样,不然会冲突

在config类中定义的queue也是一样,名字不能一样

Description:

The bean ‘queue1’, defined in class path resource [pers/wmx/demo/topic/TopicRabbitConfig.class], could not be registered. A bean with that name has already been defined in class path resource [pers/wmx/demo/direct/DirectRabbitConfig.class] and overriding is disabled.

 

和之前的例子重复了,修改下即可

@Configuration
public class DirectRabbitConfig {

    final static String QUEUE_NAME1 = "directQ1";
    final static String QUEUE_NAME2 = "directQ2";

    @Bean
    public Queue directQueue1(){
        return new Queue(QUEUE_NAME1);
    }

    //true  标示持久化
    @Bean
    public Queue directQueue2(){
        return new Queue(QUEUE_NAME2,true);
    }

    @Bean
    DirectExchange directExchange(){
        return new DirectExchange("direct_exchange");
    }

    @Bean
    Binding bindingExchangeQ1(Queue directQueue1,DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("direct.update");
    }

    @Bean
    Binding bindingExchangeQ2(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with("direct.delete");
    }
}

 

最好把所有的队列、交换机、routingKey统一起来在RabbitConfig管理

转载请注明:汪明鑫的个人博客 » 消息队列 RabbitMQ锦囊

喜欢 (0)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz