当前位置: 首页>后端>正文

rabbitmq生产端midwayjs的定义

1. 生产端服务类定义

import { Provide, Scope, ScopeEnum, Init, Autoload, Destroy, Config } from '@midwayjs/core';
import * as amqp from 'amqp-connection-manager'

@Autoload()
@Provide()
@Scope(ScopeEnum.Singleton) // Singleton 单例,全局唯一(进程级别)
export class RabbitmqService {

    private connection: amqp.AmqpConnectionManager;

    private channelWrapper;


    @Config("rabbitmq")
    rabbitmqConfig;


    @Init()
    async connect() {

        // 创建连接,你可以把配置放在 Config 中,然后注入进来
        this.connection = await amqp.connect(this.rabbitmqConfig.url);

        // 创建 channel
        this.channelWrapper = this.connection.createChannel({
            json: true,
            setup: function (channel) {  //在使用通道之前准备好必要的资源
                return Promise.all([
                    // 绑定队列
                    // channel.assertQueue("wx.template.message", { durable: true }),
                    // channel.assertExchange('logs', 'fanout', { durable: false }),
                    // channel.assertExchange('direct_logs', 'direct', { durable: false }),

                    // // 创建延迟队列和交换机
                    // channel.assertExchange("delayExchange", 'direct', { durable: true }),
                    // channel.assertQueue('delayQueue', {
                    //     durable: true, 
                    //     deadLetterExchange: 'dead_topic', // 这里为空,因为不需要死信队列
                    //     deadLetterRoutingKey: 'dead_topic_key',
                    //     messageTtl: 15000 //创建一个延迟15秒的队列
                    // }),
                    // channel.bindQueue('delayQueue', "delayExchange", "delay")


                ]);
            }
        });
    }

    // 发送消息到队列
    public async sendToQueue(queueName: string, data: any) {
        return this.channelWrapper.sendToQueue(queueName, data, { persistent: true });
    }

    // 发送广播消息到交换机
    public async sendToExchangeFanoutMessage(ex: string, data: any) {
        return this.channelWrapper.publish(ex, '', data, { persistent: true });
    }

    // 发送普通消息到交换机
    public async sendToExchangeMessage(ex: string, routerKey, data: any) {
        return this.channelWrapper.publish(ex, routerKey, data, { persistent: true });
    }

    // 发送优先级消息到交换机
    public async sendToExchangePriorityMessage(ex: string, routerKey, data: any) {
        return this.channelWrapper.publish(ex, routerKey, data, { persistent: true, priority: 5 });
    }

    /* 延时队列,异步发布确认 */
    public async sendToExchangeDelayMessage(ex: string, routerKey, data: any, delay: number) {
        return this.channelWrapper.publish(ex, routerKey, data, { persistent: true, headers: { 'x-delay': delay }, }, (err, ok) => {
            console.log("err, ok", err, ok);
        });
    }

    @Destroy()
    async close() {
        await this.channelWrapper.close();
        await this.connection.close();
    }
}

// interface Publish {
//     expiration?: string | number | undefined;
//     userId?: string | undefined;
//     CC?: string | string[] | undefined;

//     mandatory?: boolean | undefined;
//     persistent?: boolean | undefined;
//     deliveryMode?: boolean | number | undefined;
//     BCC?: string | string[] | undefined;

//     contentType?: string | undefined;
//     contentEncoding?: string | undefined;
//     headers?: any;
//     priority?: number | undefined;
//     correlationId?: string | undefined;
//     replyTo?: string | undefined;
//     messageId?: string | undefined;
//     timestamp?: number | undefined;
//     type?: string | undefined;
//     appId?: string | undefined;
// }
  1. 生产端发送消息
import { Controller, Get, Inject } from '@midwayjs/core';
// import { Context } from '@midwayjs/rabbitmq';
import { RabbitmqService } from '../service/rabbitmq';

@Controller('/')
export class HomeController {

  // @Inject()
  // ctx: Context;

  @Inject()
  mqService: RabbitmqService;

  @Get('/')
  async home(): Promise<string> {
    for (let i = 0; i < 100; i++) {
      // 直接发送给队列,不经过交换机
      let result = await this.mqService.sendToQueue("wx.template.message", { name: `litao${i}` }) 

      //发给广播交换机
      let result1 =  await this.mqService.sendToExchangeFanout("logs", {name: "litao"})

      //发给路由交换机,不通的routerkey
      await this.mqService.sendToExchangeDirect("direct_logs", "direct_key", {name: `litao${i}`})
      await this.mqService.sendToExchangeDirect("direct_logs", "direct_key1", {name: `litao${i}-${i}`})
      await this.mqService.sendToExchangeDirect("direct_logs", "direct_key2", {name: `litao${i}-${i}`})

      //发给路由交换机,不通的routerkey
      await this.mqService.sendToDelayQueue("delayQueue", { name: `litao${i}` })  //死信队列
      await this.mqService.sendToExchangeDelayDirect("delayExchange", "delay", {name: `litao${i}-${i}`})  //插件延时队列
      
      //优先级队列
      if (i % 8 === 0) {
        this.mqService.sendToExchangePriorityMessage("priority", 'priority_key', { name: `litao${i}` })
      } else {
        this.mqService.sendToExchangeMessage("priority", "priority_key", { name: `litao${i}`})
      }
      // console.log(`------result${i}------`, result, result1, result2, result3);
    }

    return 'Hello Midwayjs!';
  }
}


https://www.xamrdz.com/backend/3n41939197.html

相关文章: