1.1.1 • Published 8 years ago

rabbit-hapi v1.1.1

Weekly downloads
3
License
Apache-2.0
Repository
github
Last release
8 years ago

rabbit-hapi

HAPI plugin to wrap basic RabbitMQ operations based on amqp library for NodeJS.

This plugin enables these RabbitMQ operations : - pub/sub - send to queue or exchange - consume messages from queue or exchange - send a request in RPC mode

For further informations about RabbitMQ possibilities, see RabbitMQ official documentation.

Installation

npm install --save rabbit-hapi

Usage

server.register({
    plugin  : require('rabbit-hapi'),
    options : {}
});

Options

### Global settings

Global settings use for broker connection.

ParamsDescriptionDefault
hostnameRabbitMQ broker hostnamelocalhost
portPort for AMQP5672
credentialsCredential to connect to RabbitMQ broker-
heartbeatPeriod of the connection heartbeat (seconds)30
maxRetryMax retry allowed on connection or channel error5

RabbitMQ settings

Default settings applied on queues/exchanges.

ParamsDescriptionDefault
typeExchange typedirect
optionsQueue options-
options.durableDurable options for queuetrue
options.noAckDisable acknowledgement on messages if truefalse
options.RPCExpireExpiration time (milliseconds) for answer queue in RPC mode60000
options.allUpToallUpTo on ack/noAckfalse
options.requeueRequeue on noAcktrue
options.debugDebug configuration
options.debug.isActivatedActivate debugfalse
options.debug.expiresQueue expires after x ms86400000 // 24 hours
options.debug.durableQueue is durabletrue
options.debug.persistentMessage requeue on error are persistenttrue

Messages settings

Default messages settings.

ParamsDescriptionDefault
contentMessage content-
optionsMessage options-
options.contentTypeContent MIME typeapplication/json
options.persistentIf true, message will survive a broker restarttrue

Debug mode

When debug is activated, if consumer fails during message treatment, message will be also queue in a specific queue. Queue can be named in options.debug.queue. If not, name will be the original queue's name plus debug.

Server Methods

publish

Publish a message through a fanout exchange.

Parameters

FieldTypeDescription
messageObject/StringMessage to publish (if string, automatically assign to message.content)
message.content*Message content
message.optionsObjectMessage options (same as those provided by amqp lib)
exchangeStringExchange name
optionsObjectExchange options
routingKeyString(optional) Routing key to use
queueString(optional) Queue name

Usage

rabbitHapi.publish({
    exchange    : 'publishExchange',
    options     : {
        durable     : false
    },
    message     : 'Hello World !'
}).then(() => {
    // do some stuff here...
}).catch(() => {
    // something went wrong...
});

subscribe

Subscribe to a fanout exchange.

Parameters

FieldTypeDescription
exchangeStringExchange name
optionsObjectExchange options
queueString(optional) Queue name
waitingFuncFunctionFunction to call when starting consuming
receiveFuncFunctionFunction to call on message reception

Usage

rabbitHapi.subscribe({
    exchange    : 'publishExchange',
    options     : {
        durable     : false,
        noAck       : false,
        exclusive   : true
    },
    receiveFunc : (message) => {
        console.log('Receive', message.content.toString());
    },
    waitingFunc : () => {
        console.log('Waiting for message');
    }
});

send

Send a message to an exchange or a queue. If both routing key and queue are specified, bind the exchange to the queue using the routing key. In order to use queue generated from amqp (amq.gen...), you need to specified the parameter generatedQueue.

Parameters

FieldTypeDescription
messageObject/StringMessage to publish (if string, automatically assign to message.content)
message.content*Message content
message.optionsObjectMessage options (same as those provided by amqp lib)
exchangeStringExchange name
typeStringExchange type
optionsObjectExchange options
routingKeyString(optional) Routing key to use
queueString(optional) Queue name
generatedQueueBoolean(optional) True to use queue generated by the broker

Usage

rabbitHapi.send({
    queue       : 'hello',
    options     : {
        durable : false,
        noAck   : false
    },
    message     : 'Hello World !'
}).then(() => {
    // do some stuff....
}).catch(() => {
    // something went wrong...
});

consume

Consume messages from an exchange or a queue. Automatic reconnection to a new channel on connection error/lost.

Parameters

FieldTypeDescription
exchangeStringExchange name
typeStringExchange type
optionsObjectExchange options
prefetchNumberSpecify prefetch on the channel
queueString(optional) Queue name
waitingFuncFunctionFunction to call when starting consuming
receiveFuncFunctionFunction to call on message reception

Usage

rabbitHapi.consume({
    queue       : 'hello',
    options     : {
        durable : false,
        noAck   : false
    },
    receiveFunc : (message) => {
        console.log('Direct message receive ', message.content.toString());
    },
    waitingFunc : () => {
        console.log('Waiting for message');
    }
});

bindExchange

Bind a key (or an array of keys) to exchange/queue. Create the exchange and/or the queue if it doesn't exist.

Parameters

FieldTypeDescription
exchangeStringExchange name
typeStringExchange type
optionsObjectExchange options
queueString(optional) Queue name
routingKeysString/String[]Routing keys to bind to

Usage

rabbitHapi.bindExchange({
    exchange    : 'logExchange',
    type        : 'direct',
    queue       : 'logQueue',
    routingKeys : [ 'error', 'info', 'debug' ],
    options     : {
        durable     : true,
        noAck       : false
    }
});

sendRPC

Send a message acting like a client RPC.

Parameters

FieldTypeDescription
messageObject/StringMessage to publish (if string, automatically assign to message.content)
message.content*Message content
message.optionsObjectMessage options (same as those provided by amqp lib)
queueStringQueue to send request on
optionsObjectExchange options
RPCTimeoutNumberTimeout if no answer is received from the RPC server (default to 30 sec)
receiveFuncFunctionFunction to call on server response

Usage

rabbitHapi.sendRPC({
    queue       : 'rpc_queue',
    options     : {
        durable : false,
        noAck   : false
    },
    message     : 'Hello RPC server !',
    receiveFunc : (answer) => {
        console.log('Server answer with', answer.content.toString());
        // do some stuff...
    }
});

answerToRPC

Answer to an RPC request.

Note : answer queue is specified and not exclusive in order to enable retry on connection loss.

Parameters

FieldTypeDescription
prefetchNumberSpecify prefetch on the channel
queueStringQueue to send the response on
optionsObjectExchange options
waitingFuncFunctionFunction to call when starting consuming
receiveFuncFunctionFunction to call on message reception (return value of the function will be send to client RPC)

Usage

rabbitHapi.answerToRPC({
    queue       : 'rpc_queue',
    options     : {
        durable     : false,
        noAck       : false
    },
    receiveFunc : (message) => {
        console.log('Message received', message.content.toString(), '... sending response...');
        return 'Hello client !';
    }
});

close

Close manually RabbitMQ connection.

1.1.1

8 years ago

1.1.0

8 years ago

1.0.0

9 years ago

0.1.10

9 years ago

0.1.9

9 years ago

0.1.8

9 years ago

0.1.7

9 years ago

0.1.6

9 years ago

0.1.5

9 years ago

0.1.4

9 years ago

0.1.3

9 years ago

0.1.2

9 years ago

0.1.1

9 years ago

0.1.0

9 years ago