# @driimus/lambda-batch-processor v0.2.0

Concurrently process batch records with partial failure support.
## Installation

> [!WARNING]
> This is an ESM-only package. Before installing, make sure that your project's configuration supports ECMAScript modules.

```sh
pnpm add @driimus/lambda-batch-processor
```
### Type hints

For types to work as expected, `@types/aws-lambda` must be installed:

```sh
pnpm add --save-dev @types/aws-lambda
```
## Usage

> [!WARNING]
> `ReportBatchItemFailures` must be enabled on the event source mapping to allow retrying failed messages.
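If you provision the event source mapping with the AWS CDK, the flag can be enabled via `reportBatchItemFailures`. A minimal sketch (the queue, function name, and inline code are placeholders; SAM and the Serverless Framework have equivalent settings):

```ts
// Sketch: enabling partial batch responses with AWS CDK v2.
// The stack contents here are illustrative, not part of this package.
import { Stack } from 'aws-cdk-lib';
import { Code, Function, Runtime } from 'aws-cdk-lib/aws-lambda';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { Queue } from 'aws-cdk-lib/aws-sqs';

const stack = new Stack();
const queue = new Queue(stack, 'Queue');
const fn = new Function(stack, 'Fn', {
  runtime: Runtime.NODEJS_18_X,
  handler: 'index.handler',
  code: Code.fromInline('/* bundled handler goes here */'),
});

fn.addEventSource(
  new SqsEventSource(queue, {
    // Without this, Lambda ignores the returned batchItemFailures list
    // and treats any thrown error as a failure of the whole batch.
    reportBatchItemFailures: true,
  }),
);
```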
```ts
import { SQSBatchProcessor } from '@driimus/lambda-batch-processor';

const processor = new SQSBatchProcessor(async (record) => {
  /** do stuff */
});

export const handler = processor.process;
```
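Partial failure support builds on Lambda's standard batch response contract: the handler returns the identifiers of failed records, and Lambda retries only those while the rest are removed from the source. A minimal sketch of that contract (the `toBatchResponse` helper is illustrative, not part of this package; the shape mirrors `SQSBatchResponse` from `@types/aws-lambda`):

```ts
// Mirrors the SQSBatchResponse type from @types/aws-lambda.
type BatchResponse = { batchItemFailures: { itemIdentifier: string }[] };

// Hypothetical helper: build the response Lambda expects from the IDs of
// the records that failed processing.
const toBatchResponse = (failedMessageIds: string[]): BatchResponse => ({
  batchItemFailures: failedMessageIds.map((itemIdentifier) => ({ itemIdentifier })),
});

// Only msg-1 and msg-3 are retried; successfully processed records are
// deleted from the queue.
const response = toBatchResponse(['msg-1', 'msg-3']);
```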
Supported event sources:

- DynamoDB Streams

  ```ts
  import { DynamoDBBatchProcessor } from '@driimus/lambda-batch-processor';
  ```

- Kinesis Data Streams

  ```ts
  import { KinesisBatchProcessor } from '@driimus/lambda-batch-processor';
  ```

- SQS

  ```ts
  import { SQSBatchProcessor } from '@driimus/lambda-batch-processor';
  ```
## Non-retryable errors

Exceptions that occur during batch processing can be treated as permanent failures. This feature is inspired by the AWS Lambda Powertools for Java, with one key difference: by default, messages that trigger permanent failures will not be reported. In the case of SQS messages, the result is their deletion from the queue.

To send SQS messages to a dead-letter queue, you can use `@driimus/sqs-permanent-failure-dlq`.
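One way to use this in a handler is to separate errors that can never succeed on retry from transient ones. The `PermanentFailure` class and `parsePayload` helper below are hypothetical, purely to illustrate the classification; check the package's exports for the error type it actually recognises as non-retryable:

```ts
// Hypothetical names: PermanentFailure and parsePayload are illustrative,
// not part of @driimus/lambda-batch-processor's documented API.
class PermanentFailure extends Error {}

// A malformed payload will never succeed on retry, so it is flagged as a
// permanent failure instead of being reported back to Lambda for redelivery.
const parsePayload = (body: string): unknown => {
  try {
    return JSON.parse(body);
  } catch {
    throw new PermanentFailure('unparseable message body');
  }
};

// Transient errors (network issues, throttling, etc.) should be thrown
// as-is, so the record is reported in batchItemFailures and retried.
```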
## Logging

You can enable logging by providing a logger compatible with the `Logger` interface, which is modelled after pino's function signatures.
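As a rough approximation, a pino-style logger accepts an object to serialise followed by an optional message. The interface below is an illustrative sketch, not the package's actual `Logger` type, which ships with the library:

```ts
// Illustrative approximation of a pino-style logger interface; the real
// Logger type is exported by @driimus/lambda-batch-processor.
interface Logger {
  error(obj: unknown, msg?: string): void;
  warn(obj: unknown, msg?: string): void;
  info(obj: unknown, msg?: string): void;
}

// A console-backed implementation is enough for local experimentation.
const consoleLogger: Logger = {
  error: (obj, msg) => console.error(msg ?? '', obj),
  warn: (obj, msg) => console.warn(msg ?? '', obj),
  info: (obj, msg) => console.info(msg ?? '', obj),
};
```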
> [!NOTE]
> The provided logger should support serialising `AggregateError` objects.

If using pino, it might be worth adding `pino-lambda` to preserve Lambda's standard log message format.
```ts
import { SQSBatchProcessor } from '@driimus/lambda-batch-processor';
import pino from 'pino';
import { lambdaRequestTracker, pinoLambdaDestination } from 'pino-lambda';

const destination = pinoLambdaDestination();
const withRequest = lambdaRequestTracker();

const logger = pino({}, destination);

const processor = new SQSBatchProcessor(
  async (record) => {
    /** do stuff */
  },
  {
    logger,
  },
);

export const handler = async (event, context) => {
  withRequest(event, context);

  return await processor.process(event, context);
};
```