0.1.1 • Published 7 years ago

akx-mq v0.1.1

Weekly downloads
-
License
ISC
Repository
-
Last release
7 years ago

AkxMQ

================================ A wrapper around '''amqplib''' to handle retries, adding bulk consumers, and connect style middleware.

#Installation

npm install akx-mq --save

Usage


file: akx-mq-config.js

var mqConfig = require( './config.json' );
var akxMq = require( 'akx-mq' )( mqConfig );
var logger = require( './logger' );

akxMq.addErrorHandling( function( err ){
    logger.log( 'mq', err );
} );

exports.akxMqMiddleware =  function(){
    var getMessage = function( req, res ){
        return res._body.wire.dataValues;
    };
    var callback = function( err, req, res, next ){
        if( error ){
            logger.log( 'mq', err, { req: req } )
        }
        return next();
    };
    return akxMq.publishMiddleware( { queue: 'wiresQ', getMessage: getMessage, callback: callback } );
};

exports.akxMqAddConsumer = akxMq.addConsumer;

Then in your routes file call the akx-mq-config.js file and create publisherMiddleware to publish messages to the queues, and add consumers to consume messages for they're respected queues.

file: routes.js

var akxMqConfig = require( './akx-mq-config' );

var users = require('../controllers/users');
server.post('/users', authMiddleware, users.create, akxMqConfig.akxMqMiddleware() );

akxMqConfig.akxMqAddConsumer( {
    queueNameHere: [ fn1, fn2, fn3, ... ],
    anotherQueueName: [ fn1, fn4, ... ]
} );

Note: The addConsumer function uses the keys as the queue names here so make sure that they are in the config file before adding them here. Also the array of functions as the value will be called in order synchronously (connect style middleware). Note that the publish and consumer share the same connection and channel, this is by design. The above example calls akxMqConfig.akxMqMiddleware() function returns back the middleware that will be used. Below are the api references and options.

API Reference


#publishMiddleware

instance.publishMiddleware({[queue, [getMessage, [callback]]]}) Returns a middleware function with the regular req, res, and next arguments.

Takes a POJO with three properties:

  • queue: String name of the queue you want to publish to.
  • getMessage: Function that are given the req, res objects to extract the message.
  • callback: Function that are given the req, res, and next arguments if an error occurs you can log.

example: instance.publishMiddleware({queue:'users', getMessage: getMessageFunc, callback: callbackFunc})

Note: All three are required.

#addConsumer

instance.addConsumer({[queue, [array of functions]]}) Adds consumers to the respected queue.

Takes a POJO. Uses the keys as the queue name and the values as the consumer.

  • queue {queueName:[ fn1, fn2, ...]}

example: instance.addConsume({queueName:[ fn1, fn2, ...]}) Note: that functions will be called in order with the arguments: queueName, parsedMsg, next. The next callback is similar to connect style but you can pass in next( true ) which will raise a flag that will ignore the rest of the functions and call the last function in the list. Else you can just call next() and it will call them in order one at a time. There is no

limit to how many consumers you can add. Also the message wont be acknowledge until the last function has finished.

#addErrorHandling

instance.addErrorHandling([function]) Adds a catch all error handling function

Takes a function to be the general error handling function.

  • function: Just a regular function, named or anonymous.

example: instance.addErrorHandling(function(){ logger.log('message')}) Note: This is optional.

Options


PropertyDataTypeDefaultDescription
retryNumber60000How many milliseconds before retrying to connect to the server
hostString'amqp://localhost'The host for the server
persistentBooleanfalseIf you want the message to persist if server goes down
prefetchNumber0How many unAcked messages you want to allow before sending more down to the consumer
noAckBooleanfalseIf you want no Acknowledge meant
queuesArray An array of objects with 'name', and 'durable' properties