0.2.2 โ€ข Published 9 months ago

@resilientmq/core v0.2.2

Weekly downloads
-
License
MIT
Repository
github
Last release
9 months ago

@resilientmq/core

Core logic for the resilient message queue system built on top of RabbitMQ, providing middleware support, retry logic, dead-letter handling, and persistent event lifecycle management.

Table of Contents

๐Ÿ“ฆ Installation

npm install @resilientmq/core

Make sure to also install the core types package:

npm install @resilientmq/types__core

๐Ÿ“š Purpose

This package contains the runtime logic for publishing and consuming resilient events. It includes:

  • A pluggable consumer with retry + DLQ logic
  • Publisher with persist-before-send safety
  • Middleware pipeline
  • Custom logger
  • Full TypeScript support

๐Ÿงฉ Main Concepts

FeatureDescription
publish(event)Publishes a message safely to a queue or exchange
consume(handler)Starts a consumer to process incoming messages
ResilientConsumerHandles connection, retry, DLQ, and auto-reconnect
ResilientEventPublisherPublishes events with status persistence
log(level, message)Unified logging mechanism
MiddlewareCustom logic pipeline on message consumption

๐Ÿ”ง Config: ResilientConsumerConfig

PropertyTypeRequiredDescriptionSubtype Fields
connectionstring \| Options.Connectโœ…RabbitMQ URI or connection configโ€“
consumeQueue.queuestringโœ…Queue name to consumeโ€“
consumeQueue.optionsAssertQueueOptionsโœ…Queue assertion optionsdurable, arguments
consumeQueue.exchangeExchangeConfigโŒBind queue to this exchangename, type, routingKey, options
retryQueue.queuestringโŒRetry queue for failed messagesโ€“
retryQueue.optionsAssertQueueOptionsโŒQueue optionsdurable, arguments
retryQueue.exchangeExchangeConfigโŒExchange for retry routingname, type, routingKey, options
retryQueue.ttlMsnumberโŒDelay before retryingโ€“
retryQueue.maxAttemptsnumberโŒMax retries before DLQ (default 5)โ€“
deadLetterQueue.queuestringโŒFinal destination after retriesโ€“
deadLetterQueue.optionsAssertQueueOptionsโŒDLQ queue optionsdurable
deadLetterQueue.exchangeExchangeConfigโŒDLQ exchangename, type, routingKey, options
eventsToProcessEventProcessConfig[]โœ…List of handled event typestype, handler
storeEventStoreโœ…Persistent layer for eventssaveEvent, getEvent, updateEventStatus, deleteEvent
middlewareMiddleware[]โŒHooks to wrap event execution(event, next) => Promise
maxUptimeMsnumberโŒRestart consumer after X msโ€“
exitIfIdlebooleanโŒExit process if idleโ€“
idleCheckIntervalMsnumberโŒTime between idle checksโ€“
maxIdleChecksnumberโŒHow many checks until exitโ€“

๐Ÿ”ง Config: ResilientPublisherConfig

PropertyTypeRequiredDescription
connectionstring \| Options.Connectโœ…RabbitMQ URI or config
queuestringโŒTarget queue (direct publish)
exchangeExchangeConfigโŒExchange for fanout/direct
storeEventStoreโœ…Event metadata persistence

๐Ÿงฉ Custom Event Storage Format

You can fully control how events are stored and retrieved by providing a serializer in your EventStore implementation.

This allows you to decouple the in-memory event format from the database structure โ€” useful for legacy systems or when mapping to existing schemas.

๐Ÿ”„ Example: Custom Storage Serializer

const store: EventStore = {
  serializer: {
    toStorageFormat(event) {
      return {
        _id: event.id,
        body: event.payload,
        customStatus: event.status
      };
    },
    fromStorageFormat(doc) {
      return {
        id: doc._id,
        messageId: doc._id,
        payload: doc.body,
        status: doc.customStatus,
        type: 'custom.type'
      };
    }
  },

  async saveEvent(event) {
    const doc = this.serializer.toStorageFormat(event);
    await db.insert(doc);
  },

  async getEvent(id) {
    const doc = await db.findById(id);
    return doc ? this.serializer.fromStorageFormat(doc) : null;
  },

  async updateEventStatus(id, status) {
    await db.update(id, { customStatus: status });
  },

  async deleteEvent(id) {
    await db.delete(id);
  }
};

๐Ÿš€ Example: Consumer

import { ResilientConsumer } from '@resilientmq/core';
import mongoose from 'mongoose';

const Event = mongoose.model('Event', new mongoose.Schema({ id: String }));
const store = {
  saveEvent: async (e) => Event.create(e),
  getEvent: async (id) => Event.findOne({ messageId: id }),
  updateEventStatus: async (id, status) => Event.updateOne({ messageId: id }, { status }),
  deleteEvent: async (id) => Event.deleteOne({ messageId: id })
};

const consumer = new ResilientConsumer({
  connection: 'amqp://localhost',
  consumeQueue: {
    queue: 'user.queue',
    options: { durable: true },
    exchange: { name: 'user.events', type: 'fanout', options: { durable: true } }
  },
  eventsToProcess: [
    { type: 'user.created', handler: async (payload) => console.log(payload) }
  ],
  store
});

await consumer.start();

๐Ÿš€ Example: Publisher

import { ResilientEventPublisher } from '@resilientmq/core';

const publisher = new ResilientEventPublisher({
  connection: 'amqp://localhost',
  store: myStore,
  exchange: {
    name: 'user.events',
    type: 'fanout',
    options: { durable: true }
  }
});

await publisher.publish({
  id: 'evt-1',
  messageId: 'msg-1',
  type: 'user.created',
  payload: { name: 'Alice' },
  status: 'PENDING_PUBLICATION'
});

๐Ÿงช Tests

  • โœ… Unit tests with Jest
  • โœ… Integration-ready structure
  • โœ… 100% coverage possible with mocks

๐Ÿ‘ฅ Contributors


๐Ÿ“„ License

MIT

0.2.2

9 months ago

0.2.1

10 months ago

0.2.0

10 months ago

0.1.22

10 months ago

0.1.21

10 months ago

0.1.20

10 months ago

0.1.19

10 months ago

0.1.18

10 months ago

0.1.17

10 months ago

0.1.16

10 months ago

0.1.15

10 months ago

0.1.14

10 months ago

0.1.13

10 months ago

0.1.12

10 months ago

0.1.11

10 months ago

0.1.10

10 months ago

0.1.9

10 months ago

0.1.8

10 months ago

0.1.7

10 months ago

0.1.6

10 months ago

0.1.5

10 months ago

0.1.4

10 months ago

0.1.3

10 months ago

0.1.2

10 months ago