当前位置: 首页>后端>正文

SpringBoot整合rocketmq

一. 概述

参考开源项目https://github.com/xkcoding/spring-boot-demo
本Demo简单介绍springboot继承rocketmq, rocketmq采用docker单机部署

二. 安装rocketmq

由于官方没有提供docker镜像,所以用星星较多的foxiswho/rocketmq:4.8.0安装

2.1 拉取镜像

docker pull foxiswho/rocketmq:4.8.0

2.2 安装nameserver

docker run -d \
--name rmqnamesrv \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-p 9876:9876 \
foxiswho/rocketmq:4.8.0 \
sh mqnamesrv

2.3 安装broker

  1. 新建配置文件:~/conf/broker.conf
    注意namesrvAddrbrokerIP1填写主机的ip地址,不要写localhost127.0.0.1
#所属集群名字
brokerClusterName=DefaultCluster

#broker名字,注意此处不同的配置文件填写的不一样,如果在broker-a.properties使用:broker-a,
#在broker-b.properties使用:broker-b
brokerName=broker-a

#0 表示Master,>0 表示Slave
brokerId=0

#nameServer地址,分号分割
#namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
namesrvAddr=xx.xx.xx.xx:9876

#启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=xx.xx.xx.xx

#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 !!!这里仔细看是false,false,false
autoCreateTopicEnable=true

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

#Broker 对外服务的监听端口
listenPort=10911

#删除文件时间点,默认凌晨4点
deleteWhen=04

#文件保留时间,默认48小时
fileReservedTime=120

#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
#storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
#commitLog 存储路径
#storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
#消费队列存储
#storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
#消息索引存储路径
#storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
#checkpoint 文件存储路径
#storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
#abort 文件存储路径
#abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
#限制的消息大小
maxMessageSize=65536

#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER

#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

运行脚本

docker run -d \
-v ~/conf:/home/rocketmq/conf \
--name rmqbroker \
# 这里填nameserver的宿主机IP 不要写`localhost`或`127.0.0.1`
-e "NAMESRV_ADDR=xx.xx.xx.xx:9876" \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-p 10911:10911 -p 10912:10912 -p 10909:10909 \
foxiswho/rocketmq:4.8.0 \
sh mqbroker -c /home/rocketmq/conf/broker.conf
  1. 安装控制台
docker run -d \
--name rmqconsole \
# 这里填nameserver的宿主机IP 不要写`localhost`或`127.0.0.1`
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=xx.xx.xx.xx:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 8180:8080 -t styletang/rocketmq-console-ng
  1. 打开控制台
localhost:8180
SpringBoot整合rocketmq,第1张

三. 搭建spring项目

3.1 依赖

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-spring-boot-starter</artifactId>
          <version>${rocketmq-spring-boot-starter-version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <optional>true</optional>
        </dependency>
    </dependencies>

3.2 application.yml

server:
  port: 8080
  servlet:
    context-path: /demo
rocketmq:
  # 多个用;隔开
  name-server: 127.0.0.1:9876
  producer:
    # 生产组
    group: demo-group
    # 发送消息超时时间,默认 3000
    sendMessageTimeout: 3000
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
    # 发送异步消息失败重试次数,默认2
    retryTimesWhenSendAsyncFailed: 2

3.3 启动类

@SpringBootApplication
public class SpringBootDemoMqRocketmqApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootDemoMqRocketmqApplication.class, args);
    }

}

3.4 消息发送

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootDemoMqRocketmqApplicationTests {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void contextLoads() {
        rocketMQTemplate.syncSend("topic_wpr" + ":" + "tag_1","hello springboot rocketmq");
    }

}

3.5 消息消费

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

/**
 * 监听
 *
 * @author wangpr
 * @date 2022/1/5
 */
@Slf4j
@Service
@RocketMQMessageListener(topic = "topic_wpr", selectorExpression = "tag_1", consumerGroup = "QueueHandler")
public class QueueListener implements RocketMQListener<MessageExt>  {

    @Override
    public void onMessage(MessageExt message) {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("消息内容:"+ body);
        log.info("消息ID:"+message.getMsgId());
    }
}

org.apache.rocketmq.common.message.MessageExt属性说明:

  • brokerName:broker名称
  • queueId:记录MessageQueue编号,消息在Topic下对应的MessageQueue中被拉取
  • storeSize:记录消息在Broker存盘大小
  • queueOffset:记录在ConsumeQueue中的偏移
  • sysFlag:记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
  • bornTimestamp:消息创建时间,在Producer发送消息时设置
  • bornHost:记录发送改消息的producer地址
  • storeTimestamp:消息存储时间
  • storeHost:记录存储该消息的Broker地址
  • msgId:消息Id
  • commitLogOffest:记录消息在Broker中存储偏移
  • bodyCRC:消息内容CRC校验值
  • reconsumeTimes:消息重试消费次数
  • preparedTransactionOffset:
  • message
      topic:话题
      flag:网络通信层标记
      properties
        MIN_OFFSET:最小偏移
        MAX_OFFSET:最大偏移
        CONSUME_START_TIME:消费拉取时间
        UNIQ_KEY:
        CLUSTER:集群
        WAIT:
        TAGS:消息标签
  • body:Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
  • transactionId:事务消息相关的事务编号
SpringBoot整合rocketmq,第2张

https://www.xamrdz.com/backend/3mt1940035.html

相关文章: