1. 生产端服务类定义
import { Provide, Scope, ScopeEnum, Init, Autoload, Destroy, Config } from '@midwayjs/core';
import * as amqp from 'amqp-connection-manager'
@Autoload()
@Provide()
@Scope(ScopeEnum.Singleton) // Singleton 单例,全局唯一(进程级别)
export class RabbitmqService {
private connection: amqp.AmqpConnectionManager;
private channelWrapper;
@Config("rabbitmq")
rabbitmqConfig;
@Init()
async connect() {
// 创建连接,你可以把配置放在 Config 中,然后注入进来
this.connection = await amqp.connect(this.rabbitmqConfig.url);
// 创建 channel
this.channelWrapper = this.connection.createChannel({
json: true,
setup: function (channel) { //在使用通道之前准备好必要的资源
return Promise.all([
// 绑定队列
// channel.assertQueue("wx.template.message", { durable: true }),
// channel.assertExchange('logs', 'fanout', { durable: false }),
// channel.assertExchange('direct_logs', 'direct', { durable: false }),
// // 创建延迟队列和交换机
// channel.assertExchange("delayExchange", 'direct', { durable: true }),
// channel.assertQueue('delayQueue', {
// durable: true,
// deadLetterExchange: 'dead_topic', // 这里为空,因为不需要死信队列
// deadLetterRoutingKey: 'dead_topic_key',
// messageTtl: 15000 //创建一个延迟15秒的队列
// }),
// channel.bindQueue('delayQueue', "delayExchange", "delay")
]);
}
});
}
// 发送消息到队列
public async sendToQueue(queueName: string, data: any) {
return this.channelWrapper.sendToQueue(queueName, data, { persistent: true });
}
// 发送广播消息到交换机
public async sendToExchangeFanoutMessage(ex: string, data: any) {
return this.channelWrapper.publish(ex, '', data, { persistent: true });
}
// 发送普通消息到交换机
public async sendToExchangeMessage(ex: string, routerKey, data: any) {
return this.channelWrapper.publish(ex, routerKey, data, { persistent: true });
}
// 发送优先级消息到交换机
public async sendToExchangePriorityMessage(ex: string, routerKey, data: any) {
return this.channelWrapper.publish(ex, routerKey, data, { persistent: true, priority: 5 });
}
/* 延时队列,异步发布确认 */
public async sendToExchangeDelayMessage(ex: string, routerKey, data: any, delay: number) {
return this.channelWrapper.publish(ex, routerKey, data, { persistent: true, headers: { 'x-delay': delay }, }, (err, ok) => {
console.log("err, ok", err, ok);
});
}
@Destroy()
async close() {
await this.channelWrapper.close();
await this.connection.close();
}
}
// interface Publish {
// expiration?: string | number | undefined;
// userId?: string | undefined;
// CC?: string | string[] | undefined;
// mandatory?: boolean | undefined;
// persistent?: boolean | undefined;
// deliveryMode?: boolean | number | undefined;
// BCC?: string | string[] | undefined;
// contentType?: string | undefined;
// contentEncoding?: string | undefined;
// headers?: any;
// priority?: number | undefined;
// correlationId?: string | undefined;
// replyTo?: string | undefined;
// messageId?: string | undefined;
// timestamp?: number | undefined;
// type?: string | undefined;
// appId?: string | undefined;
// }
- 生产端发送消息
import { Controller, Get, Inject } from '@midwayjs/core';
// import { Context } from '@midwayjs/rabbitmq';
import { RabbitmqService } from '../service/rabbitmq';
@Controller('/')
export class HomeController {
// @Inject()
// ctx: Context;
@Inject()
mqService: RabbitmqService;
@Get('/')
async home(): Promise<string> {
for (let i = 0; i < 100; i++) {
// 直接发送给队列,不经过交换机
let result = await this.mqService.sendToQueue("wx.template.message", { name: `litao${i}` })
//发给广播交换机
let result1 = await this.mqService.sendToExchangeFanout("logs", {name: "litao"})
//发给路由交换机,不通的routerkey
await this.mqService.sendToExchangeDirect("direct_logs", "direct_key", {name: `litao${i}`})
await this.mqService.sendToExchangeDirect("direct_logs", "direct_key1", {name: `litao${i}-${i}`})
await this.mqService.sendToExchangeDirect("direct_logs", "direct_key2", {name: `litao${i}-${i}`})
//发给路由交换机,不通的routerkey
await this.mqService.sendToDelayQueue("delayQueue", { name: `litao${i}` }) //死信队列
await this.mqService.sendToExchangeDelayDirect("delayExchange", "delay", {name: `litao${i}-${i}`}) //插件延时队列
//优先级队列
if (i % 8 === 0) {
this.mqService.sendToExchangePriorityMessage("priority", 'priority_key', { name: `litao${i}` })
} else {
this.mqService.sendToExchangeMessage("priority", "priority_key", { name: `litao${i}`})
}
// console.log(`------result${i}------`, result, result1, result2, result3);
}
return 'Hello Midwayjs!';
}
}