haredo v2.11.1
Haredo
Haredo version 3 introduces breaking changes. See 3.0 Changes
RabbitMQ client for Node.js with a focus on simplicity and type safety.
Table of Contents
- Features
- Usage
- Initializing
- Listening for messages
- Publishing to an exchange
- Publishing to a queue
- Limit concurrency
- Delayed messages
- Quorum queues with delivery limits
- Message throttling
- Dead letter
- Middleware
- Graceful shutdown
- Automatic setup
- Extending Haredo
- Global middleware
Features
- TypeScript
- Chaining based API
- Graceful closing
- Automatic setup of queues and exchanges
- Automatic acking / nacking based on the promise returned from the subscriber
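The last feature above can be pictured with a small sketch. This is not Haredo's implementation: `settle` and `Outcome` are hypothetical names, used only to show how the promise returned by a subscriber could be mapped to an ack or a nack.

```typescript
// Hypothetical outcome type for this sketch; not part of Haredo's API.
type Outcome = 'ack' | 'nack';

// Runs a subscriber callback and maps its result to an outcome:
// a resolved promise becomes 'ack', a rejection or synchronous throw becomes 'nack'.
async function settle<T>(
    data: T,
    subscriber: (data: T) => Promise<void> | void
): Promise<Outcome> {
    try {
        await subscriber(data);
        return 'ack';
    } catch {
        return 'nack';
    }
}
```

Under these assumptions, a subscriber that throws (as several examples in this README do) would lead to a nack, while one that returns normally would lead to an ack.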
Usage
Working examples are available on GitHub.
Initializing
import { Haredo } from 'haredo';
const haredo = Haredo({
url: 'amqp://localhost:5672/'
});
Listening for messages
haredo.queue('my-queue')
.bindExchange('testExchange', '#', 'topic', { durable: false }) // Can be omitted if you don't want to bind the queue to an exchange right now
.subscribe(async (message) => {
console.log(message.data);
});
Publishing to an exchange
haredo.exchange('my-exchange').publish({ id: 5, status: 'active' }, 'item.created');
Publishing to a queue
haredo.queue('my-queue').publish({ id: 5, status: 'inactive' });
Limit concurrency
haredo.queue('my-queue')
.prefetch(5) // same as .concurrency(5)
.subscribe(async (message) => {
console.log(message);
});
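To illustrate what a prefetch limit means in practice, here is a minimal in-memory model. It is a sketch only, not Haredo or RabbitMQ code: `PrefetchWindow` and its methods are hypothetical names. The idea is that no more than `limit` messages are handed to the consumer until some of them have been acknowledged.

```typescript
// Hypothetical in-memory model of a prefetch window; not Haredo or RabbitMQ code.
class PrefetchWindow<T> {
    private readonly pending: T[] = [];
    private unacked = 0;
    private readonly limit: number;
    private readonly onDeliver: (item: T) => void;

    constructor(limit: number, onDeliver: (item: T) => void) {
        this.limit = limit;
        this.onDeliver = onDeliver;
    }

    // A message arrives at the queue.
    enqueue(item: T): void {
        this.pending.push(item);
        this.pump();
    }

    // The consumer acknowledged one message, which frees a slot.
    ack(): void {
        if (this.unacked > 0) {
            this.unacked -= 1;
        }
        this.pump();
    }

    // Delivers waiting messages while there are free slots.
    private pump(): void {
        while (this.unacked < this.limit && this.pending.length > 0) {
            const item = this.pending.shift() as T;
            this.unacked += 1;
            this.onDeliver(item);
        }
    }
}
```

With a limit of 2, a third message stays queued until one of the first two is acknowledged.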
Delayed messages
Note: this requires the RabbitMQ Delayed Message Plugin to be installed and enabled on the server.
interface Message {
id: number;
}
const delayedExchange = Exchange<Message>('my-delayed-exchange', 'x-delayed-message').delayed('topic');
await haredo.queue('my-queue')
.bindExchange(delayedExchange, '#')
.subscribe((data, { timestamp }) => {
console.log(`Received message in ${ Date.now() - timestamp }ms id:${ data.id } `);
});
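// Note: `delayedMessage` and `delay` are not defined in this snippet.
// They are assumed to be a prepared message builder and a promise-based sleep helper.
let id = 0;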
while (true) {
id += 1;
console.log('Publishing message', id);
const msg = delayedMessage.json({ id }).timestamp(Date.now());
await haredo
.exchange(delayedExchange)
.delay(1000)
.publish(msg);
await delay(2000);
}
Quorum queues with delivery limits
Note: requires RabbitMQ 3.8.0 or higher, see Quorum Queues Overview for more information.
Message throttling
await haredo.queue('my-queue')
.backoff(standardBackoff({
failThreshold: 3,
failSpan: 5000,
failTimeout: 5000
}))
.subscribe(() => {
throw new Error('Nack this message for me')
});
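The three options passed to standardBackoff can be read as a failure window. The sketch below is an interpretation, not Haredo's code. It assumes that once `failThreshold` failures occur within `failSpan` milliseconds, the consumer waits `failTimeout` milliseconds before taking the next message. `FailureWindow` and its methods are hypothetical names, and the clock is passed in explicitly so the behaviour is easy to follow.

```typescript
// Option names mirror the example above; their semantics here are an assumption.
interface BackoffOptions {
    failThreshold: number; // failures tolerated within failSpan before pausing
    failSpan: number;      // ms after which a failure is forgotten
    failTimeout: number;   // ms to pause once the threshold is reached
}

// Hypothetical model of a failure window; not Haredo's implementation.
class FailureWindow {
    private failures: number[] = [];
    private pausedUntil = 0;
    private readonly opts: BackoffOptions;

    constructor(opts: BackoffOptions) {
        this.opts = opts;
    }

    // Records a failure at time `now` (ms) and starts a pause if the threshold is hit.
    recordFailure(now: number): void {
        this.failures = this.failures.filter((t) => now - t < this.opts.failSpan);
        this.failures.push(now);
        if (this.failures.length >= this.opts.failThreshold) {
            this.pausedUntil = now + this.opts.failTimeout;
            this.failures = [];
        }
    }

    // Milliseconds the consumer should still wait at time `now`.
    delayFor(now: number): number {
        return Math.max(0, this.pausedUntil - now);
    }
}
```

With the values from the example (3, 5000, 5000), three nacks within five seconds would pause consumption for five seconds, while failures spread further apart would not.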
Dead letter
Middleware
import { Middleware } from 'haredo';
// The middleware must be async because it awaits next()
const timeMessage: Middleware = async ({ queue }, next) => {
const start = Date.now();
await next();
console.log(`Message took ${ Date.now() - start }ms`);
};
await haredo.queue('my-queue')
.use(timeMessage)
.subscribe(() => {
throw new Error('Nack this message for me')
});
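The way a middleware wraps the subscriber through `next` can be sketched independently of Haredo. The code below is a generic chain runner under stated assumptions: `SketchMiddleware` and `runChain` are hypothetical names, and Haredo's own Middleware type and dispatch rules may differ. In this sketch, a middleware that never calls `next` prevents the subscriber from running.

```typescript
// Hypothetical types for this sketch; Haredo's own Middleware type may differ.
type Next = () => Promise<void>;
type SketchMiddleware<M> = (message: M, next: Next) => Promise<void> | void;

// Runs each middleware in order; the subscriber runs when the last one calls next().
async function runChain<M>(
    message: M,
    middleware: SketchMiddleware<M>[],
    subscriber: (message: M) => Promise<void> | void
): Promise<void> {
    const dispatch = async (index: number): Promise<void> => {
        const current = middleware[index];
        if (current === undefined) {
            // No middleware left: hand the message to the subscriber.
            await subscriber(message);
            return;
        }
        await current(message, () => dispatch(index + 1));
    };
    await dispatch(0);
}
```

Because each middleware awaits `next()`, code placed after that call runs once the subscriber has finished, which is what makes a timing middleware like the one above possible.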
Graceful shutdown
Calling consumer.close() sends a cancel to the channel and waits for in-flight messages to be handled before resolving the returned promise.
Calling haredoInstance.close() gracefully closes all of its consumers.
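The shutdown behaviour described here (stop accepting new messages, then wait for handlers that are already running) can be sketched as follows. This is a stand-in model rather than Haredo's implementation, and `SketchConsumer` with its `deliver` method is a hypothetical name.

```typescript
// Hypothetical stand-in for a consumer; not Haredo's implementation.
class SketchConsumer {
    private readonly inFlight = new Set<Promise<void>>();
    private closed = false;
    private readonly handler: (data: string) => Promise<void>;

    constructor(handler: (data: string) => Promise<void>) {
        this.handler = handler;
    }

    // Starts handling a message. Returns false once the consumer is closing.
    deliver(data: string): boolean {
        if (this.closed) {
            return false;
        }
        const run: Promise<void> = this.handler(data)
            .catch(() => undefined) // a failed handler still counts as finished
            .then(() => {
                this.inFlight.delete(run);
            });
        this.inFlight.add(run);
        return true;
    }

    // Stops accepting messages and resolves once running handlers have finished.
    async close(): Promise<void> {
        this.closed = true;
        await Promise.all(Array.from(this.inFlight));
    }
}
```

Awaiting `close()` before exiting the process gives handlers that are already running a chance to complete.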
Automatic setup
By default, Haredo automatically asserts the queues and exchanges and binds them to each other. This can be disabled by calling .skipSetup().
await haredo.queue('my-queue')
.skipSetup()
.subscribe(() => {
throw new Error('Nack this message for me');
});
// Only create the queue, don't bind it to any exchanges and don't create any exchanges
await haredo.queue('my-queue')
.bindExchange('testExchange', '#', 'topic', { durable: false })
.skipSetup({ skipBoundExchanges: true, skipBindings: true, skipCreate: false });
Extending Haredo
Add new methods to the Haredo instance
declare module 'haredo/types' {
interface QueueChain<T> {
cid(cid: string): this;
}
}
const haredo = Haredo({
url: 'amqp://localhost:5672/',
extensions: [
{
name: 'cid',
queue: (state) => {
return (cid: string) => ({
...state,
headers: {
...state.headers,
'x-cid': cid
}
});
}
}
]
});
await haredo.queue('my-queue')
.cid('123')
.publish({ id: 5, status: 'inactive' });
Global middleware
Add a middleware that will be called for every message in every subscriber
declare module 'haredo/types' {
interface HaredoMessage<T> {
cid?: string;
}
}
const haredo = Haredo({
url: 'amqp://localhost:5672/',
globalMiddleware: [
(message) => {
message.cid = message.headers?.['x-cid'] as string;
}
]
});