@uwatch/amqp v0.0.20
@uwatch/amqp
Promise-based wrapper for amqplib
Install
$ yarn add @uwatch/amqp
Example
import AMQP from '@uwatch/amqp';
const amqp = new AMQP({
connectionString: 'amqp://localhost'
});
(async () => {
try {
// Connect and create channel
await amqp.init();
// Setup needed queues, exchanges...
await amqp.setup(async () => {
// Create queue
await amqp.createQueue('test_queue');
});
// Get queue from instance
const queue = amqp.queue('test_queue');
// Consume message from queue
queue.consume(message => {
console.log(message);
});
} catch (e) {
console.error(e);
}
})();
API
AMQP class
AMQP constructor. Create an instance to work with amqplib.
Example
const amqp = new AMQP();
.connect()
Connect to RabbitMQ server.
Example
await amqp.connect();
.close(forceClose = true)
Close connection to RabbitMQ server.
Params
forceClose{boolean}: Force connection close, reconnection disabled.
Example
await amqp.close();
.createChannel()
Create channel.
Example
await amqp.createChannel();
.setup(func)
Add setup function.
Params
func{function}: Setup function
Example
await amqp.setup(async () => {
console.log('Setup function called!');
});
.clearSetups()
Remove all setups.
Example
amqp.clearSetups();
.init()
Initializing: creating connection, creating channel, calling all setup functions.
Example
await amqp.init();
.createExchange(name, type = 'direct', options = {})
Create exchange.
Params
name{string}: Exchange name
type{string}: Exchange type
options{object}: Exchange options
Example
const exchange = await amqp.createExchange('test_exchange');
.createQueue(name, options = {})
Create queue.
Params
name{string}: Queue name
options{object}: Queue options
Example
const queue = await amqp.createQueue('test_queue');
.exchange(name)
Get exchange from instance by name.
Params
name{string}: Exchange name
Example
const exchange = amqp.exchange('test_exchange');
.deleteExchange(exchange)
Remove exchange from instance by name or object.
Params
exchange{(string|object)}: Exchange name or object
Example
await amqp.deleteExchange('test_exchange');
.queue(name)
Get queue from instance by name.
Params
name{string}: Queue name
Example
const queue = amqp.queue('test_queue');
.deleteQueue(queue)
Remove queue from instance by name or object.
Params
queue{(string|object)}: Queue name or object
Example
await amqp.deleteQueue('test_queue');
.send(queueName, data = null, options = {})
Send data to queue.
Params
queueName{string}: Queue name
data{any}: Data to send
options{object}: Send options
options.delay{number}: Send delay in ms
Example
await amqp.send('test_queue', 'Hello World!');
.publish(exchangeName, data = null, routingKey = '', options = {})
Publish data to exchange.
Params
exchangeName{string}: Exchange name
data{any}: Data to send
routingKey{string}: Routing key
options{object}: Publish options
Example
await amqp.publish('test_exchange', 'Hello World!');
.reconnect()
Reconnect.
Example
await amqp.reconnect();
Queue class
.bind(exchangeName, pattern = '')
Bind queue to exchange.
Params
exchangeName{string}: Exchange name
pattern{string}: Bind pattern
Example
await queue.bind('test_exchange');
.consume(callback, options = {})
Add consumer function.
Params
callback{function}: Consumer callback
options{object}: Consumer options
Example
queue.consume(message => {
console.log(message);
});
.send(queueName, data = null, options = {})
Send data to queue.
Params
queueName{string}: Queue name
data{any}: Data to send
options{object}: Send options
Example
await queue.send('test_queue', 'Hello World!');
.event(queueName, name, data = null, options = {})
Send event-type message to queue.
Params
queueName{string}: Queue name
name{string}: Event name
data{any}: Event data
options{object}: Send options
Example
await queue.event('test_queue', 'test_event', 'Hello World!');
.on(eventName, callback)
Listen for event-type messages from queue.
Params
eventName{string}: Event name
callback{function}: Event callback
Example
queue.on('test_event', (data, message) => {
console.log(data, message);
});
.method(name, callback)
Add RPC method.
Params
name{string}: RPC method name
callback{function}: RPC method callback
Example
queue.method('test_method', (data, message) => {
console.log(data, message);
});
.deleteMethod(name)
Delete RPC method.
Params
name{string}: RPC method name
Example
queue.deleteMethod('test_method');
.call(queueName, method, data = null, progress, options = {})
Call RPC method.
Params
queueName{string}: Queue name
method{string}: Method name
data{any}: Data to send
options{object}: Send options
options.progress{function}: Progress callback (optional)
options.timeout{number}: RPC timeout (defaults to 20000 ms)
options.doNotWait{boolean}: If true, promise will resolve to undefined and RPC will not wait for method to complete
Example
const result = await queue.call('test_queue', 'test_method', {
message: 'Hello World!'
}, { timeout: 60000, progress: progress => console.log(progress) });
Will throw TimeoutError when the timeout is reached.
On successful call
{
success: true,
result: <data returned from method>
}
On failed call
{
success: false,
error: <error message from call>
}
Exchange class
.publish(data = null, routingKey = '', options = {})
Publish data to exchange.
Params
data{any}: Data to publish
routingKey{string}: Routing key for RabbitMQ
options{object}: Publish options
Example
await exchange.publish('Hello World!');
.event(name, data = null, options = {})
Publish event-type message.
Params
name{string}: Event name
data{any}: Event data
Example
await exchange.event('test_event', 'Hello World!');
Message class
.reply(data = null)
Reply to RPC call.
Params
data{any}: Reply data
Example
await message.reply('Reply from RPC call!');
.progress(data = null)
Send RPC progress.
Params
data{any}: Progress data
Example
await message.progress('RPC progress 50%!');