一. 概述
参考开源项目https://github.com/xkcoding/spring-boot-demo
本Demo简单介绍springboot继承rocketmq, rocketmq采用docker单机部署
二. 安装rocketmq
由于官方没有提供docker镜像,所以用星星较多的
foxiswho/rocketmq:4.8.0
安装
2.1 拉取镜像
docker pull foxiswho/rocketmq:4.8.0
2.2 安装nameserver
docker run -d \
--name rmqnamesrv \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-p 9876:9876 \
foxiswho/rocketmq:4.8.0 \
sh mqnamesrv
2.3 安装broker
- 新建配置文件:~/conf/broker.conf
注意namesrvAddr
和brokerIP1
填写主机的ip地址,不要写localhost
或127.0.0.1
#所属集群名字
brokerClusterName=DefaultCluster
#broker名字,注意此处不同的配置文件填写的不一样,如果在broker-a.properties使用:broker-a,
#在broker-b.properties使用:broker-b
brokerName=broker-a
#0 表示Master,>0 表示Slave
brokerId=0
#nameServer地址,分号分割
#namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
namesrvAddr=xx.xx.xx.xx:9876
#启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=xx.xx.xx.xx
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 !!!这里仔细看是false,false,false
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
#storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
#commitLog 存储路径
#storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
#消费队列存储
#storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
#消息索引存储路径
#storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
#checkpoint 文件存储路径
#storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
#abort 文件存储路径
#abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
运行脚本
docker run -d \
-v ~/conf:/home/rocketmq/conf \
--name rmqbroker \
# 这里填nameserver的宿主机IP 不要写`localhost`或`127.0.0.1`
-e "NAMESRV_ADDR=xx.xx.xx.xx:9876" \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-p 10911:10911 -p 10912:10912 -p 10909:10909 \
foxiswho/rocketmq:4.8.0 \
sh mqbroker -c /home/rocketmq/conf/broker.conf
- 安装控制台
docker run -d \
--name rmqconsole \
# 这里填nameserver的宿主机IP 不要写`localhost`或`127.0.0.1`
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=xx.xx.xx.xx:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 8180:8080 -t styletang/rocketmq-console-ng
- 打开控制台
localhost:8180
三. 搭建spring项目
3.1 依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
3.2 application.yml
server:
port: 8080
servlet:
context-path: /demo
rocketmq:
# 多个用;隔开
name-server: 127.0.0.1:9876
producer:
# 生产组
group: demo-group
# 发送消息超时时间,默认 3000
sendMessageTimeout: 3000
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 发送异步消息失败重试次数,默认2
retryTimesWhenSendAsyncFailed: 2
3.3 启动类
@SpringBootApplication
public class SpringBootDemoMqRocketmqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootDemoMqRocketmqApplication.class, args);
}
}
3.4 消息发送
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootDemoMqRocketmqApplicationTests {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void contextLoads() {
rocketMQTemplate.syncSend("topic_wpr" + ":" + "tag_1","hello springboot rocketmq");
}
}
3.5 消息消费
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
* 监听
*
* @author wangpr
* @date 2022/1/5
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "topic_wpr", selectorExpression = "tag_1", consumerGroup = "QueueHandler")
public class QueueListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("消息内容:"+ body);
log.info("消息ID:"+message.getMsgId());
}
}
org.apache.rocketmq.common.message.MessageExt属性说明:
- brokerName:broker名称
- queueId:记录MessageQueue编号,消息在Topic下对应的MessageQueue中被拉取
- storeSize:记录消息在Broker存盘大小
- queueOffset:记录在ConsumeQueue中的偏移
- sysFlag:记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
- bornTimestamp:消息创建时间,在Producer发送消息时设置
- bornHost:记录发送改消息的producer地址
- storeTimestamp:消息存储时间
- storeHost:记录存储该消息的Broker地址
- msgId:消息Id
- commitLogOffest:记录消息在Broker中存储偏移
- bodyCRC:消息内容CRC校验值
- reconsumeTimes:消息重试消费次数
- preparedTransactionOffset:
- message
topic:话题
flag:网络通信层标记
properties
MIN_OFFSET:最小偏移
MAX_OFFSET:最大偏移
CONSUME_START_TIME:消费拉取时间
UNIQ_KEY:
CLUSTER:集群
WAIT:
TAGS:消息标签 - body:Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
- transactionId:事务消息相关的事务编号