1.2.0 • Published 5 years ago
amqp-broker-lib v1.2.0
AMQP-Broker
Node.js broker lib for AMQP using amqplib.
Installation
Via npm:
npm install amqp-broker-lib
Via yarn:
yarn add amqp-broker-lib
Usage
import { Broker, Config, logger } from "amqp-broker-lib";
// Consumer function
/**
 * Example consumer: reads the message body as a decimal integer and
 * returns that value incremented by one.
 *
 * @param msg - Consumed message; only `content` is read, and it is
 *   converted to text with `toString()` before parsing.
 * @returns The parsed integer plus one, or `NaN` when the body does not
 *   start with a decimal number.
 */
function plusOne(msg: { content: { toString(): string } }): number {
  const body = msg.content.toString();
  // Explicit radix 10: without it, a body such as "0x10" is parsed as hex.
  return Number.parseInt(body, 10) + 1;
}
const config: Config = {
// connection options
connection: {
protocol: "amqp",
name: "rabbitmq",
host: "localhost",
port: "5672"
},
// Exchanges
exchanges: [
{
name: "exchange",
type: "direct",
options: {}
}
],
// queues
queues: [
{
name: "plusOne",
exchange: "exchange",
key: "exchange.plusOne",
options: {}
}
]
};
// Instantiate the broker service
const broker = new Broker(config);
// add Consumer to queue
broker.addConsume("plusOne", plusOne);
broker.init()
.then(() => broker.publishMessage(
{
msg: "1",
exchange: "exchange",
key: "exchange.plusOne",
rpc: true,
options: {}
}
))
.then((response) => {
console.log(response);
})
Broker Service
Broker
Initialize the broker instance.
const broker = new Broker(configs);
Configs
connection: Connection data. Contains the following fields:
- user: User name
- pass: Password
- host: RabbitMQ host
- port: RabbitMQ port
- protocol: amqp or amqps
- certificate: Certificate data
- timeout: number
- name: service name
exchanges: A list of exchange definitions. Each entry has the following attributes:
- name: name of the exchange
- type: exchange type. Allowed values: direct, topic or fanout.
- options: exchange options. See the amqplib docs.
queues: A list of queue definitions. Each entry has the following attributes:
- name: name of the queue
- exchange (optional): name of the exchange the queue will bind to.
- key (optional): key pattern used to bind the queue
- options: queue options. See the amqplib docs.
Broker.addConsume(queue, callback)
Add a consumer to the broker.
broker.addConsume('queue-name', consumeFunc);
Params:
- queue: Queue name.
- callback: Consumer function. This function will receive a ConsumeMessage like this: { content: Buffer, fields: Object, properties: Object }
Broker.init()
Initialize the Broker service. This method returns a Promise instance.
broker.init();
Broker.publishMessage(publishOptions)
Publish a message to an exchange using a key pattern. Returns a Promise.
let publishOptions = {
exchange: "exchange", // Exchange name
key: "exchange.send-to-queue", // Key pattern
msg: "This is a message", // Message to consumer. Can be a string or Object
options: {
replyTo: "q"
}
}
await broker.publishMessage(publishOptions);
// If the "rpc" option is set, wait for a response.
publishOptions.rpc = true;
const response = await broker.publishMessage(publishOptions);
Broker.sendMessage(sendOptions)
Send a message directly to a queue. Returns a Promise.
let sendOptions = {
queue: "queue", // queue name
msg: "This is a message", // Message to consumer. Can be a string or Object
options: {
replyTo: "q"
}
}
await broker.sendMessage(sendOptions);
// If the "rpc" option is set, wait for a response.
sendOptions.rpc = true;
const response = await broker.sendMessage(sendOptions);
Broker.close()
Close the channel connection.
Testing
npm test