go-sqs-nestjs v1.0.3-dev.0
NestJS SQS Wrapper
A robust and easy-to-use AWS SQS wrapper for NestJS applications. This package simplifies interaction with Amazon SQS by providing a seamless integration with NestJS, allowing you to send, receive, and delete messages effortlessly. It also supports setting up message consumers using decorators for streamlined message handling.
Features
- Send Messages: Easily send messages to your SQS queues.
- Receive Messages: Fetch messages from your queues with customizable options.
- Delete Messages: Safely delete processed messages to prevent reprocessing.
- Consumer Handlers: Define message consumers using decorators for automatic message handling.
- Cron Integration: Periodically poll queues using cron jobs.
- Debug Mode: Enable detailed logging for easier debugging.
Installation
Install the package via npm:
npm install go-sqs-nestjs
Or using yarn:
yarn add go-sqs-nestjs
Peer Dependencies
Ensure you have the following peer dependencies installed:
@nestjs/common
@nestjs/core
@aws-sdk/client-sqs
@golevelup/nestjs-discovery
node-cron
You can install them using:
npm install @nestjs/common @nestjs/core @aws-sdk/client-sqs @golevelup/nestjs-discovery node-cron
Usage
1. Importing the Module
First, import the SQSModule
into your application's root module or any other module where you want to use SQS services.
import { Module } from '@nestjs/common';
import { SQSModule } from 'nestjs-sqs-wrapper';
import { SQSService } from './sqs.service'; // Adjust the path accordingly
import { YourService } from './your.service'; // Service where you use SQS
@Module({
imports: [
SQSModule.register({
debugMode: true, // Optional: Enable debug mode
queues: [
{
name: 'your-queue-name',
queueUrl:
'https://sqs.your-region.amazonaws.com/your-account-id/your-queue-name',
client: new SQSClient({ region: 'your-region' }),
consumerOptions: {
pullIntervalInSeconds: 10,
batchSize: 5,
visibilityTimeoutInSeconds: 30,
deleteAfter: true,
preventDeleteOnError: false,
},
},
// Add more queues as needed
],
}),
],
providers: [YourService],
})
export class AppModule {}
2. Sending a Message
Use the SQSService
to send messages to your SQS queues.
import { Injectable } from '@nestjs/common';
import { SQSService } from 'nestjs-sqs-wrapper';
@Injectable()
export class YourService {
constructor(private readonly sqsService: SQSService) {}
async sendMessage() {
try {
const result = await this.sqsService.sendMessage(
'your-queue-name',
'Your message body',
{
DelaySeconds: 10, // Optional: Delay the message by 10 seconds
},
);
console.log('Message sent:', result.MessageId);
} catch (error) {
console.error('Error sending message:', error);
}
}
}
3. Receiving a Message
Receiving messages is handled automatically by consumers (see Consumer Handlers below). However, you can manually receive messages if needed.
import { Injectable } from '@nestjs/common';
import { SQSService } from 'nestjs-sqs-wrapper';
@Injectable()
export class YourService {
constructor(private readonly sqsService: SQSService) {}
async receiveMessages() {
try {
const messages = await this.sqsService.receiveMessage('your-queue-name', {
MaxNumberOfMessages: 10,
VisibilityTimeout: 30,
});
if (messages.Messages) {
messages.Messages.forEach((message) => {
console.log('Received message:', message.Body);
// Process the message
});
}
} catch (error) {
console.error('Error receiving messages:', error);
}
}
}
4. Deleting a Message
After processing a message, delete it to prevent it from being reprocessed.
import { Injectable } from '@nestjs/common';
import { SQSService } from 'nestjs-sqs-wrapper';
@Injectable()
export class YourService {
constructor(private readonly sqsService: SQSService) {}
async deleteMessage(queueName: string, receiptHandle: string) {
try {
await this.sqsService.deleteMessage(queueName, receiptHandle);
console.log('Message deleted successfully');
} catch (error) {
console.error('Error deleting message:', error);
}
}
}
5. Setting Up Consumer Handlers
Define consumer handlers using the @QueueHandler
decorator to automatically process incoming messages.
import { Injectable } from '@nestjs/common';
import { QueueHandler } from 'nestjs-sqs-wrapper';
import { Message } from '@aws-sdk/client-sqs';
@Injectable()
export class MessageConsumer {
@QueueHandler('your-queue-name')
async handleMessage(message: Message): Promise<void> {
try {
console.log('Processing message:', message.Body);
// Your message processing logic here
// Optionally, you can manually delete the message if not using automatic deletion
// await this.sqsService.deleteMessage('your-queue-name', message.ReceiptHandle);
} catch (error) {
console.error('Error processing message:', error);
// If preventDeleteOnError is true, the message won't be deleted automatically
}
}
}
6. Cron Job Configuration
The package uses cron jobs to poll messages from SQS queues based on the pullIntervalInSeconds
option. Ensure that the CronService
is properly configured in your module (already handled by the SQSModule
).
API Reference
SQSModule.register(options: SQSModuleOptions): DynamicModule
Registers the SQS module with the provided options.
- options:
SQSModuleOptions
debugMode?: boolean
- Enable debug logging.queues: Array<{ name: string; queueUrl: string; client: SQSClient; consumerOptions?: ConsumerOptions }>
- Define your SQS queues.
SQSService
Provides methods to interact with SQS queues.
- sendMessage(queueName: string, messageBody: string, options?: Omit<SendMessageCommandInput, 'QueueUrl' | 'MessageBody'>): Promise
- Sends a message to the specified queue.
- receiveMessage(queueName: string, options?: Omit<ReceiveMessageCommandInput, 'QueueUrl'>): Promise
- Receives messages from the specified queue.
- deleteMessage(queueName: string, receiptHandle: string, options?: Omit<DeleteMessageCommandInput, 'QueueUrl'>): Promise
- Deletes a message from the specified queue.
@QueueHandler(queueName: string): MethodDecorator
Decorator to mark a method as a consumer handler for a specific SQS queue.
- queueName: The name of the queue to handle messages from.
8 months ago
8 months ago
7 months ago
8 months ago
8 months ago
1 year ago