0.4.0 • Published 10 months ago

@janiscommerce/sqs-consumer v0.4.0

License
ISC

SQS Consumer


A wrapper for SQS Consumers and Lambda

:inbox_tray: Installation

npm install @janiscommerce/sqs-consumer

:hammer: Usage

Your business logic must be implemented as an SQSConsumer. Two types of consumers are exported to make this easier:

BatchSQSConsumer

This consumer processes all the records in a single call. This is useful, for example, when you want to fetch related data in one query using a field present on every record.

For this consumer, you have to implement the processBatch method. The method signature is the following:

processBatch(records: Array<ParsedSQSRecordWithLogger>): Promise<void> | void

To add a log message for a record, you can use the built-in logger like this:

record[Symbol.for('logger')].info('Some info message');

The logger is implemented as a Symbol property so that it cannot collide with existing properties of the SQS record.

Logging levels follow lllog levels.
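As a rough sketch of how the per-record logger might be used inside processBatch, the snippet below logs one line per record. To stay runnable without the package, it uses a plain function and hand-built records with a fake in-memory logger; in a real consumer the method would live in a class extending BatchSQSConsumer. The sku body field is hypothetical.

```javascript
// Symbol under which the per-record logger is exposed.
const LOGGER = Symbol.for('logger');

// Stand-in for a BatchSQSConsumer#processBatch implementation.
// The `sku` body field is hypothetical, used only for illustration.
async function processBatch(records) {
	records.forEach(record => {
		record[LOGGER].info(`Processing sku ${record.body.sku}`);
	});
}

// Hand-built records with a fake logger that stores messages in memory.
const messages = [];
const fakeLogger = { info: message => messages.push(message) };

const sampleRecords = [
	{ messageId: '1', body: { sku: 'A-1' }, [LOGGER]: fakeLogger },
	{ messageId: '2', body: { sku: 'B-2' }, [LOGGER]: fakeLogger }
];

processBatch(sampleRecords);
```

Because the logger hangs off each record, log lines stay tied to the message that produced them even though all records are handled in one call.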

IterativeSQSConsumer

This consumer processes one record at a time. This is useful when records are completely unrelated and can be processed in parallel with no dependencies between them, or when you consume only one SQS message per invocation (batchSize = 1).

For this consumer, you have to implement the processSingleRecord method. The method signature is the following:

processSingleRecord(record: ParsedSQSRecord, logger: LogTransport): Promise<void> | void

To add a log message for a record, you can use the logger passed as argument like this:

logger.info('Some info message');

Logging levels follow lllog levels.

Partial failure reporting

To implement partial failure reporting, register the ID of each message that fails using the method addFailedMessage(messageId).

The lambda will automatically return the failed messages formatted as expected.
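For context, when an SQS event source has ReportBatchItemFailures enabled, AWS Lambda expects a response containing a batchItemFailures list. The sketch below illustrates that shape with a stand-in class. It is not the package's implementation: StandInConsumer, its failedMessages set and formatResponse are hypothetical names. In a real consumer you would only call this.addFailedMessage(messageId) and let the handler build the response.

```javascript
// Stand-in that mimics only the addFailedMessage(messageId) method described above.
// The real consumer classes come from the package; this class is hypothetical.
class StandInConsumer {

	constructor() {
		this.failedMessages = new Set();
	}

	addFailedMessage(messageId) {
		this.failedMessages.add(messageId);
	}

	// Hypothetical processing step: marks a record as failed instead of throwing.
	processSingleRecord(record) {
		if (typeof record.body.orderId === 'undefined')
			this.addFailedMessage(record.messageId);
	}

	// Builds the partial batch response shape that AWS Lambda expects for SQS.
	formatResponse() {
		return {
			batchItemFailures: [...this.failedMessages].map(itemIdentifier => ({ itemIdentifier }))
		};
	}
}

const consumer = new StandInConsumer();

[
	{ messageId: 'msg-1', body: { orderId: 10 } },
	{ messageId: 'msg-2', body: {} }
].forEach(record => consumer.processSingleRecord(record));

const partialResponse = consumer.formatResponse();
```

Only the messages listed in batchItemFailures are returned to the queue for retry; the rest of the batch is treated as processed.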

:zap: Usage with serverless (lambda)

This package also exports a SQSHandler to easily integrate with AWS Lambda.

To use it, export the following from your Lambda module:

module.exports.handler = event => SQSHandler.handle(MySQSConsumer, event);

:warning: Advanced usage

Conditional processing

If you want to process messages as a batch in some cases and individually in others, you can override the handlesBatch method to implement your own custom logic. The method's signature is the following:

handlesBatch(event: SQSEvent): boolean

Important: This method must be synchronous.
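A minimal sketch of such custom logic, assuming you want batch processing only when the event carries more than one record. It is written as a plain function so it runs on its own; in a consumer it would be a method override, and the threshold is an arbitrary choice for illustration.

```javascript
// Stand-in for a handlesBatch(event) override. It must stay synchronous.
// SQS events delivered to Lambda carry their messages in `event.Records`.
function handlesBatch(event) {
	return event.Records.length > 1;
}

const singleMessageEvent = { Records: [{ messageId: '1', body: '{}' }] };
const multiMessageEvent = {
	Records: [
		{ messageId: '1', body: '{}' },
		{ messageId: '2', body: '{}' }
	]
};

const singleHandledAsBatch = handlesBatch(singleMessageEvent);
const multiHandledAsBatch = handlesBatch(multiMessageEvent);
```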

Message formatting

This package expects each message body to be a JSON string and will fail if it's not.

To parse the records in a different way (or to fail silently when the format is invalid), you can override the parseRecord method. The method's signature is the following:

parseRecord(record: SQSRecord): ParsedSQSRecord

Important: This method must be synchronous.
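A sketch of a tolerant parser, written as a plain function so it runs without the package. It assumes that a parsed record is the original record with body replaced by the parsed JSON value, which the text above does not confirm. On invalid JSON it keeps the raw string instead of throwing.

```javascript
// Stand-in for a parseRecord(record) override. It must stay synchronous.
// Assumption: a parsed record is the original record with `body` replaced by the parsed value.
function parseRecord(record) {
	try {
		return { ...record, body: JSON.parse(record.body) };
	} catch (err) {
		// Invalid JSON: keep the raw string body instead of failing.
		return { ...record };
	}
}

const parsedJson = parseRecord({ messageId: '1', body: '{"id":7}' });
const parsedText = parseRecord({ messageId: '2', body: 'plain text' });
```

With this approach, processing code has to handle both object and string bodies, so it may be worth logging the records that fall back to the raw string.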

:computer: Examples

Lambda Batch consumer

Process a batch of new ratings of a product and save them as not-verified

const {
	SQSHandler,
	BatchSQSConsumer
} = require('@janiscommerce/sqs-consumer');

const DbHandler = require('./your-db-handler');

class MyBatchConsumer extends BatchSQSConsumer {

	async processBatch(records) {

		const ratings = records.map(({ body }) => ({
			rating: body.rating,
			verified: false
		}));

		return DbHandler.insertMany(ratings);
	}
}

module.exports.handler = event => SQSHandler.handle(MyBatchConsumer, event);

Lambda Iterative consumer

Process a batch of orders placed in your ecommerce and send an email for each of them

const {
	SQSHandler,
	IterativeSQSConsumer
} = require('@janiscommerce/sqs-consumer');

const MailingService = require('./your-mailing-service');

class MyIterativeConsumer extends IterativeSQSConsumer {

	async processSingleRecord(record, logger) {

		// record.body already holds the parsed JSON payload
		const { body: orderPlaced } = record;

		logger.info(`Sending email for order ${orderPlaced.id}`);

		return MailingService.sendTemplate('orderPlaced', orderPlaced);
	}
}

module.exports.handler = event => SQSHandler.handle(MyIterativeConsumer, event);

Validate with Struct (Optional)

When you declare a struct, all records are validated against it before any processing, and processing only continues if they pass the validation. The validation should return a valid struct.

To use it, declare a get struct() getter in your class.

const {
	SQSHandler,
	IterativeSQSConsumer
} = require('@janiscommerce/sqs-consumer');
const { struct } = require('@janiscommerce/superstruct');

class MyConsumer extends IterativeSQSConsumer {

	get struct() {
		return struct.partial({
			name: 'string'
		});
	}

}

module.exports.handler = event => SQSHandler.handle(MyConsumer, event);

Session injection

This package implements API Session. To associate a message with a session, the record must contain the janis-client property in its messageAttributes.

If the message attribute is set, you can access the session in your Consumer as this.session. Otherwise, this.session will be undefined.
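For illustration, a record carrying the janis-client attribute could look like the one below. The stringValue/dataType layout is how AWS Lambda delivers SQS message attributes. The client code my-client and the getClientCode helper are hypothetical, and how the package reads the attribute internally is not shown here.

```javascript
// Sample SQS record as delivered to Lambda, with the janis-client message attribute set.
const recordWithSession = {
	messageId: 'msg-1',
	body: '{"id":1}',
	messageAttributes: {
		'janis-client': {
			stringValue: 'my-client', // hypothetical client code
			dataType: 'String'
		}
	}
};

// Hypothetical helper: reads the client code, or returns undefined when the attribute is missing.
function getClientCode(record) {
	const attribute = record.messageAttributes && record.messageAttributes['janis-client'];
	return attribute ? attribute.stringValue : undefined;
}

const clientCode = getClientCode(recordWithSession);
const missingClientCode = getClientCode({ messageId: 'msg-2', body: '{}', messageAttributes: {} });
```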

Session details and customization options can be found in the api-session README.