0.2.2 โ€ข Published 6 months ago

@resilientmq/core v0.2.2

Weekly downloads
-
License
MIT
Repository
github
Last release
6 months ago

@resilientmq/core

Core logic for the resilient message queue system built on top of RabbitMQ, providing middleware support, retry logic, dead-letter handling, and persistent event lifecycle management.

Table of Contents

๐Ÿ“ฆ Installation

npm install @resilientmq/core

Make sure to also install the core types package:

npm install @resilientmq/types__core

๐Ÿ“š Purpose

This package contains the runtime logic for publishing and consuming resilient events. It includes:

  • A pluggable consumer with retry + DLQ logic
  • Publisher with persist-before-send safety
  • Middleware pipeline
  • Custom logger
  • Full TypeScript support

๐Ÿงฉ Main Concepts

FeatureDescription
publish(event)Publishes a message safely to a queue or exchange
consume(handler)Starts a consumer to process incoming messages
ResilientConsumerHandles connection, retry, DLQ, and auto-reconnect
ResilientEventPublisherPublishes events with status persistence
log(level, message)Unified logging mechanism
MiddlewareCustom logic pipeline on message consumption

๐Ÿ”ง Config: ResilientConsumerConfig

PropertyTypeRequiredDescriptionSubtype Fields
connectionstring \| Options.Connectโœ…RabbitMQ URI or connection configโ€“
consumeQueue.queuestringโœ…Queue name to consumeโ€“
consumeQueue.optionsAssertQueueOptionsโœ…Queue assertion optionsdurable, arguments
consumeQueue.exchangeExchangeConfigโŒBind queue to this exchangename, type, routingKey, options
retryQueue.queuestringโŒRetry queue for failed messagesโ€“
retryQueue.optionsAssertQueueOptionsโŒQueue optionsdurable, arguments
retryQueue.exchangeExchangeConfigโŒExchange for retry routingname, type, routingKey, options
retryQueue.ttlMsnumberโŒDelay before retryingโ€“
retryQueue.maxAttemptsnumberโŒMax retries before DLQ (default 5)โ€“
deadLetterQueue.queuestringโŒFinal destination after retriesโ€“
deadLetterQueue.optionsAssertQueueOptionsโŒDLQ queue optionsdurable
deadLetterQueue.exchangeExchangeConfigโŒDLQ exchangename, type, routingKey, options
eventsToProcessEventProcessConfig[]โœ…List of handled event typestype, handler
storeEventStoreโœ…Persistent layer for eventssaveEvent, getEvent, updateEventStatus, deleteEvent
middlewareMiddleware[]โŒHooks to wrap event execution(event, next) => Promise
maxUptimeMsnumberโŒRestart consumer after X msโ€“
exitIfIdlebooleanโŒExit process if idleโ€“
idleCheckIntervalMsnumberโŒTime between idle checksโ€“
maxIdleChecksnumberโŒHow many checks until exitโ€“

๐Ÿ”ง Config: ResilientPublisherConfig

PropertyTypeRequiredDescription
connectionstring \| Options.Connectโœ…RabbitMQ URI or config
queuestringโŒTarget queue (direct publish)
exchangeExchangeConfigโŒExchange for fanout/direct
storeEventStoreโœ…Event metadata persistence

๐Ÿงฉ Custom Event Storage Format

You can fully control how events are stored and retrieved by providing a serializer in your EventStore implementation.

This allows you to decouple the in-memory event format from the database structure โ€” useful for legacy systems or when mapping to existing schemas.

๐Ÿ”„ Example: Custom Storage Serializer

const store: EventStore = {
  serializer: {
    toStorageFormat(event) {
      return {
        _id: event.id,
        body: event.payload,
        customStatus: event.status
      };
    },
    fromStorageFormat(doc) {
      return {
        id: doc._id,
        messageId: doc._id,
        payload: doc.body,
        status: doc.customStatus,
        type: 'custom.type'
      };
    }
  },

  async saveEvent(event) {
    const doc = this.serializer.toStorageFormat(event);
    await db.insert(doc);
  },

  async getEvent(id) {
    const doc = await db.findById(id);
    return doc ? this.serializer.fromStorageFormat(doc) : null;
  },

  async updateEventStatus(id, status) {
    await db.update(id, { customStatus: status });
  },

  async deleteEvent(id) {
    await db.delete(id);
  }
};

๐Ÿš€ Example: Consumer

import { ResilientConsumer } from '@resilientmq/core';
import mongoose from 'mongoose';

const Event = mongoose.model('Event', new mongoose.Schema({ id: String }));
const store = {
  saveEvent: async (e) => Event.create(e),
  getEvent: async (id) => Event.findOne({ messageId: id }),
  updateEventStatus: async (id, status) => Event.updateOne({ messageId: id }, { status }),
  deleteEvent: async (id) => Event.deleteOne({ messageId: id })
};

const consumer = new ResilientConsumer({
  connection: 'amqp://localhost',
  consumeQueue: {
    queue: 'user.queue',
    options: { durable: true },
    exchange: { name: 'user.events', type: 'fanout', options: { durable: true } }
  },
  eventsToProcess: [
    { type: 'user.created', handler: async (payload) => console.log(payload) }
  ],
  store
});

await consumer.start();

๐Ÿš€ Example: Publisher

import { ResilientEventPublisher } from '@resilientmq/core';

const publisher = new ResilientEventPublisher({
  connection: 'amqp://localhost',
  store: myStore,
  exchange: {
    name: 'user.events',
    type: 'fanout',
    options: { durable: true }
  }
});

await publisher.publish({
  id: 'evt-1',
  messageId: 'msg-1',
  type: 'user.created',
  payload: { name: 'Alice' },
  status: 'PENDING_PUBLICATION'
});

๐Ÿงช Tests

  • โœ… Unit tests with Jest
  • โœ… Integration-ready structure
  • โœ… 100% coverage possible with mocks

๐Ÿ‘ฅ Contributors


๐Ÿ“„ License

MIT

0.2.2

6 months ago

0.2.1

6 months ago

0.2.0

6 months ago

0.1.22

6 months ago

0.1.21

6 months ago

0.1.20

6 months ago

0.1.19

6 months ago

0.1.18

6 months ago

0.1.17

6 months ago

0.1.16

6 months ago

0.1.15

7 months ago

0.1.14

7 months ago

0.1.13

7 months ago

0.1.12

7 months ago

0.1.11

7 months ago

0.1.10

7 months ago

0.1.9

7 months ago

0.1.8

7 months ago

0.1.7

7 months ago

0.1.6

7 months ago

0.1.5

7 months ago

0.1.4

7 months ago

0.1.3

7 months ago

0.1.2

7 months ago