1.0.3 • Published 3 years ago

@notifications-system/core v1.0.3

Weekly downloads
-
License
ISC
Repository
github
Last release
3 years ago

Notifications System Core Package

Description

Core package for Notifications System

Install

npm i @notifications-system/core

Exports

  • Services;
  • Interfaces;
  • Types;
  • InMemory Storage;
  • Console Transport;
  • Helpers;

Description

The Core module provide basic notifications/queue logic and interfaces such as:

  • Notification service with queue;
  • Notification history;
  • Configurable error processing: Default ResendErrorHandler with "Resend Strategy";
  • Configurable "Leaky Bucket" provides transport limitation of send count per (time or by try);

Services

  • NotificationService: Base service for sending/processing notifications;
  • NotificationQueueManager: Queue processing. If it isn't used, you must to manually call NotificationService::processQueue(transports?: ITransport | ITransport[]) periodically;

Default internal configuration of NotificationService

No Error processing, No LeakyBucket

<IConfig> {
  eventEmitter: new EventEmitter(),
  errorHandler: new DummyErrorHandler(),
  leakyBucket: new DummyBucketService(),
  processingInterval: 3,
}

Demo Sample with In-Memory storage and ConsoleTransport

ConsoleTransport - demo transport that nothing really send, but only logs all message to console

import {
  ConsoleTransport,
  GeometryProgressionStrategy,
  IOriginalData,
  IQueueProcessingEvent,
  MemoryStorage,
  NotificationQueueManager,
  NotificationService,
  QUEUE_BEFORE_PROCESSING,
  ResendErrorHandler,
  TRANSPORT_CONSOLE,
} from '@notifications-system/core';

let service: NotificationService;

async function main() {
  // Instantiate Notification Service
  service = new NotificationService(
    // In-Memory StorageService (IStorageService implementation)
    await new MemoryStorage().initialize(),
    // All necessary ITransport instances for current project
    [
      new ConsoleTransport(),
      // ...,
      // new TransportXXXX(),
    ],
    // optionally override internal default configuration
    // In addition this configuration can be overridden by ITransport::config (individualy for each transport)
    {
      // ResendErrorHandler with GeometryProgressionStrategy for "error processing"
      // Try resend 20 times from 10 sec interval to 3600 sec used geometry progression (denom 2) to calc next "wait" interval
      // 10, 20, 40, 80, 160, ... 3600, 3600, ... | max to 20 times or success response
      errorHandler: new ResendErrorHandler(new GeometryProgressionStrategy(20, 10, 3600)),

      // ILeakyBucketService
      // For example send max 10 messages for each transports by one try
      leakyBucket: new LeakyBucketService(10),
      // Or max 8 messages for each transports in 15 sec
      // leakyBucket: new LeakyBucketService(8, 15),

      // Override processing interval (default: 3 sec)
      processingInterval: 5, // sec
    },
  );

  // Sample Notification subscriber
  service.eventEmitter.on(QUEUE_BEFORE_PROCESSING, (event: IQueueProcessingEvent) => {
    console.info(`process ${event.items.length} at ${(new Date()).toLocaleTimeString()}:`);
  });

  // Start Queue Processing
  const queueManager = new NotificationQueueManager(service).queueStart();
  // To stop Queue Processing:
  // queueManager.queueStop();


  // Sample usage

  // Find recipient (from database or configuration or define manually)
  // const recipient: IUserEntity = {
  //   id: randomUUID(),
  //   name: 'User-01',
  //   email: 'user-01@mail.test',
  //   phone: '+380001234567',
  // };
  //
  // Or for single transport we can use transport specific string
  // As sample for email transport:
  const recipient: string = 'user-01@mail.test User-01'

  // Prepare data for send (IOriginalData format)
  const data: IOriginalData = {
    recipient,
    // Payload will be processed by appropriate IDataProvider implementation for specific transport
    // It can be IOriginalPayload or its inheritance:
    payload: {
      title: 'Notification',
      body: 'Hello from Notification System!!',
    },
    // Or a simple message body string:
    // payload: 'Hello from Notification System!!',
  };

  // Find transport aliases for certain purpose (from database or configuration or define manually)
  // string or array: [TRANSPORT_CONSOLE, TRANSPORT_SMTP, TRANSPORT_XXXX];
  const transport = TRANSPORT_CONSOLE;

  // Send data to selected transport
  await service.send(transport, data);
}
main();

NestJS Integration (with Mail transport and TypeORM-0.2.45 storage)

Used Packages:

Install Notification System with necessary packages

  • npm i typeorm@0.2.45
  • npm i --save-dev typeorm-extension@1.2.2
  • npm i @notifications-system/core
  • npm i @notifications-system/transport-mailer
  • npm i @notifications-system/storage-typeorm-0.2

Prepare

  • Copy content of mailer .env.dist to your .env and change parameters for you SMTP Service
  • Copy content of typeorm-0.2 .env.dist to your .env and change parameters for you database connection

Migrations

Copy ormconfig (documentation) to project root directory:

cp ./node_modules/@notifications-system/storage-typeorm-0.2/ormconfig.js ./

Copy migrations from library to project migrations directory:

cp ./node_modules/@notifications-system/storage-typeorm-0.2/migrations/*.js ./migrations/

Run migrations:

./node_modules/.bin/typeorm migration:run

Create notification.service.ts file in src directory

import { Injectable } from '@nestjs/common'; import { INotificationEntity, IOptions, IQueueEntity, ITransport, NotificationService as BaseService, UNREAD_STATUSES, } from '@notifications-system/core'; import { FindWhere, TypeormStorage } from '@notifications-system/storage-typeorm-0.2'; import { In } from 'typeorm';

/**

  • NotificationService with Typeorm Storage for save Queue / History
  • Main purpose for override BaseService is @Injectable() annotation
  • Also in addition you can extend functionality of notification service specifically for project */ @Injectable() export class NotificationService extends BaseService { /**

    • Find Notification (history) by "recipient" */ async findByRecipient(userId: string, transports?: ITransport[], withRead = false, options?: IOptions) : Promise<INotificationEntity<string, string>[]> {

      const where: FindWhere<INotificationEntity<string, string>> = {};
      if (transports && transports.length > 0) {
        where.transport = In(transports);
      }
      if (!withRead) {
        where.status = In(UNREAD_STATUSES);
      }
      
      return this.storage.notificationRepo?.findByRecipient(userId, where, options) ?? [];

      } }

### Modify `app.module.ts`

[src/app.module.ts]:

```typescript
require('dotenv').config();
import { Module } from '@nestjs/common';
import { NotificationQueueManager } from '@notifications-system/core';
import { StorageOptions, TypeormStorage } from '@notifications-system/storage-typeorm-0.2';
import { ISmtpTransportConfig, MailDataProvider, SmtpTransport } from '@notifications-system/transport-mailer';
import { configDatabase, configTransportSmtp } from './config';
import { NotificationService } from './notification.service';

@Module({
  providers: [
    {
      // Sample NotificationService with Typeorm Storage for save Queue / History
      provide: NotificationService,
      useFactory: async () => {
        return new NotificationService(
          await new TypeormStorage().initialize(require('../../ormconfig.js')),
          // All necessary ITransport instances for current project
          [
            new SmtpTransport({
              options: {
                host: process.env.MAIL_HOST,
                port: Number(process.env.MAIL_PORT) || undefined,
                auth: {
                  user: process.env.MAIL_USER,
                  pass: process.env.MAIL_PASS,
                },
              },
              defaults: {
                from: process.env.MAIL_FROM,
              },
            }, new MailDataProvider()),
          ],
          // optionally override internal default configuration
          // In addition this configuration can be overridden by ITransport::config (individualy for each transport)
          {
            // ResendErrorHandler with GeometryProgressionStrategy for "error processing"
            // Try resend 20 times from 10 sec interval to 3600 sec used geometry progression (denom 2) to calc next "wait" interval
            // For example: 10, 20, 40, 80, 160, ... 3600, 3600, ... - max to 20 times or success response
            errorHandler: new ResendErrorHandler(new GeometryProgressionStrategy(20, 10, 3600)),

            // ILeakyBucketService
            // For example send max 10 messages for each transports by one try
            leakyBucket: new LeakyBucketService(10),
            // Or max 8 messages for each transports in 15 sec
            // leakyBucket: new LeakyBucketService(8, 15),

            // Override processing interval (default: 3 sec)
            processingInterval: 5, // sec
          },
        );
      },
    },
  ],
})
export class AppModule {
  private notificationQueueManager: NotificationQueueManager;

  // Start NotificationQueueManager for queue processing
  constructor(service: NotificationService) {
    this.notificationQueueManager = new NotificationQueueManager(service).queueStart();
  }
}

Usage

import { IQueueEntity, IUserEntity } from '@notifications-system/core';
import { TRANSPORT_SMTP } from '@notifications-system/transport-mailer';
import { NotificationService } from './notification.service';

@Injectable()
export class SampleService {
  constructor(
    private readonly notificationService: NotificationService,
  ) {}

  sendEmail(recipient: IUserEntity | string, body: string, title?: string): Promise<IQueueEntity[]> {
    return this.notificationService.send(TRANSPORT_SMTP, { recipient, payload: title ? { title, body } : body });
  }
}