- 注解类配置
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/***
* Rocketmq配置文件
* @author xionghu
* @date 2022/3/6 11:56
*/
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.rocketmq")
@ConditionalOnProperty(prefix = "spring.rocketmq", value = {"accessKey","secretKey","nameSrvAddr","instanceId","endpoint"})
public class RocketMQConfig {
private String accessKey;
private String secretKey;
private String nameSrvAddr;
private String maxReconsumeTimes;
private String instanceId;
private String endpoint;
public Properties getMqProperties() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
// properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, this.maxReconsumeTimes);
return properties;
}
}
import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.yqsl.common.service.rocketmq.annotation.RocketMQMessageListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@RequiredArgsConstructor
@ConditionalOnBean(RocketMQConfig.class)
@AutoConfigureAfter(RocketMQConfig.class)
@Configuration
@Slf4j
public class RocketMQConsumerConfiguration implements ApplicationContextAware {
/**
* mq 配置
*/
private final RocketMQConfig config;
private ApplicationContext applicationContext;
@PostConstruct
public void initConsumers() throws Exception {
Map<String, MessageListener> map = applicationContext.getBeansOfType(MessageListener.class);
//一次性全部注入
for (MessageListener listener : map.values()) {
ConsumerBean consumerBean = new ConsumerBean();
Properties properties = config.getMqProperties();
//订阅关系
Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
RocketMQMessageListener annotation = listener.getClass().getDeclaredAnnotation(RocketMQMessageListener.class);
// String topic = annotation.messageClass().getDeclaredAnnotation(RocketMQMessage.class).topic();
// String tag = annotation.messageClass().getDeclaredAnnotation(RocketMQMessage.class).tag();
// String groupId = annotation.groupId();
String topic = annotation.topic();
String tag = annotation.tag();
String groupId = annotation.groupId();
String messageModel = annotation.messageModel();
log.info("properties={}", JSON.toJSONString(properties));
// properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
properties.setProperty(PropertyKeyConst.GROUP_ID, groupId);
properties.setProperty(PropertyKeyConst.MessageModel, messageModel);
consumerBean.setProperties(properties);
Subscription subscription = new Subscription();
subscription.setTopic(topic);
subscription.setExpression(tag);
subscriptionTable.put(subscription, listener);
consumerBean.setSubscriptionTable(subscriptionTable);
consumerBean.start();
// properties.put(PropertyKeyConst.GROUP_ID, groupId);
// properties.put(PropertyKeyConst.MessageModel, messageModel);
// Consumer mqConsumer = ONSFactory.createConsumer(properties);
// mqConsumer.subscribe(topic, tag, listener);
// mqConsumer.start();
/**
* 释放资源
*/
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
consumerBean.shutdown();
log.info("RocketMQConsumerConfiguration releaseResource ...");
} catch (Exception e) {
log.error("RocketMQConsumerConfiguration error: {}", e);
}
}
});
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.TransactionProducerBean;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@RequiredArgsConstructor
@Configuration
@ConditionalOnBean(RocketMQConfig.class)
@AutoConfigureAfter(RocketMQConfig.class)
public class RocketMQProducerConfiguration {
private final RocketMQConfig config;
@ConditionalOnClass(ProducerBean.class)
@ConditionalOnMissingBean(ProducerBean.class)
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean producer() {
ProducerBean producer = new ProducerBean();
producer.setProperties(config.getMqProperties());
return producer;
}
@ConditionalOnClass(TransactionProducerBean.class)
@ConditionalOnMissingBean(TransactionProducerBean.class)
@Bean(initMethod = "start", destroyMethod = "shutdown")
public TransactionProducerBean transactionProducer() {
TransactionProducerBean producer = new TransactionProducerBean();
producer.setProperties(config.getMqProperties());
return producer;
}
}
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.TransactionProducerBean;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.shade.com.google.common.base.Preconditions;
import com.yqsl.common.service.rocketmq.annotation.MessageKey;
import com.yqsl.common.service.rocketmq.annotation.RocketMQMessage;
import com.yqsl.common.service.rocketmq.configuration.RocketMQConfig;
import com.yqsl.common.service.rocketmq.configuration.RocketMQProducerConfiguration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
@RequiredArgsConstructor
@Configuration
@Slf4j
@ConditionalOnBean(value ={ RocketMQProducerConfiguration.class,ProducerBean.class})
@AutoConfigureAfter(RocketMQProducerConfiguration.class)
public class MQProducer {
private final ProducerBean producerBean;
private final TransactionProducerBean transactionProducerBean;
/**
* 发送同步消息
*
* @param object
* @return
*/
public boolean send(Object object) {
Message message = buildMessage(object);
return send(message);
}
// /**
// * 发送同步消息
// *
// * @param object
// * @param mqTopic
// * @param mqTag
// * @return
// */
// public boolean send(Object object, MQTopic mqTopic, MQTag mqTag) {
// return send(object, mqTopic, mqTag, 0);
// }
// /**
// * 发送同步消息
// *
// * @param object 消息主体
// * @param mqTopic 订阅主题
// * @param mqTag 订阅标签
// * @param times 重试次数,默认为0,重试最大次数
// * @return
// */
// public boolean send(Object object, MQTopic mqTopic, MQTag mqTag, int times) {
// Message message = buildMessage(object, mqTopic, mqTag);
// if (times != 0) {
// message.setReconsumeTimes(times);
// }
// return send(message);
// }
/**
* 发送延时消息
*
* @param object
* @param delayTime
* @return
*/
public boolean sendDelay(Object object, long delayTime) {
Message message = buildMessage(object);
long startDeliverTime = System.currentTimeMillis() + delayTime;
message.setStartDeliverTime(startDeliverTime);
return send(message);
}
// /**
// * 发送延时消息
// *
// * @param object
// * @param delayTime
// * @return
// */
// public boolean sendDelay(Object object, MQTopic mqTopic, MQTag mqTag, long delayTime) {
// Message message = buildMessage(object, mqTopic, mqTag);
// long startDeliverTime = System.currentTimeMillis() + delayTime;
// message.setStartDeliverTime(startDeliverTime);
// return send(message);
// }
private boolean send(Message message) {
SendResult sendResult = producerBean.send(message);
log.info("SendResult:{}", sendResult);
log.info("Message:{}", message);
return sendResult != null && StrUtil.isNotEmpty(sendResult.getMessageId());
}
public void sendWithTransaction(Object object, LocalTransactionExecuter localTransactionExecuter) {
Message message = buildMessage(object);
SendResult send = transactionProducerBean.send(message, localTransactionExecuter, null);
}
// private Message buildMessage(Object object, MQTopic mqTopic, MQTag mqTag) {
// Message message = new Message();
// message.setTopic(mqTopic.getValue());
// message.setTag(mqTag.getValue());
// message.setKey(getMessageKey(object, mqTag.getValue()));
// message.setBody(object instanceof String (String.valueOf(object)).getBytes() : JSON.toJSONString(object).getBytes());
// return message;
// }
private Message buildMessage(Object object, String mqTopic, String mqTag) {
Message message = new Message();
message.setTopic(mqTopic);
message.setTag(mqTag);
message.setKey(getMessageKey(object, mqTag));
message.setBody(object instanceof String (String.valueOf(object)).getBytes() : JSON.toJSONString(object).getBytes());
return message;
}
private Message buildMessage(Object object) {
RocketMQMessage rocketMessage = object.getClass().getDeclaredAnnotation(RocketMQMessage.class);
Preconditions.checkArgument(rocketMessage != null, "缺少注解 @RocketMQMessage");
Preconditions.checkArgument(rocketMessage.topic() != null, "注解 @RocketMQMessage 缺少 Topic 属性");
Preconditions.checkArgument(rocketMessage.tag() != null, "注解 @RocketMQMessage 缺少 tag 属性");
Message message = buildMessage(object, rocketMessage.topic(), rocketMessage.tag());
return message;
}
private String getMessageKey(Object object, String tag) {
String key = "";
Field[] declaredFields = object.getClass().getDeclaredFields();
for (Field declaredField : declaredFields) {
if (declaredField.isAnnotationPresent(MessageKey.class)) {
Object id = ReflectUtil.getFieldValue(object, declaredField);
if (ObjectUtil.isNotNull(id)) {
key = tag + ":" + id;
}
break;
}
}
if (StrUtil.isBlank(key)) {
key = StrUtil.toString(System.currentTimeMillis());
}
return key;
}
}
- Yaml 配置
rocketmq:
accessKey: 你的阿里云accessKeyaccessKey
secretKey: 你的阿里云accessKeyaccessKey
nameSrvAddr: 你的nameSrvAddr
maxReconsumeTimes: 15
instanceId: 你的阿里云实例id
endpoint: 你的阿里云端点
- 发送实例类和topic、tag绑定
@RocketMQMessage(topic = "user",tag="tag-demo")
@Data
public class User {
String name;
int age;
}
- 消息消费类和 topic、groupId、tag绑定
@RocketMQMessageListener(topic = "user", groupId = "GID_user_test", tag = "tag-demo")
@RequiredArgsConstructor
@Component
@Slf4j
public class Tag1Consumer implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
String msg = new String(message.getBody());
User user = JSONObject.parseObject(msg, User.class);
//TODO 业务
log.info("Tag1Consumer user={}", JSONObject.toJSONString(user));
return Action.CommitMessage;
}
}
- 调用方式
@Resource
private MQProducer mqProducer;
@Test
void pushFreezeText() {
User user = new User();
user.setName("Jack");
user.setAge(12);
mqProducer.send(user);
}
- 测试
结果