@resilientmq/core v0.2.2
@resilientmq/core
Core logic for the resilient message queue system built on top of RabbitMQ, providing middleware support, retry logic, dead-letter handling, and persistent event lifecycle management.
Table of Contents
- ๐ฆ Installation
- ๐ Purpose
- ๐งฉ Main Concepts
- ๐ง Config: ResilientConsumerConfig
- ๐ง Config: ResilientPublisherConfig
- ๐งฉ Custom Event Storage Format
- ๐ Example: Consumer
- ๐ Example: Publisher
- ๐งช Tests
- Docs
- LICENSE
๐ฆ Installation
npm install @resilientmq/coreMake sure to also install the core types package:
npm install @resilientmq/types__core๐ Purpose
This package contains the runtime logic for publishing and consuming resilient events. It includes:
- A pluggable consumer with retry + DLQ logic
- Publisher with persist-before-send safety
- Middleware pipeline
- Custom logger
- Full TypeScript support
๐งฉ Main Concepts
| Feature | Description |
|---|---|
publish(event) | Publishes a message safely to a queue or exchange |
consume(handler) | Starts a consumer to process incoming messages |
ResilientConsumer | Handles connection, retry, DLQ, and auto-reconnect |
ResilientEventPublisher | Publishes events with status persistence |
log(level, message) | Unified logging mechanism |
Middleware | Custom logic pipeline on message consumption |
๐ง Config: ResilientConsumerConfig
| Property | Type | Required | Description | Subtype Fields |
|---|---|---|---|---|
connection | string \| Options.Connect | โ | RabbitMQ URI or connection config | โ |
consumeQueue.queue | string | โ | Queue name to consume | โ |
consumeQueue.options | AssertQueueOptions | โ | Queue assertion options | durable, arguments |
consumeQueue.exchange | ExchangeConfig | โ | Bind queue to this exchange | name, type, routingKey, options |
retryQueue.queue | string | โ | Retry queue for failed messages | โ |
retryQueue.options | AssertQueueOptions | โ | Queue options | durable, arguments |
retryQueue.exchange | ExchangeConfig | โ | Exchange for retry routing | name, type, routingKey, options |
retryQueue.ttlMs | number | โ | Delay before retrying | โ |
retryQueue.maxAttempts | number | โ | Max retries before DLQ (default 5) | โ |
deadLetterQueue.queue | string | โ | Final destination after retries | โ |
deadLetterQueue.options | AssertQueueOptions | โ | DLQ queue options | durable |
deadLetterQueue.exchange | ExchangeConfig | โ | DLQ exchange | name, type, routingKey, options |
eventsToProcess | EventProcessConfig[] | โ | List of handled event types | type, handler |
store | EventStore | โ | Persistent layer for events | saveEvent, getEvent, updateEventStatus, deleteEvent |
middleware | Middleware[] | โ | Hooks to wrap event execution | (event, next) => Promise |
maxUptimeMs | number | โ | Restart consumer after X ms | โ |
exitIfIdle | boolean | โ | Exit process if idle | โ |
idleCheckIntervalMs | number | โ | Time between idle checks | โ |
maxIdleChecks | number | โ | How many checks until exit | โ |
๐ง Config: ResilientPublisherConfig
| Property | Type | Required | Description |
|---|---|---|---|
connection | string \| Options.Connect | โ | RabbitMQ URI or config |
queue | string | โ | Target queue (direct publish) |
exchange | ExchangeConfig | โ | Exchange for fanout/direct |
store | EventStore | โ | Event metadata persistence |
๐งฉ Custom Event Storage Format
You can fully control how events are stored and retrieved by providing a serializer in your EventStore implementation.
This allows you to decouple the in-memory event format from the database structure โ useful for legacy systems or when mapping to existing schemas.
๐ Example: Custom Storage Serializer
const store: EventStore = {
serializer: {
toStorageFormat(event) {
return {
_id: event.id,
body: event.payload,
customStatus: event.status
};
},
fromStorageFormat(doc) {
return {
id: doc._id,
messageId: doc._id,
payload: doc.body,
status: doc.customStatus,
type: 'custom.type'
};
}
},
async saveEvent(event) {
const doc = this.serializer.toStorageFormat(event);
await db.insert(doc);
},
async getEvent(id) {
const doc = await db.findById(id);
return doc ? this.serializer.fromStorageFormat(doc) : null;
},
async updateEventStatus(id, status) {
await db.update(id, { customStatus: status });
},
async deleteEvent(id) {
await db.delete(id);
}
};๐ Example: Consumer
import { ResilientConsumer } from '@resilientmq/core';
import mongoose from 'mongoose';
const Event = mongoose.model('Event', new mongoose.Schema({ id: String }));
const store = {
saveEvent: async (e) => Event.create(e),
getEvent: async (id) => Event.findOne({ messageId: id }),
updateEventStatus: async (id, status) => Event.updateOne({ messageId: id }, { status }),
deleteEvent: async (id) => Event.deleteOne({ messageId: id })
};
const consumer = new ResilientConsumer({
connection: 'amqp://localhost',
consumeQueue: {
queue: 'user.queue',
options: { durable: true },
exchange: { name: 'user.events', type: 'fanout', options: { durable: true } }
},
eventsToProcess: [
{ type: 'user.created', handler: async (payload) => console.log(payload) }
],
store
});
await consumer.start();๐ Example: Publisher
import { ResilientEventPublisher } from '@resilientmq/core';
const publisher = new ResilientEventPublisher({
connection: 'amqp://localhost',
store: myStore,
exchange: {
name: 'user.events',
type: 'fanout',
options: { durable: true }
}
});
await publisher.publish({
id: 'evt-1',
messageId: 'msg-1',
type: 'user.created',
payload: { name: 'Alice' },
status: 'PENDING_PUBLICATION'
});๐งช Tests
- โ Unit tests with Jest
- โ Integration-ready structure
- โ 100% coverage possible with mocks
๐ฅ Contributors
๐ License
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago