bluedot-event-subscriber v0.0.2
Bluedot Event Subscriber library
This library continuously polls SQS for new messages and emits them as events using the Node.js EventEmitter class. You can process them however you like.
Prerequisites
Install Node.js
brew install node
Install TypeScript
npm install typescript -g
Install TSLint
npm install tslint
Install dependencies:
npm install
Tests
Includes
- Joi (https://github.com/hapijs/joi) for object schema validation
- Rosie (https://github.com/rosiejs/rosie) for test event generation
To run:
npm test
To check coverage:
npm run test:coverage-check
Build Steps
This starter kit uses Rollup (https://rollupjs.org/guide/en) for module bundling
Build library. NOTE: Update test coverage requirements
npm run build
This will build the
Publish library
npm publish
This will publish a CommonJS module and an ES module in the ./dist directory
Library behaviour:
Credentials
The library will look for credentials in the usual locations specified by AWS: https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/configuring-the-jssdk.html#Setting_AWS_Credentials
The easiest way is to set them as environment variables:
AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=
Default behaviours
- The queue is polled continuously using long polling. You can disable long polling by setting the option waitTimeSeconds to 0. See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html
- Messages will be deleted from the queue if messageHandler succeeds without error. messageHandler is asynchronous. If you reject with an error (or throw), the message will not be deleted from SQS.
- To process messages in parallel, set batchSize to a number between 2 and 10.
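The delete-on-success rule described above can be sketched without any AWS calls. This is a minimal illustration, not the library's actual implementation: `QueueMessage`, `processBatch`, and the injected `deleteMessage` callback are hypothetical names introduced only for this example.

```typescript
// Hypothetical message shape for this sketch; not the library's type.
interface QueueMessage {
  id: string
  body: string
}

type MessageHandler = (message: QueueMessage) => Promise<void>

// Runs the handler for every message in the batch in parallel.
// A message is deleted only when its handler resolves; a rejection
// (or throw) leaves it in the queue so it can be received again.
async function processBatch(
  messages: QueueMessage[],
  handler: MessageHandler,
  deleteMessage: (id: string) => Promise<void>
): Promise<string[]> {
  const results = await Promise.all(
    messages.map(async (message) => {
      try {
        await handler(message)
        await deleteMessage(message.id)
        return message.id
      } catch (err) {
        // Handler (or delete) failed: report the message as not deleted.
        return null
      }
    })
  )
  // Return the ids of the messages that were handled and deleted.
  return results.filter((id): id is string => id !== null)
}
```

Injecting `deleteMessage` keeps the sketch testable with a stub; in a real consumer it would wrap the SQS delete call.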
Options
Create a new SQS Consumer:
new Subscriber(options)
required options:
- region - AWS region
- sqsUrl - URL of your SQS queue. This will be provided
- snsArn - ARN of your SNS topic. This will be provided
- sqsArn - ARN of your SQS queue. This will be provided
- messageHandler - Function to handle the message. Will receive a JavaScript object as its first argument
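A quick pre-flight check for the required options listed above might look like the following. It is a sketch only: `missingOptions` is a hypothetical helper, not part of the library, and it covers just the five names documented here (it does not model the generated-queue variant shown in the sample implementations).

```typescript
// Names of the options documented as required.
const REQUIRED_OPTIONS = ['region', 'sqsUrl', 'snsArn', 'sqsArn', 'messageHandler'] as const

// Returns the required option names that are missing or empty,
// in the order they are documented.
function missingOptions(options: { [key: string]: unknown }): string[] {
  return REQUIRED_OPTIONS.filter((name) => {
    const value = options[name]
    return value === undefined || value === null || value === ''
  })
}
```

Calling `missingOptions(options)` before `new Subscriber(options)` lets you report every missing setting at once instead of failing on the first one.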
non-required options:
AttributeNames — (Array) A list of attributes that need to be returned along with each message. These attributes include:
- All - Returns all values.
- ApproximateFirstReceiveTimestamp - Returns the time the message was first received from the queue (epoch time in milliseconds).
- ApproximateReceiveCount - Returns the number of times a message has been received from the queue but not deleted.
- SenderId
- For an IAM user, returns the IAM user ID, for example ABCDEFGHI1JKLMNOPQ23R.
- For an IAM role, returns the IAM role ID, for example ABCDE1F2GH3I4JK5LMNOP:i-a123b456.
- SentTimestamp - Returns the time the message was sent to the queue (epoch time in milliseconds).
- MessageDeduplicationId - Returns the value provided by the producer that calls the SendMessage action.
- MessageGroupId - Returns the value provided by the producer that calls the SendMessage action. Messages with the same MessageGroupId are returned in sequence.
- SequenceNumber - Returns the value provided by Amazon SQS.
MessageAttributeNames — (Array)
- The name of the message attribute, where N is the index.
- The name can contain alphanumeric characters and the underscore (_), hyphen (-), and period (.).
- The name is case-sensitive and must be unique among all attribute names for the message.
- The name must not start with AWS-reserved prefixes such as AWS. or Amazon. (or any casing variants).
- The name must not start or end with a period (.), and it should not have periods in succession (..).
- The name can be up to 256 characters long.
- When using ReceiveMessage, you can send a list of attribute names to receive, or you can return all of the attributes by specifying All or .* in your request. You can also use all message attributes starting with a prefix, for example bar.*.
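The naming rules for message attributes listed above can be expressed as a small validator. This is a sketch under the rules as written here; `isValidMessageAttributeName` is a hypothetical helper, not something the library or the AWS SDK provides.

```typescript
// Checks a message attribute name against the documented naming rules.
function isValidMessageAttributeName(name: string): boolean {
  // Up to 256 characters long (and not empty).
  if (name.length === 0 || name.length > 256) { return false }
  // Only alphanumerics, underscore, hyphen, and period are allowed.
  if (!/^[A-Za-z0-9_.-]+$/.test(name)) { return false }
  // No leading or trailing period, and no periods in succession.
  if (/^\./.test(name) || /\.$/.test(name) || /\.\./.test(name)) { return false }
  // Reserved prefixes are rejected in any casing.
  if (/^(aws|amazon)\./i.test(name)) { return false }
  return true
}
```

The wildcard forms accepted by ReceiveMessage (All, or a prefix followed by `.*`) are request filters rather than attribute names, so this check deliberately rejects them.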
MaxNumberOfMessages — (Integer)
- The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 1.
VisibilityTimeout — (Integer)
- The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
WaitTimeSeconds — (Integer)
- The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than WaitTimeSeconds. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages.
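To show how the non-required options above fit together, here is a sketch that assembles them into a ReceiveMessage-style parameter object. `ReceiveOptions` and `buildReceiveParams` are hypothetical names for this example, and how the library itself forwards these options is not documented here, so treat this as an illustration only.

```typescript
// Hypothetical option bag using the option names documented above.
interface ReceiveOptions {
  AttributeNames?: string[]
  MessageAttributeNames?: string[]
  MaxNumberOfMessages?: number
  VisibilityTimeout?: number
  WaitTimeSeconds?: number
}

// Builds a ReceiveMessage-style parameter object. MaxNumberOfMessages
// defaults to 1 and must be an integer from 1 to 10, as documented.
function buildReceiveParams(queueUrl: string, options: ReceiveOptions = {}) {
  const max = options.MaxNumberOfMessages === undefined ? 1 : options.MaxNumberOfMessages
  if (Math.floor(max) !== max || max < 1 || max > 10) {
    throw new RangeError('MaxNumberOfMessages must be an integer from 1 to 10')
  }
  return {
    QueueUrl: queueUrl,
    AttributeNames: options.AttributeNames || [],
    MessageAttributeNames: options.MessageAttributeNames || [],
    MaxNumberOfMessages: max,
    // Left undefined when not supplied, so the queue's own settings apply.
    VisibilityTimeout: options.VisibilityTimeout,
    WaitTimeSeconds: options.WaitTimeSeconds
  }
}
```

For example, `buildReceiveParams(url, { WaitTimeSeconds: 0 })` produces a short-polling request for a single message.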
Library Usage
CommonJS:
const Test = require('typescript-library-starter-kit').Test
Test.test(payload)
ES:
import { Test } from 'typescript-library-starter-kit'
Test.test(payload)
Return Object Structure:
{
"Type": "Notification",
"MessageId": "9e5d7ae3-6de5-5526-8333-2e8731158baa",
"TopicArn": "arn:aws:sns:ap-southeast-2:791031460737:bluedot-config-events-dev-1",
"Message": "{\"referenceId\":\"0d80f8c4-b725-4bde-8347-3967bf30c01c\",\"customerEmail\":\"hello@bluedotinnovation.com\",\"customerId\":\"436953cd-c0e5-43fb-851a-72a86a9f0da9\",\"actionName\":\"zoneUpdate\",\"creationTime\":\"2018-10-21T23:37:37.993+0000\"}",
"Timestamp": "2018-11-13T05:01:36.304Z",
"SignatureVersion": "1",
"Signature": "Gs16+10GeN/0yt7AiKEwHQ8oq/oU3RzlOd924AeWpP4Qs6OIiNPin9NVEGKLKnCfUvF+6kX/vRQxhUo1wCNsDyigRZGsGPATqr/WVNoGSIRNXHnpAxmxuxP8oiSzh+uvAUIpSJKTmg7Jt9AdA4iAb8/ijr0qLKMHFUslbpPUhVhNT0c4ZJAmThimyZryHPX/JkrRRo3HxCVnv4bf43i6XNFyi5T1Z58P+zKIDulCHaIN0wzmDLDfbnp4m24ODjJIjKm7Q+6Iwd1pbFmOZMeXfUOdBZd4x49KuMyYvwPPAEmTXYrnZe7soWUrLv0W9Ct5DwW/bH+jZgrLIIiPRkBqgA==",
"SigningCertURL": "https://sns.ap-southeast-2.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem",
"UnsubscribeURL": "https://sns.ap-southeast-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-southeast-2:791031460737:bluedot-config-events-dev-1:7528f821-0d59-4163-95f9-cf58f149b4c1"
}
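Note that in the structure above the Message field is itself a JSON string, so reaching the event fields takes a second parse. The sketch below assumes the object handed to your handler has this structure; `ConfigEvent` and `parseEnvelope` are hypothetical names, and the field list is taken from the sample Message only.

```typescript
// Fields of the inner payload, as seen in the sample Message.
interface ConfigEvent {
  referenceId: string
  customerEmail: string
  customerId: string
  actionName: string
  creationTime: string
}

// Parses the JSON string carried in the envelope's Message field.
// Throws if the string is not valid JSON or lacks an actionName.
function parseEnvelope(envelope: { Message: string }): ConfigEvent {
  const event = JSON.parse(envelope.Message) as ConfigEvent
  if (typeof event.actionName !== 'string') {
    throw new Error('Message does not contain an actionName')
  }
  return event
}
```

Throwing from inside messageHandler when parsing fails leaves the message in the queue, in line with the default behaviours described earlier.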
Sample Implementations:
Premade Queues
require('dotenv').config()
import { ISubscriberOptions, Subscriber } from './src/Subscriber'
(async () => {
  const options = {} as ISubscriberOptions
  // An existing queue needs its ARN and URL as well as the topic ARN and region
  if (!process.env.SNS_ARN || !process.env.SQS_ARN || !process.env.SQS_URL || !process.env.REGION) {
    throw new Error('Environment variables not set')
  } else {
    options.snsArn = process.env.SNS_ARN
    options.sqsArn = process.env.SQS_ARN
    options.sqsUrl = process.env.SQS_URL
    options.region = process.env.REGION
    options.batchSize = 3
  }
  // Resolving deletes the message; rejecting or throwing leaves it in the queue
  const handler = async (messageObj) => {
    console.log(`Logging from messageHandler: ${JSON.stringify(messageObj)}`)
  }
  options.messageHandler = handler
  try {
    const subscriber = new Subscriber(options)
    subscriber.on('messages processed', () => {
      console.log('EVENT: messages processed')
    })
    subscriber.on('processing message', (message) => {
      console.log(`EVENT: processing message: ${JSON.stringify(message)}`)
    })
    subscriber.on('processing error', (err, message) => {
      console.log(`EVENT: processing error ${err.message}`)
    })
    subscriber.on('error', (err, message) => {
      console.log(`EVENT: error ${err.message}`)
    })
    subscriber.on('stopped', () => {
      console.log('EVENT: stopped')
    })
    await subscriber.subscribe()
  } catch (err) {
    // JSON.stringify prints "{}" for Error objects, so log the message instead
    console.log(`err: ${err instanceof Error ? err.message : JSON.stringify(err)}`)
  }
})()
Generated queue:
require('dotenv').config()
import { ISubscriberOptions, Subscriber } from './src/Subscriber'
(async () => {
  const options = {} as ISubscriberOptions
  // Here the queue name is supplied instead of an existing queue's ARN and URL
  if (!process.env.SNS_ARN || !process.env.REGION || !process.env.NEW_SQS_NAME) {
    throw new Error('Environment variables not set')
  } else {
    options.snsArn = process.env.SNS_ARN
    // options.sqsArn = process.env.SQS_ARN
    // options.sqsUrl = process.env.SQS_URL
    options.newSqsName = process.env.NEW_SQS_NAME
    options.region = process.env.REGION
    options.batchSize = 3
  }
  // Resolving deletes the message; rejecting or throwing leaves it in the queue
  const handler = async (messageObj) => {
    console.log(`Logging from messageHandler: ${JSON.stringify(messageObj)}`)
  }
  options.messageHandler = handler
  try {
    const subscriber = new Subscriber(options)
    subscriber.on('messages processed', () => {
      console.log('EVENT: messages processed')
    })
    subscriber.on('processing message', (message) => {
      console.log(`EVENT: processing message: ${JSON.stringify(message)}`)
    })
    subscriber.on('processing error', (err, message) => {
      console.log(`EVENT: processing error ${err.message}`)
    })
    subscriber.on('error', (err, message) => {
      console.log(`EVENT: error ${err.message}`)
    })
    subscriber.on('stopped', () => {
      console.log('EVENT: stopped')
    })
    await subscriber.subscribe()
    // await subscriber.createQueue()
  } catch (err) {
    // JSON.stringify prints "{}" for Error objects, so log the message instead
    console.log(`err: ${err instanceof Error ? err.message : JSON.stringify(err)}`)
  }
})()
License
This project is released under The Unlicense; you're free to copy it and do what you like.