3.0.2 • Published 4 years ago

node-rabbitmq-client v3.0.2

License
MIT

Connection management for a RabbitMQ client

A Node.js RabbitMQ client with connection management baked in. This project is built on top of amqp-connection-manager.

NOTE

Version 3 is a major, breaking change from version 2. Please use the version appropriate for your project.

Features

  • Automatically reconnect when your amqplib broker dies in a fire.
  • Round-robin connections between multiple brokers in a cluster.
  • If messages are sent while the broker is unavailable, queues messages in memory until we reconnect.
  • Very un-opinionated library - a thin wrapper around amqplib.

Configuration

{
    host: process.env.PUBSUB_RABBITMQ_SERVICE_HOST,
    port: process.env.PUBSUB_RABBITMQ_SERVICE_PORT_AMQP || 5672,
    username: process.env.RABBITMQ_USERNAME,
    password: process.env.RABBITMQ_PASSWORD,
    prefetch: process.env.PREFETCH_JOBS || 2,
    vhost: process.env.VHOST || '/',
    heartbeatInterval: process.env.HEARTBEAT || 5,
    reconnectTime: process.env.RECONNECT_TIME || 10,
    protocol: process.env.RABBITMQ_PROTOCOL || 'amqp',
    defaultQueueFeatures: { durable: true },
    options: {
      // options.findServers(callback) is a function which returns one or more
      // servers to connect to. It should return either a single URL or an array
      // of URLs. This is handy when you're using a service discovery mechanism
      // such as Consul or etcd. Instead of taking a callback, it can also
      // return a Promise. Note that if this is supplied, urls is ignored.
      findServers,
      // options.connectionOptions is passed as options to the amqplib connect method.
      connectionOptions
    }
}
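
The findServers option described in the configuration comment can be sketched as follows. This is only an illustration, not the library's own code: lookupBrokers and the broker hostnames are hypothetical stand-ins for a real service-discovery call (for example to Consul or etcd).

```javascript
// Hypothetical stand-in for a service-discovery lookup (Consul, etcd, ...).
// A real implementation would query the discovery service here.
function lookupBrokers() {
  return Promise.resolve([
    { host: 'rabbit-1.example.internal', port: 5672 },
    { host: 'rabbit-2.example.internal', port: 5672 }
  ]);
}

// Promise-returning variant of findServers: takes no callback and
// resolves to an array of AMQP URLs built from the discovered brokers.
function findServers() {
  return lookupBrokers().then(brokers =>
    brokers.map(({ host, port }) => `amqp://${host}:${port}`)
  );
}
```

A function like this would be supplied as options.findServers in the configuration object.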

Usage

Using yarn: yarn add node-rabbitmq-client@3.0.0
Using npm: npm install node-rabbitmq-client@3.0.0

import RabbitMQClient from 'node-rabbitmq-client';
// (OR)
const RabbitMQClient = require('node-rabbitmq-client');

// instantiate a client object
const client = new RabbitMQClient(config);

/* to publish a message */
// `data` is JS object
client.publish({ queue: { name: 'some name' } }, data);

/* to consume from a queue */
client.consume({ queue: { name: 'some name' } }, promiseHandler);

/* to purge a queue */
client.purge({ queue: { name: 'some name' } });

/* to ack all messages */
client.ackAll();
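
The first argument to publish, consume and purge is a plain object describing the queue. For a priority queue, the option shape documented in the consume notes of this README (messagePriority and the x-max-priority argument) could be assembled by a small helper. The sketch below assumes that shape. priorityQueue is a hypothetical helper name and not part of the library, and the queue name 'jobs' is made up.

```javascript
// Hypothetical helper: builds the queue descriptor for a priority queue.
// Field names follow the option shape documented in this README; the
// 1-10 range check mirrors the range stated there.
function priorityQueue(name, maxPriority, messagePriority) {
  for (const value of [maxPriority, messagePriority]) {
    if (!Number.isInteger(value) || value < 1 || value > 10) {
      throw new RangeError('priority must be an integer from 1 to 10');
    }
  }
  return {
    queue: {
      name,
      messagePriority,
      options: { arguments: { 'x-max-priority': maxPriority } }
    }
  };
}

// Descriptor for a queue with max priority 10 and message priority 5.
const target = priorityQueue('jobs', 10, 5);
// It would then be passed as the first argument, e.g.:
// client.publish(target, data);
```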

Please read this before implementing consume

  • The promiseHandler for consume should always return a resolved Promise, even if some operations on the received message fail.
  • When returning a resolved Promise, no value needs to be passed to it. If one is passed, it is simply ignored.
  • Best practice is to implement a catch handler for the promiseHandler, push the failed message to some other queue, and return a resolved Promise from there.
  • If parsing the JSON message fails while consuming, the promise is rejected and the rejection needs to be handled appropriately. (Handling it through a queue is optional: whether or not a queue is provided, the error will be logged.)
  • promiseHandler receives the message and the options that were initially passed to consume.

/**
 * options is the object that is passed to consume at the time of initialization:
 * {
 *   queue: {
 *     name: 'some-queue-name',
 *     messagePriority: <message priority (1-10)>, // optional; set if the queue is a priority queue
 *     options: {
 *       arguments: {
 *         'x-max-priority': <queue priority (1-10)> // optional; set to make the queue a priority queue
 *       }
 *     }
 *   }
 * }
 */
promiseFunction(message, options)
  .then(data => {
    /* once processing the message is successful, return resolved promise */
    /* if status queue is provided and success should be recorded */
    if (statusQueue && recordSuccess) {
      client.publish(statusQueue, {
        status: 'success',
        queueName,
        message: data
      });
    }

    /* this is needed to ack to the channel regarding this message */
    return Promise.resolve();
  })
  .catch(error => {
    if (statusQueue && recordError) {
      /* if status queue is provided and failure should be recorded */
      client.publish(statusQueue, {
        status: 'error',
        queueName,
        error,
        message
      }).then(() => channel.ack(msg));
    }
    logger.log('error', {
      note: `Error while processing the message from ${queueName}`,
      error: error
    });

    /* return resolved Promise from here */
    return Promise.resolve();
  });
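
The guidelines for consume handlers can also be captured in a small reusable wrapper. The following is a minimal sketch, assuming only what this README states: the handler receives (message, options) and must always return a resolved Promise. makeSafeHandler, processMessage and onFailure are hypothetical names introduced for illustration.

```javascript
// Wraps a message-processing function so that the handler given to consume
// always resolves. `processMessage` and `onFailure` are hypothetical,
// caller-supplied functions; either may be synchronous or return a Promise.
function makeSafeHandler(processMessage, onFailure) {
  return function promiseHandler(message, options) {
    return Promise.resolve()
      // Starting from a resolved Promise also captures synchronous throws.
      .then(() => processMessage(message, options))
      .catch(error =>
        // Report the failure (e.g. publish to another queue). Errors raised
        // by the reporter are swallowed too, so the handler never rejects.
        Promise.resolve()
          .then(() => onFailure(error, message, options))
          .catch(() => undefined)
      )
      // Resolve with no value; any value would be ignored anyway.
      .then(() => undefined);
  };
}

// Possible usage (queue names are made up):
// client.consume(
//   { queue: { name: 'jobs' } },
//   makeSafeHandler(processJob, (error, message) =>
//     client.publish({ queue: { name: 'jobs-failed' } }, { error: String(error), message })
//   )
// );
```

Keeping the failure reporting inside the wrapper means each individual handler only has to contain the processing logic.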

Version history

3.0.2  4 years ago
3.0.1  4 years ago
3.0.0  4 years ago
2.0.2  5 years ago
2.0.1  5 years ago
2.0.0  5 years ago