Welcome everyone

RocketMQ简单学习

消息队列 汪明鑫 68浏览 0评论

前言

初学RocketMQ, 熟悉RocketMQ的老哥可以跳过本文

从0到1简单搭建单机RocketMQ以及简单的demo跑下消息生产和消费

安装

注意安装的rocketMQ版本和客户端版本最好一致,不然有些版本可能导致消息一直发不出去的问题

 

下载这个版本即可,解压

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-source-release.zip

 

 

路径:

~/Documents/software/rocketmq-all-4.2.0/distribution/target/apache-rocketmq » pwd

/Users/xinye/Documents/software/rocketmq-all-4.2.0/distribution/target/apache-rocketmq

 

启动nameServer:

nohup bin/mqbroker &
启动broker:
nohup bin/mqbroker -n localhost:9876 &
增加一个topic:
sh bin/mqadmin updateTopic -n 127.0.0.1:9876 -b 127.0.0.1:10911 -t MyTopic
这样单机版的简易rocketMQ就起起来了
可视化
https://github.com/apache/rocketmq-externals/releases/tag/rocketmq-console-1.0.0
下载zip文件解压

到目录启动项目

/Users/xinye/Documents/software/rocketmq-externals-rocketmq-console-1.0.0/rocketmq-consol

 mvn spring-boot:run

访问localhost:8080
跑工具生产消息

~/Documents/software/rocketmq-all-4.2.0/distribution/target/apache-rocketmq »sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

 

 

 

Java 收发消息

pom依赖

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.2.0</version>
        </dependency>

 

配置application.yml

# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
  name-server: 127.0.0.1:9876 # RocketMQ Namesrv
  # Producer 配置项
  producer:
    group: wmx # 生产者分组
    send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
    compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
    access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
    secret-key: # Secret Key
    enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
  # Consumer 配置项
  consumer:
    listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
      test-consumer-group:
        topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费

 

生产者-同步发消息

package pers.wmx.springbootfreemarkerdemo.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * @author wangmingxin03
 * Created on 2021-07-21
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("wmx");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        Message msg = new Message("MyTopic",
                "TagTest",
                "hello 2021".getBytes(RemotingHelper.DEFAULT_CHARSET)
        );
        SendResult sendResult = producer.send(msg);
        System.out.println(sendResult);

        producer.shutdown();
    }
}

 

生产者-异步发消息

package pers.wmx.springbootfreemarkerdemo.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * @author wangmingxin03
 * Created on 2021-07-22
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("wmx");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        Message msg = new Message("MyTopic",
                "TagTest",
                "async".getBytes(RemotingHelper.DEFAULT_CHARSET)
        );

        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发消息成功 | sendResult" + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("发消息失败, " + e);
            }
        });

        System.out.println("发消息中...");
        Thread.sleep(3000);
        producer.shutdown();
    }
}

 

消费消息

package pers.wmx.springbootfreemarkerdemo.rocketmq;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @author wangmingxin03
 * Created on 2021-07-22
 */
public class RocketMqConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wmx");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("MyTopic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started");
    }
}

 

控制台打印

Consumer StartedConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=1, storeSize=173, queueOffset=0, sysFlag=0, bornTimestamp=1626938540835, bornHost=/172.18.52.73:55759, storeTimestamp=1626938540844, storeHost=/172.18.52.73:10911, msgId=AC12344900002A9F000000000002BEB2, commitLogOffset=179890, bodyCRC=110651398, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=MyTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1626938798481, UNIQ_KEY=AC1234491E5518B4AAC26F71FB230000, WAIT=true, TAGS=TagTest}, body=10]]]

 

 

看下之前搭的可视化管理界面

产生了一条消息

 

 

消息详情

 

最后再看下消费者

 

 

 

转载请注明:汪明鑫的个人博客 » RocketMQ简单学习

喜欢 (1)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz