1.2.2 • Published 5 years ago
egg-mqc v1.2.2
egg-mqc
Install
$ npm i egg-mqc --save
Usage
// {app_root}/config/plugin.js
// Registers the egg-mqc plugin with the application under the key `mqc`.
exports.mqc = {
enable: true, // switch the plugin on
package: 'egg-mqc', // npm package that provides the plugin
};
Configuration
// {app_root}/config/config.default.js
exports.amqp = {
protocol: 'amqp',
hostname: 'localhost',
port: 5672,
username: 'Thomas',
password: 'Thomas.A.Edison',
vhost: '/',
// http://www.squaremobius.net/amqp.node/ssl.html
// opts: {
// cert: certificateAsBuffer, client cert
// key: privateKeyAsBuffer, client key
// passphrase: 'MySecretPassword', passphrase for key
// ca: [caCertAsBuffer], array of trusted CA certs
// },
pool: {
max: 10, // maximum size of the pool
min: 2, // minimum size of the pool
acquireTimeoutMillis?: number,
},
};
See config/config.default.js for more details.
Example
- app/extend/application.ts
/**
 * Publish a persistent message to the built-in `amq.topic` exchange.
 *
 * A channel is opened for the publish and always closed again before the
 * returned promise settles, so a failing `close()` can no longer surface as
 * an unhandled rejection.
 *
 * @param this the Egg application this method is mixed into
 * @param topicKey routing key the message is published with
 * @param data message payload
 * @returns the boolean returned by `channel.publish`, or `false` when
 *          connecting, publishing or closing the channel fails (the failure
 *          is logged rather than thrown)
 */
async mqPublish(this: Application, topicKey: string, data: Buffer): Promise<boolean> {
  const exchange = 'amq.topic';
  try {
    const conn = await this.amqp();
    const channel = await conn.createChannel();
    try {
      return channel.publish(exchange, topicKey, data, { persistent: true });
    } finally {
      // Wait for the close to finish so its failure is caught below instead of
      // being left as a floating promise.
      await channel.close();
    }
  } catch (err) {
    // Keep the best-effort contract (never reject), but do not hide the reason.
    this.logger.error(`mqPublish to ${topicKey} failed: ${err}`);
    return false;
  }
}
- Publish a message to a topic
const rst = await this.app.mqPublish('this_is_your_topic', Buffer.from('hello there'));
- consume in app.ts
async serverDidReady() {
  // The server is listening at this point: bind "fund.queue" with the "#"
  // topic pattern and log + acknowledge every message that arrives.
  const onMessage = (channel: Channel, msg: ConsumeMessage | null) => {
    if (!msg) {
      return;
    }
    const text = msg.content.toString();
    const exchange = msg.fields.exchange;
    const routingKey = msg.fields.routingKey;
    this.app.logger.info(` - [${exchange}, ${routingKey}] ${text}`);
    channel.ack(msg);
  };
  subscribe(this.app, "#", "fund.queue", onMessage);
}
Define AmqpConsumer and the subscribe helper in the file app/lib/amqpconsumer:
import { Connection, ConsumeMessage, Channel } from 'amqplib';
import { Application } from 'egg';
/**
 * Obtains a connection from `app.amqp()` and hands it to `handler`, starting
 * over whenever the connection closes or the setup fails.
 *
 * Consumption starts as soon as the instance is constructed.
 */
export class AmqpConsumer {
  /** Pause before retrying after a failure, so an unreachable broker is not hit in a tight loop. */
  private static readonly RETRY_DELAY_MS = 1000;

  /** Log prefix: `[tag]`, or an empty string when no tag was given. */
  private readonly tag: string;
  private readonly app: Application;
  /** Callback that sets up the actual consumption on a freshly obtained connection. */
  private readonly handler: (conn: Connection, tag: string) => unknown;

  /**
   * @param app Egg application providing `amqp()` and `logger`
   * @param handler invoked with the connection and the formatted tag each time
   *                a connection is obtained; it may return a promise
   * @param tag optional label used to prefix log lines
   */
  constructor(app: Application, handler: (conn: Connection, tag: string) => unknown, tag?: string) {
    this.tag = tag ? `[${tag}]` : '';
    this.app = app;
    this.handler = handler;
    this.process();
  }

  /**
   * Acquire a connection, register the reconnect-on-close hook and run the
   * handler. Any failure is logged with its cause and retried after
   * `RETRY_DELAY_MS`.
   */
  public process(): void {
    this.app.logger.info(`${this.tag} start consume...`);
    this.app.amqp()
      .then(conn => {
        conn.on('close', () => {
          this.app.logger.info(`${this.tag} consume is closed!!!`);
          this.process();
        });
        this.app.logger.info(`${this.tag} consuming`);
        // Returning the handler result routes an asynchronous handler failure
        // into the catch below, the same way a synchronous throw already is.
        return this.handler(conn, this.tag);
      })
      .catch((err: unknown) => {
        this.app.logger.error(`${this.tag} error! ${err}`);
        setTimeout(() => this.process(), AmqpConsumer.RETRY_DELAY_MS);
      });
  }
}
/**
 * Subscribe to messages.
 *
 * Creates an `AmqpConsumer` (tagged with `topic`) that, for every connection
 * it obtains, opens a channel, declares the durable `amq.topic` exchange and
 * the durable queue, binds the queue to the exchange with `topic`, and starts
 * consuming. Each setup step is awaited, and a setup failure is logged instead
 * of being left as an unhandled promise rejection.
 *
 * @example
 * subscribe(this.app, "#", "fund.queue", (channel: Channel, msg: ConsumeMessage | null) => {
 *   if (msg) {
 *     let text = msg.content.toString();
 *     let { exchange, routingKey } = msg.fields;
 *     this.app.logger.info(`[${exchange}, ${routingKey}] ${text}`);
 *     channel.ack(msg);
 *   }
 * });
 *
 * @param app Egg application
 * @param topic topic (binding pattern) name
 * @param queue message queue name
 * @param handler message callback; once a message has been processed it must
 *                call `channel.ack(msg)`
 */
export function subscribe(app: Application, topic: string, queue: string, handler: (channel: Channel, msg: ConsumeMessage | null) => void) {
  new AmqpConsumer(app, async (conn: Connection) => {
    const exchange = 'amq.topic';
    try {
      const channel = await conn.createChannel();
      // When consuming, bind the queue to the exchange.
      await channel.assertExchange(exchange, 'topic', { durable: true });
      await channel.assertQueue(queue, { durable: true });
      await channel.bindQueue(queue, exchange, topic);
      await channel.consume(queue, (msg) => {
        try {
          handler(channel, msg);
        } catch (err) {
          // A throwing handler must not break the consumer callback.
          app.logger.error(`consume ${queue} error ${err}`);
        }
      });
    } catch (err) {
      app.logger.error(`subscribe ${queue} error ${err}`);
    }
  }, topic);
}
Questions & Suggestions
Please open an issue here.