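Preface
This post is for RocketMQ beginners; readers already familiar with RocketMQ can skip it.
It goes from zero to a working single-node RocketMQ, then runs a simple demo that produces and consumes messages.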
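Installation
Note: the RocketMQ server version and the client version should preferably match; with some mismatched versions, messages may never get sent.
Download this version and unzip it: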
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-source-release.zip
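Path (the archive is a source release, so this directory only exists after a Maven build, e.g. `mvn -Prelease-all -DskipTests clean install -U` as in the official quick start):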
~/Documents/software/rocketmq-all-4.2.0/distribution/target/apache-rocketmq » pwd
/Users/xinye/Documents/software/rocketmq-all-4.2.0/distribution/target/apache-rocketmq
Start the name server:
nohup sh bin/mqnamesrv &
Start the broker:
nohup sh bin/mqbroker -n localhost:9876 &
Add a topic:
sh bin/mqadmin updateTopic -n 127.0.0.1:9876 -b 127.0.0.1:10911 -t MyTopic
With that, a simple single-node RocketMQ is up and running.
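Because both processes are started in the background with nohup, a startup failure is easy to miss. The following is a minimal sketch, not part of RocketMQ, that probes the two TCP ports used in the commands above (9876 for the name server, 10911 for the broker). The class and method names (`PortProbe`, `isReachable`) are made up for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of RocketMQ: checks whether a TCP port accepts connections.
class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused or timed out: nothing is listening there, or something blocks the connection.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the commands above: 9876 = name server, 10911 = broker.
        System.out.println("name server 127.0.0.1:9876 reachable: "
                + isReachable("127.0.0.1", 9876, 1000));
        System.out.println("broker 127.0.0.1:10911 reachable: "
                + isReachable("127.0.0.1", 10911, 1000));
    }
}
```

If either line prints false, the nohup.out file in the startup directory is the first place to look.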
Go to the rocketmq-console directory and start the project:
/Users/xinye/Documents/software/rocketmq-externals-rocketmq-console-1.0.0/rocketmq-consol
mvn spring-boot:run
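Then send some test messages with the bundled quickstart producer (the official quick start exports NAMESRV_ADDR=localhost:9876 first so the tool can find the name server):
~/Documents/software/rocketmq-all-4.2.0/distribution/target/apache-rocketmq » sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer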
Sending and receiving messages in Java
pom dependency
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.2.0</version>
</dependency>
Configure application.yml
These keys are read by rocketmq-spring-boot-starter; the plain-client demos below set their options in code instead.
# rocketmq settings, bound to the RocketMQProperties configuration class
rocketmq:
  name-server: 127.0.0.1:9876 # RocketMQ name server address
  # Producer settings
  producer:
    group: wmx # producer group
    send-message-timeout: 10000 # send timeout in milliseconds; default 3000
    compress-message-body-threshold: 4096 # bodies larger than this threshold are compressed; default 4 * 1024 B
    max-message-size: 4194304 # maximum allowed message body size; default 4 * 1024 * 1024 B
    retry-times-when-send-failed: 2 # retries after a failed synchronous send; default 2
    retry-times-when-send-async-failed: 2 # retries after a failed asynchronous send; default 2
    retry-next-server: false # whether to retry on another broker when a send fails; default false
    access-key: # Access Key, see https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md
    secret-key: # Secret Key
    enable-msg-trace: true # whether message tracing is enabled; default true. See https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # custom message trace topic; default RMQ_SYS_TRACE_TOPIC
  # Consumer settings
  consumer:
    listeners: # whether a consumer group listens on a given topic; structure is Map<consumer group, Map<Topic, Boolean>>. Not configured means listening.
      test-consumer-group:
        topic1: false # turn off test-consumer-group's consumption of topic1
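The `listeners` setting is the least obvious one: it is a nested map in which a missing entry means "enabled". A minimal sketch of that lookup rule in plain Java follows. It only models the behavior described in the comment above and is not the starter's actual code; `ListenerSwitch` and its methods are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative model of the consumer.listeners switch; not the starter's actual code.
class ListenerSwitch {

    // Map<consumer group, Map<topic, enabled>>, mirroring the YAML structure.
    private final Map<String, Map<String, Boolean>> listeners = new HashMap<>();

    // Records an explicit on/off entry, like one line under consumer.listeners.<group>.
    void set(String group, String topic, boolean enabled) {
        listeners.computeIfAbsent(group, g -> new HashMap<>()).put(topic, enabled);
    }

    // A group/topic pair that is not configured counts as enabled.
    boolean isEnabled(String group, String topic) {
        return listeners
                .getOrDefault(group, Collections.emptyMap())
                .getOrDefault(topic, true);
    }

    public static void main(String[] args) {
        ListenerSwitch config = new ListenerSwitch();
        config.set("test-consumer-group", "topic1", false);

        System.out.println(config.isEnabled("test-consumer-group", "topic1")); // false
        System.out.println(config.isEnabled("test-consumer-group", "topic2")); // true
        System.out.println(config.isEnabled("other-group", "topic1"));         // true
    }
}
```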
Producer: synchronous send
package pers.wmx.springbootfreemarkerdemo.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * @author wangmingxin03
 * Created on 2021-07-21
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // "wmx" is the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("wmx");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        // Arguments: topic, tag, body (as bytes)
        Message msg = new Message("MyTopic",
                "TagTest",
                "hello 2021".getBytes(RemotingHelper.DEFAULT_CHARSET)
        );

        // send() blocks until the broker responds
        SendResult sendResult = producer.send(msg);
        System.out.println(sendResult);

        producer.shutdown();
    }
}
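The message body is just a byte array, and `RemotingHelper.DEFAULT_CHARSET` is the string "UTF-8". As a small sketch of what that encoding step does, the plain-Java example below round-trips a body through UTF-8 using `StandardCharsets` in place of the RocketMQ constant. `BodyEncoding` is a made-up name, not a RocketMQ class.

```java
import java.nio.charset.StandardCharsets;

// Illustration only: how a String becomes a message body and back again.
class BodyEncoding {

    // Encodes text the same way the producer above does, with UTF-8.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes a body on the consuming side; the charset must match the producer's.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("hello 2021");
        System.out.println(body.length);              // 10
        System.out.println(decode(body));             // hello 2021
        // Non-ASCII characters take more than one byte each in UTF-8.
        System.out.println(encode("caf\u00e9").length); // 5
    }
}
```

On the consumer side the same idea applies to the bytes returned by `getBody()` on a received message.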
Producer: asynchronous send
package pers.wmx.springbootfreemarkerdemo.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * @author wangmingxin03
 * Created on 2021-07-22
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("wmx");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        Message msg = new Message("MyTopic",
                "TagTest",
                "async".getBytes(RemotingHelper.DEFAULT_CHARSET)
        );

        // send() returns immediately; the result arrives through the callback
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Send succeeded | sendResult=" + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("Send failed, " + e);
            }
        });

        System.out.println("Sending...");
        // Give the callback time to fire before shutting the producer down
        Thread.sleep(3000);
        producer.shutdown();
    }
}
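The `Thread.sleep(3000)` in the demo exists only so the callback has time to run before `shutdown()`. One common alternative is to wait on a `CountDownLatch` that the callback releases. The sketch below shows that pattern with stand-in types: `Callback` and `sendAsync` are hypothetical and merely imitate an asynchronous send on a thread pool; they are not RocketMQ's API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Stand-in types for illustration only; these are NOT RocketMQ classes.
class AsyncWaitSketch {

    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    // Fake asynchronous send: invokes the callback on another thread.
    static void sendAsync(ExecutorService pool, String body, Callback callback) {
        pool.submit(() -> callback.onSuccess("SEND_OK:" + body));
    }

    // Sends one message and waits for its callback, up to timeoutMillis.
    // Returns true if the callback fired before the timeout.
    static boolean sendAndAwait(String body, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        try {
            sendAsync(pool, body, new Callback() {
                @Override
                public void onSuccess(String result) {
                    System.out.println("send succeeded: " + result);
                    done.countDown();
                }

                @Override
                public void onException(Throwable e) {
                    System.out.println("send failed: " + e);
                    done.countDown();
                }
            });
            // Waits only as long as needed, instead of a fixed sleep.
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback fired in time: " + sendAndAwait("async", 3000));
    }
}
```

In the real producer, the two `countDown()` calls would go inside `SendCallback.onSuccess` and `onException`, with `await` placed before `producer.shutdown()`.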
Consuming messages
package pers.wmx.springbootfreemarkerdemo.rocketmq;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @author wangmingxin03
 * Created on 2021-07-22
 */
public class RocketMqConsumer {
    public static void main(String[] args) throws MQClientException {
        // "wmx" is the consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wmx");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // A new consumer group starts from the earliest available offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to MyTopic; "*" accepts every tag
        consumer.subscribe("MyTopic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
                // Acknowledge the batch as consumed
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started");
    }
}
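The second argument of `subscribe("MyTopic", "*")` is a tag expression: `*` accepts every tag, and several tags can be joined with `||`. The sketch below is a simplified plain-Java model of that matching rule, included only to make the semantics concrete. The real filtering happens inside the broker and client and is not implemented this way; `TagFilterSketch` and `matches` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified model of tag-expression matching; not RocketMQ's implementation.
class TagFilterSketch {

    // Returns true if a message carrying messageTag would pass subExpression.
    static boolean matches(String subExpression, String messageTag) {
        // A null, empty, or "*" expression subscribes to everything.
        if (subExpression == null || subExpression.trim().isEmpty()
                || "*".equals(subExpression.trim())) {
            return true;
        }
        if (messageTag == null) {
            return false;
        }
        // "TagA || TagB" means: accept TagA or TagB.
        Set<String> wanted = Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "TagTest"));               // true
        System.out.println(matches("TagA || TagTest", "TagTest")); // true
        System.out.println(matches("TagA", "TagTest"));            // false
    }
}
```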
Console output
Consumer StartedConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=1, storeSize=173, queueOffset=0, sysFlag=0, bornTimestamp=1626938540835, bornHost=/172.18.52.73:55759, storeTimestamp=1626938540844, storeHost=/172.18.52.73:10911, msgId=AC12344900002A9F000000000002BEB2, commitLogOffset=179890, bodyCRC=110651398, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=MyTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1626938798481, UNIQ_KEY=AC1234491E5518B4AAC26F71FB230000, WAIT=true, TAGS=TagTest}, body=10]]]
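Several fields in this output are related. The 32-character `msgId` appears to be a hex encoding of the store host (4-byte IPv4 address plus 4-byte port) followed by the 8-byte commit log offset, and the timestamps are epoch milliseconds. The sketch below decodes the values printed above under that assumed layout; `MsgIdDecoder` is a made-up helper, not a RocketMQ class.

```java
import java.time.Instant;

// Hypothetical decoder for the msgId shown above.
// Assumed layout: 4-byte IPv4 address + 4-byte port + 8-byte commit log offset, hex-encoded.
class MsgIdDecoder {

    // Extracts "ip:port" from the first 16 hex characters.
    static String storeHost(String msgId) {
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(msgId.substring(i * 2, i * 2 + 2), 16));
        }
        int port = Integer.parseInt(msgId.substring(8, 16), 16);
        return ip + ":" + port;
    }

    // Extracts the commit log offset from the last 16 hex characters.
    static long commitLogOffset(String msgId) {
        return Long.parseLong(msgId.substring(16, 32), 16);
    }

    public static void main(String[] args) {
        String msgId = "AC12344900002A9F000000000002BEB2";
        System.out.println(storeHost(msgId));        // 172.18.52.73:10911
        System.out.println(commitLogOffset(msgId));  // 179890

        long bornTimestamp = 1626938540835L;
        long storeTimestamp = 1626938540844L;
        System.out.println(Instant.ofEpochMilli(bornTimestamp)); // 2021-07-22T07:22:20.835Z
        System.out.println(storeTimestamp - bornTimestamp);      // 9 (milliseconds)
    }
}
```

The decoded host and offset agree with the `storeHost` and `commitLogOffset` fields in the output, and `body=10` is consistent with the 10-byte body of "hello 2021" sent by the synchronous producer.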
Now look at the management console set up earlier
One message has been produced
Message details
Finally, check the consumer
When reposting, please credit: 汪明鑫的个人博客 » RocketMQ简单学习