@resilientmq/core v0.2.2
@resilientmq/core
Core logic for the resilient message queue system built on top of RabbitMQ, providing middleware support, retry logic, dead-letter handling, and persistent event lifecycle management.
Table of Contents
- ๐ฆ Installation
- ๐ Purpose
- ๐งฉ Main Concepts
- ๐ง Config: ResilientConsumerConfig
- ๐ง Config: ResilientPublisherConfig
- ๐งฉ Custom Event Storage Format
- ๐ Example: Consumer
- ๐ Example: Publisher
- ๐งช Tests
- Docs
- LICENSE
๐ฆ Installation
npm install @resilientmq/coreMake sure to also install the core types package:
npm install @resilientmq/types__core๐ Purpose
This package contains the runtime logic for publishing and consuming resilient events. It includes:
- A pluggable consumer with retry + DLQ logic
- Publisher with persist-before-send safety
- Middleware pipeline
- Custom logger
- Full TypeScript support
๐งฉ Main Concepts
| Feature | Description |
|---|---|
publish(event) | Publishes a message safely to a queue or exchange |
consume(handler) | Starts a consumer to process incoming messages |
ResilientConsumer | Handles connection, retry, DLQ, and auto-reconnect |
ResilientEventPublisher | Publishes events with status persistence |
log(level, message) | Unified logging mechanism |
Middleware | Custom logic pipeline on message consumption |
๐ง Config: ResilientConsumerConfig
| Property | Type | Required | Description | Subtype Fields |
|---|---|---|---|---|
connection | string \| Options.Connect | โ | RabbitMQ URI or connection config | โ |
consumeQueue.queue | string | โ | Queue name to consume | โ |
consumeQueue.options | AssertQueueOptions | โ | Queue assertion options | durable, arguments |
consumeQueue.exchange | ExchangeConfig | โ | Bind queue to this exchange | name, type, routingKey, options |
retryQueue.queue | string | โ | Retry queue for failed messages | โ |
retryQueue.options | AssertQueueOptions | โ | Queue options | durable, arguments |
retryQueue.exchange | ExchangeConfig | โ | Exchange for retry routing | name, type, routingKey, options |
retryQueue.ttlMs | number | โ | Delay before retrying | โ |
retryQueue.maxAttempts | number | โ | Max retries before DLQ (default 5) | โ |
deadLetterQueue.queue | string | โ | Final destination after retries | โ |
deadLetterQueue.options | AssertQueueOptions | โ | DLQ queue options | durable |
deadLetterQueue.exchange | ExchangeConfig | โ | DLQ exchange | name, type, routingKey, options |
eventsToProcess | EventProcessConfig[] | โ | List of handled event types | type, handler |
store | EventStore | โ | Persistent layer for events | saveEvent, getEvent, updateEventStatus, deleteEvent |
middleware | Middleware[] | โ | Hooks to wrap event execution | (event, next) => Promise |
maxUptimeMs | number | โ | Restart consumer after X ms | โ |
exitIfIdle | boolean | โ | Exit process if idle | โ |
idleCheckIntervalMs | number | โ | Time between idle checks | โ |
maxIdleChecks | number | โ | How many checks until exit | โ |
๐ง Config: ResilientPublisherConfig
| Property | Type | Required | Description |
|---|---|---|---|
connection | string \| Options.Connect | โ | RabbitMQ URI or config |
queue | string | โ | Target queue (direct publish) |
exchange | ExchangeConfig | โ | Exchange for fanout/direct |
store | EventStore | โ | Event metadata persistence |
๐งฉ Custom Event Storage Format
You can fully control how events are stored and retrieved by providing a serializer in your EventStore implementation.
This allows you to decouple the in-memory event format from the database structure โ useful for legacy systems or when mapping to existing schemas.
๐ Example: Custom Storage Serializer
const store: EventStore = {
serializer: {
toStorageFormat(event) {
return {
_id: event.id,
body: event.payload,
customStatus: event.status
};
},
fromStorageFormat(doc) {
return {
id: doc._id,
messageId: doc._id,
payload: doc.body,
status: doc.customStatus,
type: 'custom.type'
};
}
},
async saveEvent(event) {
const doc = this.serializer.toStorageFormat(event);
await db.insert(doc);
},
async getEvent(id) {
const doc = await db.findById(id);
return doc ? this.serializer.fromStorageFormat(doc) : null;
},
async updateEventStatus(id, status) {
await db.update(id, { customStatus: status });
},
async deleteEvent(id) {
await db.delete(id);
}
};๐ Example: Consumer
import { ResilientConsumer } from '@resilientmq/core';
import mongoose from 'mongoose';
const Event = mongoose.model('Event', new mongoose.Schema({ id: String }));
const store = {
saveEvent: async (e) => Event.create(e),
getEvent: async (id) => Event.findOne({ messageId: id }),
updateEventStatus: async (id, status) => Event.updateOne({ messageId: id }, { status }),
deleteEvent: async (id) => Event.deleteOne({ messageId: id })
};
const consumer = new ResilientConsumer({
connection: 'amqp://localhost',
consumeQueue: {
queue: 'user.queue',
options: { durable: true },
exchange: { name: 'user.events', type: 'fanout', options: { durable: true } }
},
eventsToProcess: [
{ type: 'user.created', handler: async (payload) => console.log(payload) }
],
store
});
await consumer.start();๐ Example: Publisher
import { ResilientEventPublisher } from '@resilientmq/core';
const publisher = new ResilientEventPublisher({
connection: 'amqp://localhost',
store: myStore,
exchange: {
name: 'user.events',
type: 'fanout',
options: { durable: true }
}
});
await publisher.publish({
id: 'evt-1',
messageId: 'msg-1',
type: 'user.created',
payload: { name: 'Alice' },
status: 'PENDING_PUBLICATION'
});๐งช Tests
- โ Unit tests with Jest
- โ Integration-ready structure
- โ 100% coverage possible with mocks
๐ฅ Contributors
๐ License
9 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago