2.1.3 • Published 3 years ago
message-broker-lib v2.1.3
message-broker-lib
Connection management for amqplib. This is a wrapper around amqplib
and amqp-connection-manager
which provides automatic reconnects.
Responses are always objects returning error
or data
Features
- Uses
amqp-connection-manager
for connection management - Easy Installation and Use
- Simple Exposed Functions
- Clear Error Response
Installation
"dependencies": {
"message-broker-lib": "^1.0.4"
}
npm install message-broker-lib
Initialization
const broker = require('message-broker-lib').RabbitMQ;
//when initializing RabbitMQ, pass in the url
//When no url is passed, RABBITMQ_CLUSTER_URL and/or RABBITMQ_URL from process.env is used
//When using cluster, URL should be passed as "amqp://localhost-1,amqp://localhost-2,amqp://localhost-3"
const connection = await broker.init({
url: "amqp://localhost",
heartbeat: 60 //in seconds
})
Basic functionality
- Create A Channel
const {error, data} = await rabbitMQ.createChannel((channel) => {
return Promise.all([
//all queue and exchange assertions and creation that needs tobe done, can be done here.
channel.assertQueue(channelName, {durable: true})
])
});
- Create A Queue
//(async/await)
const {error, data} = await broker.createQueue(channelName, {
durable: true //options: checkout https://www.rabbitmq.com for more options
});
- To queue a data for processing
//(async/await)
const payload = {
timestamp: Date.now(),
name: "A Name",
email: "Email"
};
const {error, data} = await broker.queue(channelName, payload, {persistent: true});
-- To assert/create an exchange
const exchangeName = "logs";
const {error, data} = await broker.assertExchange(exchangeName,"fanout", {durable: true}); //exchange types includes fanout, direct, topic and header.checkout https://www.rabbitmq.com for more exchange types.
console.log({error, data});
-- To Publish to an exchange
const exchangeName = "logs";
const {error, data} = await broker.publish(exchangeName,'',{
timestamp: Date.now(),
name: "A Name",
email: "Email"
});
-- To create/assert A queue
const exchangeName = "logs";
const queueName = "test-exchange-queue";
const queueOption = {exclusive: true, autoDelete: true};//if you want temporary queue
const bindKey = ""; //read more on routing here https://rabbitmq.com/tutorials/tutorial-four-javascript.html
const {error, data} = await broker.assertQueue(exchangeName, queueName, queueOption, bindKey);
console.log({queue});
// queue: { queue: 'test-exchange-queue', messageCount: 0, consumerCount: 0 }
-- To listen to a queue and pull data for processing
//(async/await)
const prefetch = 1;
rabbitMQ.listen(channelName,{
noAck: false // listen options:checkout https://www.rabbitmq.com for more options
}, (error, raw, channel) => {
const stringPayload = raw.content.toString();
const objectPayload = JSON.parse(stringPayload);
//....process payload .../
channel.ack(payload); //acknowledge that processing has been done and remove from queue
}, prefetch);
//Warning: If you enable acknowledgement {noAck: false}, the next data on the queue
//won't be released by the queue until the current data is acknowledge.
-- To close a connection
broker.close();
Tests
Cli
npm install
npm test
Contributors
2.1.3
3 years ago
2.1.2
3 years ago
2.0.3
3 years ago
2.1.1
3 years ago
2.0.2
3 years ago
2.1.0
3 years ago
2.0.1
3 years ago
2.0.0
3 years ago
1.0.2
3 years ago
1.0.1
3 years ago
1.0.0
3 years ago
1.0.4
3 years ago
1.0.3
3 years ago
0.0.6
4 years ago
0.0.5
4 years ago
0.0.4
4 years ago
0.0.3
4 years ago
0.0.2
4 years ago
0.0.1
4 years ago