1.0.0 • Published 2 years ago

@artie-owlet/cabbit v1.0.0

Weekly downloads
-
License
MIT
Repository
github
Last release
2 years ago

cabbit

CI Coverage Lint

Package for easy consuming messages from RabbitMQ.


Install

npm install @artie-owlet/cabbit

Usage

Get started

You can create cabbit with its own connection:

import { Cabbit } from '@artie-owlet/cabbit';
// using connection URL
const cabbit1 = new Cabbit('amqp://user:pwd@example.com:5672/');
// using options
const cabbit2 = new Cabbit({
    hostname: 'example.com',
    port: 5672,
    username: 'user',
    password: 'pwd',
    vhost: '/',
})

Or you can create your own connection (using the ConnectionWrapper) and pass it to Cabbit constructor (e.g. you also want to publish messages):

import { ConnectionWrapper } from '@artie-owlet/amqplib-wrapper';
import { Cabbit } from '@artie-owlet/cabbit';

const connWrap = new ConnectionWrapper('amqp://user:pwd@example.com:5672/?reconnectTimeout=1000');
// cabbit for consuming messages
const cabbit = new Cabbit(connWrap);
// another channel for publishing messages
const chan = await chanWrap.getChannel();

Consuming from named queue

graph LR
    pub{{Pub}} --> queue[test_queue] --> sub{{Sub}}
cabbit.queue('test_queue', (msg) => {
    if (msg.body === undefined) {
        console.error(msg.parseError);
    } else {
        console.log(msg.body);
    }
    msg.ack();
});

NOTE: Cabbit trying to decode and parse messages according to their encoding and MIME-type. If an error occurs the body will be undefined. Usually you should check it.

Simple subcribing to exchange

graph LR
    pub{{Pub}} --> ex((D test_ex)) -- key --> queue[test_queue] --> sub{{Sub}}

In such simple cases, Cabbit represents the subscription as a consumption from the exchange through the (named) queue:

cabbit.direct('test_ex').consume('test_queue', (msg) => {
    // handle message
    msg.ack();
}, 'key');

Temporary queue

Temporary queue - is a server-named queue which should be removed after the client disconnects. By default Cabbit consumes from a temporary queue with noAck=true.

graph LR
    pub{{Pub}} --> ex((D test_ex)) -- key --> queue[amq.gen-...] --> sub{{Sub}}
cabbit.direct('test_ex').consume((msg) => {
    // handle message
    // DON'T call msg.ack()
}, 'key');

Complex example

graph LR
    pub{{Pub}}
    sub{{Sub}}
    ex1((D log_ex))
    ex2((H error_ex))
    queue1[all_logs_q]
    queue2[test_err_q]
    pub --> ex1
    ex1 -- info --> queue1
    ex1 -- warn --> queue1
    ex1 -- error --> queue1
    queue1 --> sub
    ex1 -- error --> ex2 -->queue2 -->sub
const logEx = cabbit.direct('log_ex');

logEx.consume('all_logs_q', (msg) => {
    switch (msg.fields.routingKey) {
        // handle message according to its routing key
    }
    msg.ack();
}, ['info', 'warn', 'error']); // you can pass a list of routing keys

logEx.headers('error_ex').consume('test_err_q', (msg) => {
    // handle message
    msg.ack();
}, {
    'x-match': 'all',
    app: 'test',
});

Subscribing to multiple exchanges

graph LR
    pub1{{Pub 1}} --> ex1((D ex1)) -- key --> queue[test_queue] --> sub{{Sub}}
    pub2{{Pub 2}} --> ex2((F ex2)) --> queue
cabbit.queue('test_queue', (msg) => {
    // handle message
    msg.ack();
}).subscribe(cabbit.direct('ex1'), 'key').subscribe(cabbit.fanout('ex2'));

API

See https://artie-owlet.github.io/cabbit/