1.2.1 • Published 4 years ago

amqp-connection-manager-rpc v1.2.1

Weekly downloads
8
License
MIT
Repository
github
Last release
4 years ago

NPM Package Build Status Coverage Status Greenkeeper badge semantic-release

Dependency Status devDependency Status peerDependency Status

Extend amqp-connection-manager connection management for amqplib to support Remote procedure call (RPC).

amqp-connection-manager-rpc

Features

  • Time to live for RPC requests.
  • Exceptions transmitted from RPC server to RPC client.
  • Simple async function API design

Installation

npm install --save amqplib amqp-connection-manager amqp-connection-manager-rpc

Basics

The basic idea described at rabbitmq. To manage responses from an RPC server, node-cache is used.

Here's the RPC client example:

var amqp = require('amqp-connection-manager-rpc');

// Create a new connection manager
var connection = amqp.connect(['amqp://localhost'], {json: true});

// Setup a channel for RPC requests.
const ttl = 60; // Time to live for RPC request (seconds). 0 - infinite
var channelWrapper = connection.createRPCClient('RPC-QUEUE-test', ttl);

// Send some request to RPC server and receive reply. Exception can occupied!
let req = { a: 1, b: 2}; //request data
try{
    let prc_reply = await channelWrapper.sendRPC(req);
    console.log("RPC reply: ", prc_reply);
} catch (err) {
    console.log("RPC error: ", err);
}

Here's the RPC server example:

var amqp = require('amqp-connection-manager-rpc');

// Create a new connection manager
var connection = amqp.connect(['amqp://localhost'], {json: true});

// Set up a channel for RPC requests.
var channelWrapper = connection.createRPCServer('RPC-QUEUE-test', doRpcJob);

//do RPC job
async function doRpcJob(msgJson, msg) {
    if (!msgJson.b) throw new Error('B is not set'); //Exceptions allowed! Will be send to RPC client.
    let reply = {
        a: msgJson.a ? msgJson.a + 1 : null
    }
    return reply;
}

See a complete example in the examples folder.

API

See amqp-connection-manager API.

AmqpConnectionManager#createRPCClient(queue_name[, ttl , setup])

Create a new RPC client ChannelWrapper.

  • queue_name - Name of queue for RPC request.
  • ttl - time to live for RPC request (seconds). To infinite set to 0. If not defined used 0.
  • setup - async function(channel) for setup queue and exchange. Must return RPC queue. Default: async function (channel) => { return await channel.assertQueue('', { exclusive: true }) };

Returns ChannelWrapper

AmqpConnectionManager#createRPCServer(queue_name, callback, options )

Create a new RPC server ChannelWrapper.

  • queue_name - Name of queue for RPC request.
  • callback - A callback function, which returns a Promise. This should return RPC server json reply. Callback function has two argument: json message from RPC client, full message from RPC client.

Options:

  • options.sendErrorStack - if true errors stack will be send to client. Default - false.
  • options.setup - async function(channel) for setup channel, exchange. Must return RPC queue name. Default: async function (channel) => { channel.prefetch(1); await channel.assertQueue(queue_name, { durable: false }); return queue_name; };

Returns ChannelWrapper

ChannelWrapper#sendRPC(msg [,ttl [, exchangeName , routingKey]])

Send RPC request to RPC server. Call it on client only.

  • msg - request Object to RPC server.
  • ttl - time to live for RPC request (seconds). To infinite set to 0. If not defined used value from createRPCClient().
  • exchangeName - name of exchange for RPC request.
  • routingKey - routing key for RPC request.

Returns Object with RPC job reply or Exception

Fork it!

Pull requests, issues, and feedback are welcome.