消息队列结果展示
1. 消息中间件模型
1.1 简单消息队列模型
- 代码部分:
文件路径:src/consumer/wx.ts
import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';
@Consumer(MSListenerType.RABBITMQ)
export class WxConsumer {
@Inject()
ctx: Context;
/* 简单模式队列、消费者定义 */
@RabbitMQListener('wx.template.message')
async gotData(msg: ConsumeMessage) {
await sleep(500) // 加入半秒的延时,手动确认
this.ctx.logger.info("gotData", JSON.parse(msg.content.toString()).name)
this.ctx.channel.ack(msg); //需要手动确认消息
}
}
1.2 Work工作队列模型
- 功能描述:可以多个消费者一起消费消息,这样效率更高。且如果一个消息处理时间很长,其他线程也可以帮忙消费。
- 代码部分:
import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';
@Consumer(MSListenerType.RABBITMQ)
export class WxConsumer {
@Inject()
ctx: Context;
@Consumer(MSListenerType.RABBITMQ)
export class WxConsumer {
@Inject()
ctx: Context;
/* work模式多消费者 - 线程1 */
@RabbitMQListener('wx.template.message')
async gotData(msg: ConsumeMessage) {
await sleep(500)
this.ctx.logger.info("gotData", JSON.parse(msg.content.toString()).name)
this.ctx.channel.ack(msg); //需要手动确认消息
}
/* work模式多消费者 - 线程2 */
@RabbitMQListener('wx.template.message', { prefetch: 5 }) // 预读取5条消息,一次处理5条,而不是处理完上一条才处理下一条
async gotData1(msg: ConsumeMessage) {
await sleep(5000)
this.ctx.logger.info("gotData1", JSON.parse(msg.content.toString()).name)
this.ctx.channel.ack(msg); //需要手动确认消息
}
}
1.3 发布订阅模型
Fanout广播
-
功能简介
- 广播消息,生产者只要发送到此类型的交换机,只要绑定到fanout交换机上的队列,都会收到此消息
应用场景:聊天室、多渠道消息通知、等需要一份消息被多次消费的场景
代码部分:
import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';
@Consumer(MSListenerType.RABBITMQ)
export class UserConsumer {
@App()
app: Application;
@Inject()
ctx: Context;
@Inject()
logger;
/* fanout扇出队列定义 */
@RabbitMQListener('abc', {
exchange: 'logs',
exchangeOptions: {
type: 'fanout',
durable: false,
},
durable: false,
exclusive: true,
consumeOptions: {
// noAck: true,
}
})
async gotData(msg: ConsumeMessage) {
await sleep(500)
this.logger.info('fanout1 output1 =>', msg.content.toString('utf8'));
this.ctx.channel.ack(msg)
// TODO
}
@RabbitMQListener('bcd', {
exchange: 'logs',
exchangeOptions: {
type: 'fanout',
durable: false,
},
exclusive: true,
durable: false,
consumeOptions: {
// noAck: true,
}
})
async gotData2(msg: ConsumeMessage) {
await sleep(500)
this.logger.info('fanout2 output2 =>', msg.content.toString('utf8'));
this.ctx.channel.ack(msg)
// TODO
}
}
Direct路由
- 功能描述: Direct路由模式,也可以实现fanout模式的功能,只需要将2个队列的routingKey定义包含为相同的key,就会同事分发到两个队列
- 代码实现:
@Consumer(MSListenerType.RABBITMQ)
export class UserDirectConsumer {
@App()
app: Application;
@Inject()
ctx: Context;
@Inject()
logger;
// direct_logs交换机 :direct_key队列 /* direct模式, 不同的routingKey */
@RabbitMQListener('direct_key', {
exchange: 'direct_logs',
exchangeOptions: {
type: 'direct',
durable: false,
},
routingKey: 'direct_key',
exclusive: true,
durable: false,
consumeOptions: {
// noAck: true,
}
})
async gotData(msg: ConsumeMessage) {
await sleep(500)
this.logger.info('direct_key output1 =>', msg.content.toString('utf8'));
this.ctx.channel.ack(msg)
// TODO
}
//direct_logs交换机 :direct_key队列,同时定义了2个路由key :direct_key1 direct_key
@RabbitMQListener('direct_key', {
exchange: 'direct_logs',
exchangeOptions: {
type: 'direct',
durable: false,
},
routingKey: 'direct_key1',
exclusive: true,
durable: false,
consumeOptions: {
// noAck: true,
}
})
async gotData1(msg: ConsumeMessage) {
await sleep(500)
this.logger.info('direct_key output2 =>', msg.content.toString('utf8'));
this.ctx.channel.ack(msg)
// TODO
}
// direct_logs交换机 :direct_key1队列
@RabbitMQListener('direct_key1', {
exchange: 'direct_logs',
exchangeOptions: {
type: 'direct',
durable: false,
},
routingKey: 'direct_key1',
exclusive: true,
durable: false,
consumeOptions: {
// noAck: true,
}
})
async gotData2(msg: ConsumeMessage) {
await sleep(500)
this.logger.info('direct_key1 output1 =>', msg.content.toString('utf8'));
this.ctx.channel.ack(msg)
// TODO
}
}
Topics话题模式
routingKey定义语法:..msg,.msg., lazy.# (* 代表一个单词,#代表多个单词)
代码实现
import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';
@Consumer(MSListenerType.RABBITMQ)
export class UserDirectConsumer {
@App()
app: Application;
@Inject()
ctx: Context;
@Inject()
logger;
/* topic模式, 且定义了死信队列 */
@RabbitMQListener('topic_key', {
exchange: 'topic_logs',
exchangeOptions: {
type: 'topic',
durable: false,
},
deadLetterExchange: "dead_topic",
deadLetterRoutingKey: "dead_topic_key",
routingKey: 'topic_key1',
exclusive: true,
durable: false,
consumeOptions: {
// noAck: true,
}
})
async gotData3(msg: ConsumeMessage) {
await sleep(500)
this.logger.info('topic_key1 output1 =>', msg.content.toString('utf8'));
this.ctx.channel.ack(msg)
// TODO
}
}
死信队列
功能描述:对于定义ttl值的消息,如果消息过期,或者应答重新入队,重试次数大于定义最大次数时,无法消费,会进入死信队列,可用于告警消息通知处理消费
代码实现
import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';
@Consumer(MSListenerType.RABBITMQ)
export class UserDirectConsumer {
@App()
app: Application;
@Inject()
ctx: Context;
@Inject()
logger;
/* topic模式, 且定义了死信队列 */
@RabbitMQListener('topic_key', {
exchange: 'topic_logs',
exchangeOptions: {
type: 'topic',
durable: false,
},
deadLetterExchange: "dead_topic", //如果产生死信会进入此交换机
deadLetterRoutingKey: "dead_topic_key", //死信队列进入交换机的RoutingKey
routingKey: 'topic_key1',
exclusive: true,
durable: false,
consumeOptions: {
// noAck: true,
}
})
async gotData3(msg: ConsumeMessage) {
await sleep(500)
this.logger.info('topic_key1 output1 =>', msg.content.toString('utf8'));
this.ctx.channel.ack(msg)
// TODO
}
// /* 死信队列 */
@RabbitMQListener('dead_topic_key', {
exchange: 'dead_topic',
exchangeOptions: {
type: 'direct',
durable: false,
},
routingKey: 'dead_topic_key',
exclusive: true,
durable: false,
consumeOptions: {
// noAck: true,
}
})
async gotData4(msg: ConsumeMessage) {
await sleep(5000)
this.logger.info('dead_topic_key output1 =>', msg.content.toString('utf8'));
this.ctx.channel.ack(msg)
// TODO
}
}
优先级队列
- 功能描述:会优先消费,定义优先级且优先级最高的消息
- 代码实现
import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';
@Consumer(MSListenerType.RABBITMQ)
export class UserDirectConsumer {
@App()
app: Application;
@Inject()
ctx: Context;
@Inject()
logger;
/* 优先级队列 */
@RabbitMQListener('priority_message', {
exchange: 'priority',
exchangeOptions: {
type: 'direct',
durable: true,
},
routingKey: 'priority_key',
exclusive: false,
durable: true,
maxPriority: 10,
consumeOptions: {
// noAck: true,
}
})
async gotData6(msg: ConsumeMessage) {
await sleep(1000)
this.logger.info('priority_message output1 =>', msg.content.toString('utf8'));
this.ctx.channel.ack(msg)
// TODO
}
}
延时队列
- 功能描述:会在静默了多少秒之后被消费
- 注意事项:
- 死信队列实现: 延时队列可以通过死信队列实现,但是当队列面的消息ttl时间不同时存在巨大缺陷,因为队列有顺序,会消费掉第一个,才能消费第二个,也就是在消息较多时第二个无论第一个消息过期时间有多快,都需要第一个消息应答之后才能被消费。
- 插件实现:通过延迟插件实现,消息会ttl时间过期的时候才会发给队列消费
- 基于延迟插件代码实现
import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';
@Consumer(MSListenerType.RABBITMQ)
export class UserDirectConsumer {
@App()
app: Application;
@Inject()
ctx: Context;
@Inject()
logger;
/* 延迟队列 */
@RabbitMQListener('x_delay_message', {
exchange: 'x_delayed',
exchangeOptions: {
type: 'x-delayed-message',
durable: true,
arguments: { 'x-delayed-type': 'direct' }
},
routingKey: 'x_delayed_key',
exclusive: false,
durable: true,
consumeOptions: {
// noAck: true,
}
})
async gotData5(msg: ConsumeMessage) {
await sleep(5000)
this.logger.info('dead_topic_key output1 =>', msg.content.toString('utf8'));
this.ctx.channel.ack(msg)
// TODO
}
}
- 基于死信队列代码实现:
- 在生产端创建一个指定过期时间的队列,消息不消费到了时间,就会进入死信队列,通过死信队列进行消费,达到延时队列的效果,但是存在巨大缺陷
import { Provide, Scope, ScopeEnum, Init, Autoload, Destroy, Config } from '@midwayjs/core';
import * as amqp from 'amqp-connection-manager'
@Autoload()
@Provide()
@Scope(ScopeEnum.Singleton) // Singleton 单例,全局唯一(进程级别)
export class RabbitmqService {
private connection: amqp.AmqpConnectionManager;
private channelWrapper;
@Config("rabbitmq")
rabbitmqConfig;
@Init()
async connect() {
// 创建连接,你可以把配置放在 Config 中,然后注入进来
this.connection = await amqp.connect(this.rabbitmqConfig.url);
// 创建 channel
this.channelWrapper = this.connection.createChannel({
json: true,
setup: function (channel) { //在使用通道之前准备好必要的资源
return Promise.all([
// 绑定队列
channel.assertQueue("wx.template.message", { durable: true }),
channel.assertExchange('logs', 'fanout', { durable: false }),
channel.assertExchange('direct_logs', 'direct', { durable: false }),
// 创建死信队列的延迟队列和交换机
channel.assertExchange("delayExchange", 'direct', { durable: true }),
channel.assertQueue('delayQueue', {
durable: true,
deadLetterExchange: 'dead_topic', // 这里为空,因为不需要死信队列
deadLetterRoutingKey: 'dead_topic_key',
messageTtl: 15000 //创建一个延迟15秒的队列
}),
channel.bindQueue('delayQueue', "delayExchange", "delay")
]);
}
});
}
// 发送消息到队列
public async sendToQueue(queueName: string, data: any) {
return this.channelWrapper.sendToQueue(queueName, data, { persistent: true });
}
// 发送广播消息到交换机
public async sendToExchangeFanoutMessage(ex: string, data: any) {
return this.channelWrapper.publish(ex, '', data, { persistent: true });
}
// 发送普通消息到交换机
public async sendToExchangeMessage(ex: string, routerKey, data: any) {
return this.channelWrapper.publish(ex, routerKey, data, { persistent: true });
}
// 发送优先级消息到交换机
public async sendToExchangePriorityMessage(ex: string, routerKey, data: any) {
return this.channelWrapper.publish(ex, routerKey, data, { persistent: true, priority: 5 });
}
/* 死信延时队列,异步发布确认 */
public async sendToExchangeDelayDirect(ex: string, routerKey, data: any) {
return this.channelWrapper.publish(ex, routerKey, data, { persistent: true }, (err, ok) => {
console.log("err, ok", err, ok);
});
}
/* 延时队列,异步发布确认 */
public async sendToExchangeDelayMessage(ex: string, routerKey, data: any, delay: number) {
return this.channelWrapper.publish(ex, routerKey, data, { persistent: true, headers: { 'x-delay': delay }, }, (err, ok) => {
console.log("err, ok", err, ok);
});
}
@Destroy()
async close() {
await this.channelWrapper.close();
await this.connection.close();
}
}
@Get('/')
async home(): Promise<string> {
for (let i = 0; i < 100; i++) {
await this.mqService.sendToExchangeDelayDirect("delayExchange", "delay", {name: `litao${i}-${i}`})
}
return 'Hello Midwayjs!';
}