当前位置: 首页>后端>正文

rabbitmq消息中间件的原理和消费端midwayjs的定义

消息队列结果展示
rabbitmq消息中间件的原理和消费端midwayjs的定义,第1张

rabbitmq消息中间件的原理和消费端midwayjs的定义,第2张

1. 消息中间件模型

1.1 简单消息队列模型

  • 代码部分:
    文件路径: src/consumer/wx.ts
import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';

@Consumer(MSListenerType.RABBITMQ)
export class WxConsumer {

    @Inject()
    ctx: Context;

    /* 简单模式队列、消费者定义 */
    @RabbitMQListener('wx.template.message')
    async gotData(msg: ConsumeMessage) {
        await sleep(500) // 加入半秒的延时,手动确认
        this.ctx.logger.info("gotData", JSON.parse(msg.content.toString()).name)
        this.ctx.channel.ack(msg);  //需要手动确认消息
    }
}

1.2 Work工作队列模型

  • 功能描述:可以多个消费者一起消费消息,这样效率更高。且如果一个消息处理时间很长,其他线程也可以帮忙消费。
  • 代码部分:
import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';

@Consumer(MSListenerType.RABBITMQ)
export class WxConsumer {

    @Inject()
    ctx: Context;

@Consumer(MSListenerType.RABBITMQ)
export class WxConsumer {

    @Inject()
    ctx: Context;

    /* work模式多消费者 - 线程1 */
    @RabbitMQListener('wx.template.message')
    async gotData(msg: ConsumeMessage) {
        await sleep(500)
        this.ctx.logger.info("gotData", JSON.parse(msg.content.toString()).name)
        this.ctx.channel.ack(msg);  //需要手动确认消息
    }
    
    /* work模式多消费者 - 线程2 */
    @RabbitMQListener('wx.template.message', { prefetch: 5 }) // 预读取5条消息,一次处理5条,而不是处理完上一条才处理下一条
    async gotData1(msg: ConsumeMessage) {
        await sleep(5000)
        this.ctx.logger.info("gotData1", JSON.parse(msg.content.toString()).name)
        this.ctx.channel.ack(msg);  //需要手动确认消息
    }

}

1.3 发布订阅模型

Fanout广播

  • 功能简介

    • 广播消息,生产者只要发送到此类型的交换机,只要绑定到fanout交换机上的队列,都会收到此消息
  • 应用场景:聊天室、多渠道消息通知、等需要一份消息被多次消费的场景

  • 代码部分:

import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';

@Consumer(MSListenerType.RABBITMQ)
export class UserConsumer {

    @App()
    app: Application;

    @Inject()
    ctx: Context;

    @Inject()
    logger;

    /* fanout扇出队列定义 */
    @RabbitMQListener('abc', {
        exchange: 'logs',
        exchangeOptions: {
            type: 'fanout',
            durable: false,
        },
        durable: false,
        exclusive: true,
        consumeOptions: {
            //   noAck: true,
        }
    })
    async gotData(msg: ConsumeMessage) {
        await sleep(500)
        this.logger.info('fanout1 output1 =>', msg.content.toString('utf8'));
        this.ctx.channel.ack(msg)
        // TODO
    }

    @RabbitMQListener('bcd', {
        exchange: 'logs',
        exchangeOptions: {
            type: 'fanout',
            durable: false,
        },
        exclusive: true,
        durable: false,
        consumeOptions: {
            //   noAck: true,

        }
    })
    async gotData2(msg: ConsumeMessage) {
        await sleep(500)
        this.logger.info('fanout2 output2 =>', msg.content.toString('utf8'));
        this.ctx.channel.ack(msg)
        // TODO
    }

}

Direct路由

  • 功能描述: Direct路由模式,也可以实现fanout模式的功能,只需要将2个队列的routingKey定义包含为相同的key,就会同事分发到两个队列
  • 代码实现:
@Consumer(MSListenerType.RABBITMQ)
export class UserDirectConsumer {

    @App()
    app: Application;

    @Inject()
    ctx: Context;

    @Inject()
    logger;
    // direct_logs交换机 :direct_key队列 /* direct模式, 不同的routingKey */
    @RabbitMQListener('direct_key', {
        exchange: 'direct_logs',
        exchangeOptions: {
            type: 'direct',
            durable: false,
        },
        routingKey: 'direct_key',
        exclusive: true,
        durable: false,
        consumeOptions: {
            //   noAck: true,
        }
    })
    async gotData(msg: ConsumeMessage) {
        await sleep(500)
        this.logger.info('direct_key output1 =>', msg.content.toString('utf8'));
        this.ctx.channel.ack(msg)
        // TODO
    }

    //direct_logs交换机 :direct_key队列,同时定义了2个路由key :direct_key1 direct_key
    @RabbitMQListener('direct_key', {
        exchange: 'direct_logs',
        exchangeOptions: {
            type: 'direct',
            durable: false,
        },
        routingKey: 'direct_key1',  
        exclusive: true,
        durable: false,
        consumeOptions: {
            //   noAck: true,
        }
    })
    async gotData1(msg: ConsumeMessage) {
        await sleep(500)
        this.logger.info('direct_key output2 =>', msg.content.toString('utf8'));
        this.ctx.channel.ack(msg)
        // TODO
    }

    // direct_logs交换机 :direct_key1队列
    @RabbitMQListener('direct_key1', {
        exchange: 'direct_logs',
        exchangeOptions: {
            type: 'direct',
            durable: false,
        },
        routingKey: 'direct_key1',
        exclusive: true,
        durable: false,
        consumeOptions: {
            //   noAck: true,
        }
    })
    async gotData2(msg: ConsumeMessage) {
        await sleep(500)
        this.logger.info('direct_key1 output1 =>', msg.content.toString('utf8'));
        this.ctx.channel.ack(msg)
        // TODO
    }
}

Topics话题模式

  • routingKey定义语法:..msg,.msg., lazy.# (* 代表一个单词,#代表多个单词)

  • 代码实现

import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';

@Consumer(MSListenerType.RABBITMQ)
export class UserDirectConsumer {

    @App()
    app: Application;

    @Inject()
    ctx: Context;

    @Inject()
    logger;

    /* topic模式, 且定义了死信队列 */
    @RabbitMQListener('topic_key', {
        exchange: 'topic_logs',
        exchangeOptions: {
            type: 'topic',
            durable: false,
        },
        deadLetterExchange: "dead_topic", 
        deadLetterRoutingKey: "dead_topic_key",
        routingKey: 'topic_key1',
        exclusive: true,
        durable: false,
        consumeOptions: {
            //   noAck: true,
        }
    })
    async gotData3(msg: ConsumeMessage) {
        await sleep(500)
        this.logger.info('topic_key1 output1 =>', msg.content.toString('utf8'));
        this.ctx.channel.ack(msg)
        // TODO
    }
}

死信队列

  • 功能描述:对于定义ttl值的消息,如果消息过期,或者应答重新入队,重试次数大于定义最大次数时,无法消费,会进入死信队列,可用于告警消息通知处理消费

  • 代码实现

import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';

@Consumer(MSListenerType.RABBITMQ)
export class UserDirectConsumer {

    @App()
    app: Application;

    @Inject()
    ctx: Context;

    @Inject()
    logger;

    /* topic模式, 且定义了死信队列 */
    @RabbitMQListener('topic_key', {
        exchange: 'topic_logs',
        exchangeOptions: {
            type: 'topic',
            durable: false,
        },
        deadLetterExchange: "dead_topic", //如果产生死信会进入此交换机
        deadLetterRoutingKey: "dead_topic_key", //死信队列进入交换机的RoutingKey
        routingKey: 'topic_key1',
        exclusive: true,
        durable: false,
        consumeOptions: {
            //   noAck: true,
        }
    })
    async gotData3(msg: ConsumeMessage) {
        await sleep(500)
        this.logger.info('topic_key1 output1 =>', msg.content.toString('utf8'));
        this.ctx.channel.ack(msg)
        // TODO
    }


    // /* 死信队列 */
    @RabbitMQListener('dead_topic_key', {
        exchange: 'dead_topic',
        exchangeOptions: {
            type: 'direct',
            durable: false,
        },
        routingKey: 'dead_topic_key',
        exclusive: true,
        durable: false,
        consumeOptions: {
            //   noAck: true,
        }
    })
    async gotData4(msg: ConsumeMessage) {
        await sleep(5000)
        this.logger.info('dead_topic_key output1 =>', msg.content.toString('utf8'));
        this.ctx.channel.ack(msg)
        // TODO
    }
}

优先级队列

  • 功能描述:会优先消费,定义优先级且优先级最高的消息
  • 代码实现
import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';

@Consumer(MSListenerType.RABBITMQ)
export class UserDirectConsumer {

    @App()
    app: Application;

    @Inject()
    ctx: Context;

    @Inject()
    logger;

    /* 优先级队列 */
    @RabbitMQListener('priority_message', {
        exchange: 'priority',
        exchangeOptions: {
            type: 'direct',
            durable: true,
        },
        routingKey: 'priority_key',
        exclusive: false,
        durable: true,
        maxPriority: 10,
        consumeOptions: {
            //   noAck: true,
        }
    })
    async gotData6(msg: ConsumeMessage) {
        await sleep(1000)
        this.logger.info('priority_message output1 =>', msg.content.toString('utf8'));
        this.ctx.channel.ack(msg)
        // TODO
    }
}

延时队列

  • 功能描述:会在静默了多少秒之后被消费
  • 注意事项:
      1. 死信队列实现: 延时队列可以通过死信队列实现,但是当队列面的消息ttl时间不同时存在巨大缺陷,因为队列有顺序,会消费掉第一个,才能消费第二个,也就是在消息较多时第二个无论第一个消息过期时间有多快,都需要第一个消息应答之后才能被消费。
      1. 插件实现:通过延迟插件实现,消息会ttl时间过期的时候才会发给队列消费
  • 基于延迟插件代码实现
import { Consumer, MSListenerType, RabbitMQListener, Inject, App, sleep } from '@midwayjs/core';
import { Application, Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';

@Consumer(MSListenerType.RABBITMQ)
export class UserDirectConsumer {

    @App()
    app: Application;

    @Inject()
    ctx: Context;

    @Inject()
    logger;

    /* 延迟队列 */
    @RabbitMQListener('x_delay_message', {
        exchange: 'x_delayed',
        exchangeOptions: {
            type: 'x-delayed-message',
            durable: true,
            arguments: { 'x-delayed-type': 'direct' }
        },
        routingKey: 'x_delayed_key',
        exclusive: false,
        durable: true,
        consumeOptions: {
            //   noAck: true,
        }
    })
    async gotData5(msg: ConsumeMessage) {
        await sleep(5000)
        this.logger.info('dead_topic_key output1 =>', msg.content.toString('utf8'));
        this.ctx.channel.ack(msg)
        // TODO
    }
}
  • 基于死信队列代码实现:
  1. 在生产端创建一个指定过期时间的队列,消息不消费到了时间,就会进入死信队列,通过死信队列进行消费,达到延时队列的效果,但是存在巨大缺陷
import { Provide, Scope, ScopeEnum, Init, Autoload, Destroy, Config } from '@midwayjs/core';
import * as amqp from 'amqp-connection-manager'

@Autoload()
@Provide()
@Scope(ScopeEnum.Singleton) // Singleton 单例,全局唯一(进程级别)
export class RabbitmqService {

    private connection: amqp.AmqpConnectionManager;

    private channelWrapper;


    @Config("rabbitmq")
    rabbitmqConfig;


    @Init()
    async connect() {

        // 创建连接,你可以把配置放在 Config 中,然后注入进来
        this.connection = await amqp.connect(this.rabbitmqConfig.url);

        // 创建 channel
        this.channelWrapper = this.connection.createChannel({
            json: true,
            setup: function (channel) {  //在使用通道之前准备好必要的资源
                return Promise.all([
                    // 绑定队列
                    channel.assertQueue("wx.template.message", { durable: true }),
                    channel.assertExchange('logs', 'fanout', { durable: false }),
                    channel.assertExchange('direct_logs', 'direct', { durable: false }),

                    // 创建死信队列的延迟队列和交换机
                    channel.assertExchange("delayExchange", 'direct', { durable: true }),
                    channel.assertQueue('delayQueue', {
                        durable: true,
                        deadLetterExchange: 'dead_topic', // 这里为空,因为不需要死信队列
                        deadLetterRoutingKey: 'dead_topic_key',
                        messageTtl: 15000 //创建一个延迟15秒的队列
                    }),
                    channel.bindQueue('delayQueue', "delayExchange", "delay")


                ]);
            }
        });
    }

    // 发送消息到队列
    public async sendToQueue(queueName: string, data: any) {
        return this.channelWrapper.sendToQueue(queueName, data, { persistent: true });
    }

    // 发送广播消息到交换机
    public async sendToExchangeFanoutMessage(ex: string, data: any) {
        return this.channelWrapper.publish(ex, '', data, { persistent: true });
    }

    // 发送普通消息到交换机
    public async sendToExchangeMessage(ex: string, routerKey, data: any) {
        return this.channelWrapper.publish(ex, routerKey, data, { persistent: true });
    }

    // 发送优先级消息到交换机
    public async sendToExchangePriorityMessage(ex: string, routerKey, data: any) {
        return this.channelWrapper.publish(ex, routerKey, data, { persistent: true, priority: 5 });
    }

    /* 死信延时队列,异步发布确认 */
    public async sendToExchangeDelayDirect(ex: string, routerKey, data: any) {
        return this.channelWrapper.publish(ex, routerKey, data, { persistent: true }, (err, ok) => {
            console.log("err, ok", err, ok);
        });
    }

    /* 延时队列,异步发布确认 */
    public async sendToExchangeDelayMessage(ex: string, routerKey, data: any, delay: number) {
        return this.channelWrapper.publish(ex, routerKey, data, { persistent: true, headers: { 'x-delay': delay }, }, (err, ok) => {
            console.log("err, ok", err, ok);
        });
    }

    @Destroy()
    async close() {
        await this.channelWrapper.close();
        await this.connection.close();
    }
}

  @Get('/')
  async home(): Promise<string> {
    for (let i = 0; i < 100; i++) {
      await this.mqService.sendToExchangeDelayDirect("delayExchange", "delay", {name: `litao${i}-${i}`})
    }

    return 'Hello Midwayjs!';
  }

https://www.xamrdz.com/backend/3nk1934978.html

相关文章: