@omnihash/nestjs-azure-storage-queue v1.0.8
A NestJS module for seamless Azure Storage Queue integration with automatic message processing through decorators.
NOTE: This module is currently under development. Please do not use in production.
Features
⨠Decorator-based Queue Handlers - Use simple decorators to mark methods as queue message processors
š Automatic Message Polling - Built-in polling mechanism with configurable intervals
āļø Flexible Configuration - Support for both synchronous and asynchronous configuration
š”ļø Error Handling - Automatic retry logic with configurable dequeue limits
š Comprehensive Logging - Built-in logging for monitoring and debugging
š Auto-discovery - Automatically discovers and registers queue handlers at startup
š§© Type Safety - Support for typed messages to improve developer experience
Installation
npm install @omnihash/nestjs-azure-storage-queue @azure/storage-queue
# or
yarn add @omnihash/nestjs-azure-storage-queue @azure/storage-queue
Quick Start
1. Configure Environment
Create a .env
file in your project root:
AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=youraccount;AccountKey=yourkey;EndpointSuffix=core.windows.net
2. Setup Module
// app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { AzureStorageQueueModule } from '@omnihash/nestjs-azure-storage-queue';
import { MessageProcessorService } from './message-processor.service';
@Module({
imports: [
ConfigModule.forRoot(),
AzureStorageQueueModule.forRootAsync({
useFactory: (configService: ConfigService) => {
const connectionString = configService.get<string>(
'AZURE_STORAGE_CONNECTION_STRING',
);
if (!connectionString) {
throw new Error('AZURE_STORAGE_CONNECTION_STRING must be defined');
}
return {
connectionString,
defaultPollingInterval: 5000,
defaultVisibilityTimeout: 30,
defaultMaxDequeueCount: 5,
};
},
inject: [ConfigService],
}),
],
providers: [MessageProcessorService],
})
export class AppModule {}
3. Create Message Processors
// message-processor.service.ts
import { Injectable, Logger } from '@nestjs/common';
import {
AzureStorageQueueHandler,
AzureStorageQueueMessage,
} from '@omnihash/nestjs-azure-storage-queue';
// Define your custom message types for type safety
interface UserNotification {
userId: string;
message: string;
timestamp: string;
}
interface OrderData {
orderId: string;
customerName: string;
items: Array<{ id: string; quantity: number }>;
total: number;
}
@Injectable()
export class MessageProcessorService {
private readonly logger = new Logger(MessageProcessorService.name);
@AzureStorageQueueHandler({
queueName: 'user-notifications',
pollingInterval: 3000,
visibilityTimeout: 30,
maxDequeueCount: 1,
maxMessages: 10,
})
async handleUserNotifications(
message: AzureStorageQueueMessage<UserNotification>,
) {
this.logger.log(`Processing user notification: ${message.id}`);
// Strongly typed message body
const notification = message.body;
this.logger.log(
`Notification for user ${notification.userId}: ${notification.message}`,
);
// Simulate processing
await new Promise((resolve) => setTimeout(resolve, 1000));
this.logger.log(`Completed processing message: ${message.id}`);
}
@AzureStorageQueueHandler({
queueName: 'order-processing',
pollingInterval: 1000,
visibilityTimeout: 60,
maxMessages: 20,
maxDequeueCount: 10,
})
async handleOrderProcessing(message: AzureStorageQueueMessage<OrderData>) {
this.logger.log(`Processing order: ${message.id}`);
try {
// Strongly typed message body
const orderData = message.body;
// Full type safety with IDE intellisense
this.logger.log(
`Processing order ${orderData.orderId} for ${orderData.customerName}`,
);
this.logger.log(`Order contains ${orderData.items.length} items`);
this.logger.log(`Total amount: $${orderData.total}`);
// Process order logic here
this.logger.log(`Order processed successfully: ${orderData.orderId}`);
} catch (error) {
this.logger.error(`Failed to process order: ${error.message}`);
throw error; // This will cause the message to be retried
}
}
}
Configuration
Module Configuration Options
Option | Type | Description | Default |
---|---|---|---|
connectionString | string | Azure Storage connection string | required |
defaultPollingInterval | number | Default polling interval in milliseconds | 5000 |
defaultVisibilityTimeout | number | Default message visibility timeout in seconds | 30 |
defaultMaxDequeueCount | number | Default maximum dequeue count before poison queue | 5 |
Queue Handler Options
Option | Type | Description | Default |
---|---|---|---|
queueName | string | Name of the Azure Storage Queue | required |
pollingInterval | number | Polling interval in milliseconds | Module default |
visibilityTimeout | number | Message visibility timeout in seconds | Module default |
maxMessages | number | Maximum messages to retrieve per poll | 1 |
maxDequeueCount | number | Maximum dequeue count before deletion | Module default |
Advanced Usage
Static Configuration
// app.module.ts
import { AzureStorageQueueModule } from '@omnihash/nestjs-azure-storage-queue';
@Module({
imports: [
AzureStorageQueueModule.forRoot({
connectionString: 'your-connection-string',
defaultPollingInterval: 3000,
defaultVisibilityTimeout: 45,
defaultMaxDequeueCount: 3,
}),
],
})
export class AppModule {}
Sending Messages
import { Injectable } from '@nestjs/common';
import { AzureStorageQueueService } from '@omnihash/nestjs-azure-storage-queue';
// Define your message types
interface UserNotification {
userId: string;
message: string;
timestamp: string;
}
interface OrderUpdate {
orderId: string;
status: string;
updatedAt: string;
}
@Injectable()
export class NotificationService {
constructor(private readonly queueService: AzureStorageQueueService) {}
async sendNotification(userId: string, message: string) {
// Create a strongly-typed message
const notification: UserNotification = {
userId,
message,
timestamp: new Date().toISOString(),
};
// Send typed message - automatic serialization
await this.queueService.sendMessage<UserNotification>(
'user-notifications',
notification,
);
}
async sendOrderUpdate(orderId: string, status: string) {
// Create a strongly-typed message
const update: OrderUpdate = {
orderId,
status,
updatedAt: new Date().toISOString(),
};
// Send typed message - automatic serialization
await this.queueService.sendMessage<OrderUpdate>(
'order-processing',
update,
);
}
// You can still send simple string messages
async sendSimpleMessage(queueName: string, text: string) {
await this.queueService.sendMessage(queueName, text);
}
}
Multiple Queue Handlers with Type Safety
import { Injectable, Logger } from '@nestjs/common';
import { AzureStorageQueueHandler, AzureStorageQueueMessage } from '@omnihash/nestjs-azure-storage-queue';
@Injectable()
export class MultiQueueProcessor {
private readonly logger = new Logger(MultiQueueProcessor.name);
// High priority tasks with custom types
interface HighPriorityTask {
taskId: string;
priority: number;
data: Record<string, unknown>;
}
@AzureStorageQueueHandler({
queueName: 'high-priority',
pollingInterval: 1000,
maxMessages: 5
})
async handleHighPriority(message: AzureStorageQueueMessage<HighPriorityTask>) {
const task = message.body;
this.logger.log(`High priority task ${task.taskId} with priority ${task.priority}`);
// Handle high priority messages with type safety
}
// Simple string messages for low priority queue
@AzureStorageQueueHandler({
queueName: 'low-priority',
pollingInterval: 10000,
maxMessages: 10,
})
async handleLowPriority(message: AzureStorageQueueMessage<string>) {
this.logger.log(`Low priority message: ${message.body}`);
// Handle simple string messages
}
// Batch processing with complex objects
interface BatchItem {
items: Array<{ id: string; action: string }>;
batchId: string;
processingOptions: {
parallel: boolean;
timeout: number;
};
}
@AzureStorageQueueHandler({
queueName: 'batch-processing',
pollingInterval: 5000,
maxMessages: 32, // Azure Storage Queue max
visibilityTimeout: 120
})
async handleBatchProcessing(message: AzureStorageQueueMessage<BatchItem>) {
const batch = message.body;
this.logger.log(`Processing batch ${batch.batchId} with ${batch.items.length} items`);
// Type-safe access to all properties
if (batch.processingOptions.parallel) {
this.logger.log(`Processing items in parallel with ${batch.processingOptions.timeout}ms timeout`);
}
// Process batch items
for (const item of batch.items) {
this.logger.log(`- Item ${item.id}: ${item.action}`);
}
}
}
Message Format
Messages received by your handlers will have the following structure:
interface AzureStorageQueueMessage<T = string> {
id: string; // Message ID
body: T; // Message content with type parameter
dequeueCount: number; // Number of times dequeued
insertedOn: Date; // When message was inserted
expiresOn: Date; // When message expires
}
The generic type T
allows you to specify the type of the message body. It defaults to string
if not specified.
Error Handling
Automatic Retry Logic
When a handler throws an error, the message becomes visible again after the visibilityTimeout
and will be retried. If a message exceeds maxDequeueCount
, it will be automatically deleted (poison message handling).
Custom Error Handling
@AzureStorageQueueHandler({
queueName: 'error-prone-queue',
maxDequeueCount: 3,
})
async handleWithErrors(message: AzureStorageQueueMessage) {
try {
// Process message
await this.processMessage(message.body);
} catch (error) {
this.logger.error(`Processing failed: ${error.message}`, {
messageId: message.id,
dequeueCount: message.dequeueCount,
});
if (message.dequeueCount >= 2) {
// Send to dead letter queue or alert
await this.handlePoisonMessage(message);
}
throw error; // Re-throw to trigger retry
}
}
Schema Validation
You can combine typed messages with schema validation libraries for runtime safety:
import { z } from 'zod';
import { validateOrReject } from 'class-validator';
import { plainToInstance } from 'class-transformer';
import { AzureStorageQueueHandler, AzureStorageQueueMessage } from '@omnihash/nestjs-azure-storage-queue';
// Option 1: Using Zod
const UserSchema = z.object({
userId: z.string(),
email: z.string().email(),
preferences: z.object({
notifications: z.boolean(),
theme: z.enum(['light', 'dark']),
}),
});
type User = z.infer<typeof UserSchema>;
@AzureStorageQueueHandler({
queueName: 'user-updates',
})
async handleUserUpdates(message: AzureStorageQueueMessage<User>) {
try {
// Validate at runtime
UserSchema.parse(message.body);
// Process validated message
await this.processUserUpdate(message.body);
} catch (error) {
this.logger.error(`Invalid message schema: ${error.message}`);
// Handle invalid schema (do not retry)
}
}
// Option 2: Using class-validator
class OrderDto {
@IsString()
orderId: string;
@IsNumber()
amount: number;
@IsArray()
@ValidateNested({ each: true })
items: OrderItemDto[];
}
@AzureStorageQueueHandler({
queueName: 'orders',
})
async handleOrder(message: AzureStorageQueueMessage<OrderDto>) {
try {
// Transform plain object to class instance
const orderDto = plainToInstance(OrderDto, message.body);
// Validate the instance
await validateOrReject(orderDto);
// Process validated message
await this.processOrder(orderDto);
} catch (errors) {
this.logger.error(`Validation failed:`, errors);
// Handle invalid data
}
}
Best Practices
- Queue Naming: Use descriptive, kebab-case names for queues
- Message Size: Keep messages under 64KB (Azure Storage Queue limit)
- Idempotency: Design handlers to be idempotent in case of retries
- Monitoring: Use the built-in logging to monitor queue processing
- Error Handling: Implement proper error handling and poison message detection
- Resource Cleanup: The module automatically cleans up polling intervals on shutdown
- Type Definitions: Define TypeScript interfaces for all your message types
- Schema Validation: Combine type hints with runtime validation for maximum safety
API Reference
AzureStorageQueueModule
Static Methods
forRoot(config: AzureStorageQueueConfig): DynamicModule
Configure the module with static configurationforRootAsync(options: AsyncModuleOptions): DynamicModule
Configure the module with dynamic configuration using factories
AzureStorageQueueService
Methods
sendMessage<T = string>(queueName: string, message: T): Promise<void>
Send a typed message to the specified queuecreateQueueIfNotExists(queueName: string): Promise<QueueClient>
Create a queue if it doesn't exist and return the clientstartPolling<T = string>(options: AzureStorageQueuePollingOptions, handler: (message: AzureStorageQueueMessage<T>) => Promise<void>): Promise<void>
Start polling a queue (used internally by the decorator)stopPolling(queueName: string): void
Stop polling a specific queue
@AzureStorageQueueHandler
Decorator to mark methods as queue message handlers.
@AzureStorageQueueHandler(options: AzureStorageQueuePollingOptions)
Use this decorator on methods that receive AzureStorageQueueMessage<T>
as their parameter.
Troubleshooting
Common Issues
Connection String Invalid
Error: AZURE_STORAGE_CONNECTION_STRING must be defined
Ensure your connection string is properly set in environment variables.
Queue Not Found Queues are automatically created when first accessed. Ensure your Azure Storage account has the necessary permissions.
Messages Not Processing Check that:
- Your handler methods are in services registered with NestJS
- The
@AzureStorageQueueHandler
decorator is properly applied - Queue names match exactly (case-sensitive)
Debug Logging
Enable debug logging to see module activity:
// main.ts
import { Logger } from '@nestjs/common';
const app = await NestFactory.create(AppModule, {
logger: ['error', 'warn', 'log', 'debug'],
});
License
MIT
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.