1.0.3-dev.0 • Published 7 months ago

go-sqs-nestjs v1.0.3-dev.0

License
Unlicense
Repository
bitbucket

NestJS SQS Wrapper

An AWS SQS wrapper for NestJS applications. It integrates Amazon SQS with NestJS so that you can send, receive, and delete messages through an injectable service, and register message consumers with a decorator.

Features

  • Send Messages: Easily send messages to your SQS queues.
  • Receive Messages: Fetch messages from your queues with customizable options.
  • Delete Messages: Safely delete processed messages to prevent reprocessing.
  • Consumer Handlers: Define message consumers using decorators for automatic message handling.
  • Cron Integration: Periodically poll queues using cron jobs.
  • Debug Mode: Enable detailed logging for easier debugging.

Installation

Install the package via npm:

npm install go-sqs-nestjs

Or using yarn:

yarn add go-sqs-nestjs

Peer Dependencies

Ensure you have the following peer dependencies installed:

  • @nestjs/common
  • @nestjs/core
  • @aws-sdk/client-sqs
  • @golevelup/nestjs-discovery
  • node-cron

You can install them using:

npm install @nestjs/common @nestjs/core @aws-sdk/client-sqs @golevelup/nestjs-discovery node-cron

Usage

1. Importing the Module

First, import the SQSModule into your application's root module or any other module where you want to use SQS services.

import { Module } from '@nestjs/common';
import { SQSClient } from '@aws-sdk/client-sqs'; // Needed for the client option below
import { SQSModule } from 'go-sqs-nestjs';
import { YourService } from './your.service'; // Service where you use SQS

@Module({
  imports: [
    SQSModule.register({
      debugMode: true, // Optional: Enable debug mode
      queues: [
        {
          name: 'your-queue-name',
          queueUrl:
            'https://sqs.your-region.amazonaws.com/your-account-id/your-queue-name',
          client: new SQSClient({ region: 'your-region' }),
          consumerOptions: {
            pullIntervalInSeconds: 10,
            batchSize: 5,
            visibilityTimeoutInSeconds: 30,
            deleteAfter: true,
            preventDeleteOnError: false,
          },
        },
        // Add more queues as needed
      ],
    }),
  ],
  providers: [YourService],
})
export class AppModule {}

2. Sending a Message

Use the SQSService to send messages to your SQS queues.

import { Injectable } from '@nestjs/common';
import { SQSService } from 'go-sqs-nestjs';

@Injectable()
export class YourService {
  constructor(private readonly sqsService: SQSService) {}

  async sendMessage() {
    try {
      const result = await this.sqsService.sendMessage(
        'your-queue-name',
        'Your message body',
        {
          DelaySeconds: 10, // Optional: Delay the message by 10 seconds
        },
      );
      console.log('Message sent:', result.MessageId);
    } catch (error) {
      console.error('Error sending message:', error);
    }
  }
}
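
Message bodies are plain strings, so structured payloads have to be serialized before sending. The following is a minimal sketch of that step, not part of the package: `sendJson` and the `MessageSender` interface are hypothetical names, and the interface only mirrors the sendMessage signature listed in the API Reference so the snippet compiles on its own. The return shape is an assumption based on the `result.MessageId` access in the example above.

```typescript
// Structural view of the documented sendMessage signature (assumption: the
// resolved value exposes MessageId, as the example above suggests).
interface MessageSender {
  sendMessage(
    queueName: string,
    messageBody: string,
    options?: { DelaySeconds?: number },
  ): Promise<{ MessageId?: string }>;
}

// SQS accepts a per-message delay of 0 to 900 seconds (15 minutes).
const MAX_DELAY_SECONDS = 900;

// Hypothetical helper: serializes a payload to JSON and sends it.
async function sendJson(
  sender: MessageSender,
  queueName: string,
  payload: unknown,
  delaySeconds = 0,
): Promise<string | undefined> {
  if (
    !Number.isInteger(delaySeconds) ||
    delaySeconds < 0 ||
    delaySeconds > MAX_DELAY_SECONDS
  ) {
    throw new RangeError(
      `delaySeconds must be an integer between 0 and ${MAX_DELAY_SECONDS}`,
    );
  }
  const body = JSON.stringify(payload);
  // JSON.stringify yields undefined for values such as functions.
  if (typeof body !== 'string') {
    throw new TypeError('payload is not JSON-serializable');
  }
  const result = await sender.sendMessage(queueName, body, {
    DelaySeconds: delaySeconds,
  });
  return result.MessageId;
}
```

Inside a service, this could be called as `await sendJson(this.sqsService, 'your-queue-name', { orderId: '42' })`, provided SQSService matches the interface above.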

3. Receiving a Message

Consumers receive messages automatically (see Setting Up Consumer Handlers below), but you can also receive messages manually if needed.

import { Injectable } from '@nestjs/common';
import { SQSService } from 'go-sqs-nestjs';

@Injectable()
export class YourService {
  constructor(private readonly sqsService: SQSService) {}

  async receiveMessages() {
    try {
      const messages = await this.sqsService.receiveMessage('your-queue-name', {
        MaxNumberOfMessages: 10,
        VisibilityTimeout: 30,
      });
      if (messages.Messages) {
        messages.Messages.forEach((message) => {
          console.log('Received message:', message.Body);
          // Process the message
        });
      }
    } catch (error) {
      console.error('Error receiving messages:', error);
    }
  }
}

4. Deleting a Message

After processing a message, delete it to prevent it from being reprocessed.

import { Injectable } from '@nestjs/common';
import { SQSService } from 'go-sqs-nestjs';

@Injectable()
export class YourService {
  constructor(private readonly sqsService: SQSService) {}

  async deleteMessage(queueName: string, receiptHandle: string) {
    try {
      await this.sqsService.deleteMessage(queueName, receiptHandle);
      console.log('Message deleted successfully');
    } catch (error) {
      console.error('Error deleting message:', error);
    }
  }
}
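
When receiving manually, the receive and delete steps usually go together: a message is deleted only after it has been processed successfully, and a failed message is left in the queue so that SQS redelivers it once its visibility timeout expires. The sketch below illustrates that pattern under stated assumptions. `processBatch`, `QueueClient`, and `ReceivedMessage` are hypothetical names; the interfaces mirror only the parts of the documented receiveMessage and deleteMessage signatures that the sketch uses.

```typescript
// Subset of the AWS SDK Message shape used here.
interface ReceivedMessage {
  Body?: string;
  ReceiptHandle?: string;
}

// Structural view of the two documented SQSService methods (assumption).
interface QueueClient {
  receiveMessage(
    queueName: string,
    options?: { MaxNumberOfMessages?: number; VisibilityTimeout?: number },
  ): Promise<{ Messages?: ReceivedMessage[] }>;
  deleteMessage(queueName: string, receiptHandle: string): Promise<unknown>;
}

// Hypothetical helper: receives one batch, processes each message, and
// deletes only the messages whose handler completed without throwing.
async function processBatch(
  client: QueueClient,
  queueName: string,
  handler: (body: string) => Promise<void>,
): Promise<{ processed: number; failed: number }> {
  const response = await client.receiveMessage(queueName, {
    MaxNumberOfMessages: 10,
    VisibilityTimeout: 30,
  });
  let processed = 0;
  let failed = 0;
  for (const message of response.Messages ?? []) {
    try {
      await handler(message.Body ?? '');
      // Delete only after the handler succeeded.
      if (message.ReceiptHandle) {
        await client.deleteMessage(queueName, message.ReceiptHandle);
      }
      processed += 1;
    } catch {
      // Leave the message in the queue; it becomes visible again after
      // the visibility timeout and can be retried.
      failed += 1;
    }
  }
  return { processed, failed };
}
```

The deleteAfter and preventDeleteOnError consumer options appear to automate the same decision for decorator-based consumers, so a helper like this is only relevant for manual polling.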

5. Setting Up Consumer Handlers

Define consumer handlers using the @QueueHandler decorator to automatically process incoming messages.

import { Injectable } from '@nestjs/common';
import { QueueHandler } from 'go-sqs-nestjs';
import { Message } from '@aws-sdk/client-sqs';

@Injectable()
export class MessageConsumer {
  @QueueHandler('your-queue-name')
  async handleMessage(message: Message): Promise<void> {
    try {
      console.log('Processing message:', message.Body);
      // Your message processing logic here

      // Optionally, delete the message manually if automatic deletion is disabled
      // (this requires injecting SQSService through the constructor):
      // await this.sqsService.deleteMessage('your-queue-name', message.ReceiptHandle);
    } catch (error) {
      console.error('Error processing message:', error);
      // If preventDeleteOnError is true, the message won't be deleted automatically
    }
  }
}
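
A handler receives the raw AWS SDK Message, whose Body is an optional string. If producers send JSON, the handler needs to parse and validate it before use. The following is a minimal sketch of one way to do that; `parseJsonBody`, `MessageLike`, `OrderCreated`, and `isOrderCreated` are hypothetical names used for illustration and are not provided by the package.

```typescript
// Subset of the AWS SDK Message shape used here.
interface MessageLike {
  Body?: string;
}

type ParseResult<T> =
  | { ok: true; value: T }
  | { ok: false; reason: string };

// Hypothetical helper: parses the body as JSON and checks its shape with a
// caller-supplied type guard instead of trusting the payload blindly.
function parseJsonBody<T>(
  message: MessageLike,
  isValid: (value: unknown) => value is T,
): ParseResult<T> {
  if (message.Body === undefined) {
    return { ok: false, reason: 'message has no body' };
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(message.Body);
  } catch {
    return { ok: false, reason: 'body is not valid JSON' };
  }
  return isValid(parsed)
    ? { ok: true, value: parsed }
    : { ok: false, reason: 'body has an unexpected shape' };
}

// Hypothetical payload type and its type guard.
interface OrderCreated {
  orderId: string;
}

function isOrderCreated(value: unknown): value is OrderCreated {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { orderId?: unknown }).orderId === 'string'
  );
}
```

In a handler, `parseJsonBody(message, isOrderCreated)` returns either the typed payload or a reason that can be logged before deciding whether the message should be retried.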

6. Cron Job Configuration

The package polls each SQS queue with a cron job whose schedule is based on the queue's pullIntervalInSeconds option. SQSModule registers the CronService itself, so no additional configuration is required.
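
How the package turns pullIntervalInSeconds into a schedule is not documented here. As an illustration only, the sketch below shows one plausible mapping onto the six-field cron syntax that node-cron accepts, where the optional first field is seconds. `toCronExpression` is a hypothetical helper, not a function exported by the package.

```typescript
// Hypothetical helper: maps a polling interval in seconds to a six-field
// cron expression (seconds, minutes, hours, day of month, month, weekday).
function toCronExpression(intervalInSeconds: number): string {
  if (
    !Number.isInteger(intervalInSeconds) ||
    intervalInSeconds < 1 ||
    intervalInSeconds > 59
  ) {
    throw new RangeError(
      'intervalInSeconds must be an integer between 1 and 59',
    );
  }
  // "*/N" in the seconds field fires at seconds 0, N, 2N, ... of each minute.
  return `*/${intervalInSeconds} * * * * *`;
}
```

With this mapping, `toCronExpression(10)` yields `*/10 * * * * *`. Step values restart at every minute boundary, so an interval that does not divide 60 evenly (for example 25) produces uneven gaps between polls.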

API Reference

SQSModule.register(options: SQSModuleOptions): DynamicModule

Registers the SQS module with the provided options.

  • options: SQSModuleOptions
    • debugMode?: boolean - Enable debug logging.
    • queues: Array<{ name: string; queueUrl: string; client: SQSClient; consumerOptions?: ConsumerOptions }> - Define your SQS queues.
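
ConsumerOptions is not spelled out in this reference. The sketch below infers its fields from the registration example in the Usage section and checks them against limits that SQS itself enforces. The interface name `ConsumerOptionsSketch`, the helper `checkConsumerOptions`, the optionality of each field, and the assumption that batchSize maps to MaxNumberOfMessages are all guesses for illustration, not the package's actual type declarations.

```typescript
// Field names come from the registration example; types and optionality
// are inferred, not taken from the package's declarations.
interface ConsumerOptionsSketch {
  pullIntervalInSeconds?: number;
  batchSize?: number;
  visibilityTimeoutInSeconds?: number;
  deleteAfter?: boolean;
  preventDeleteOnError?: boolean;
}

// Hypothetical helper: returns a list of problems, empty when none found.
function checkConsumerOptions(options: ConsumerOptionsSketch): string[] {
  const problems: string[] = [];
  const { pullIntervalInSeconds, batchSize, visibilityTimeoutInSeconds } =
    options;

  // SQS returns between 1 and 10 messages per ReceiveMessage call
  // (assumes batchSize is passed through as MaxNumberOfMessages).
  if (
    batchSize !== undefined &&
    (!Number.isInteger(batchSize) || batchSize < 1 || batchSize > 10)
  ) {
    problems.push('batchSize must be an integer between 1 and 10');
  }

  // SQS allows a visibility timeout of 0 to 43200 seconds (12 hours).
  if (
    visibilityTimeoutInSeconds !== undefined &&
    (!Number.isInteger(visibilityTimeoutInSeconds) ||
      visibilityTimeoutInSeconds < 0 ||
      visibilityTimeoutInSeconds > 43200)
  ) {
    problems.push(
      'visibilityTimeoutInSeconds must be an integer between 0 and 43200',
    );
  }

  if (pullIntervalInSeconds !== undefined && !(pullIntervalInSeconds > 0)) {
    problems.push('pullIntervalInSeconds must be greater than 0');
  }

  return problems;
}
```

Running such a check at startup, before calling SQSModule.register, surfaces misconfiguration early instead of at the first poll.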

SQSService

Provides methods to interact with SQS queues.

  • sendMessage(queueName: string, messageBody: string, options?: Omit<SendMessageCommandInput, 'QueueUrl' | 'MessageBody'>): Promise
    • Sends a message to the specified queue.
  • receiveMessage(queueName: string, options?: Omit<ReceiveMessageCommandInput, 'QueueUrl'>): Promise
    • Receives messages from the specified queue.
  • deleteMessage(queueName: string, receiptHandle: string, options?: Omit<DeleteMessageCommandInput, 'QueueUrl'>): Promise
    • Deletes a message from the specified queue.

@QueueHandler(queueName: string): MethodDecorator

Decorator to mark a method as a consumer handler for a specific SQS queue.

  • queueName: The name of the queue to handle messages from.