0.0.2 • Published 5 years ago

bluedot-event-subscriber v0.0.2

Weekly downloads
3
License
UNLICENSED
Repository
gitlab
Last release
5 years ago

Bluedot Event Subscriber library

this library will continuously poll SQS for new messages and emits them as events using NodeJS' EventEmitter Class. You can process them however you like.

Prerequisites

Install NodeJs

brew install node

Install Typescript

npm install typescript -g

Install Typescript lint

npm install tslint

Install dependencies:

npm install

Tests

Includes

To run:

npm test

To check coverage:

npm run test:coverage-check

Build Steps

This starter kit uses Rollup (https://rollupjs.org/guide/en) for module bundling

Build library. NOTE: Update test coverage requirements

npm run build

This will build the

Publish library

npm publish

This will publish a CommonJS module and a ES module in the ./dist directory #Library behaviour:

Credentials

The library will look for credentials in the usual locations specified by AWS: https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/configuring-the-jssdk.html#Setting_AWS_Credentials

The easiest way is to set it in the envVars:

AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=

Default behaviours

  • The queue is polled continuously using long polling. You can disable long polling by setting the options waitTimeSeconds to 0 https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html
  • Messages will be deleted from the queue if messageHandler succeeds without error.
  • messageHandler is asynchronous. If you reject with error (or throw), the message will not be deleted from SQS
  • To process messages in parallel, set batchSize to a number between 2 and 10

Options

Create a new SQS Consumer:

new Subscriber(options)

required options:

  • region - AWS region
  • sqsUrl - URL of your SQS queue. This will be provided
  • snsArn - ARN of your SNS Topic. This will be provided
  • sqsArn - ARN of yor SQS queue. This will be provided
  • messageHandler - Function to handle the message. Will receive a Javascript Object as it's first argument

non-required options:

  • AttributeNames — (Array) A list of s that need to be returned along with each message. These attributes include:

    • All - Returns all values.
    • ApproximateFirstReceiveTimestamp - Returns the time the message was first received from the queue (epoch time in milliseconds).
    • ApproximateReceiveCount - Returns the number of times a message has been received from the queue but not deleted.
    • SenderId
      • For an IAM user, returns the IAM user ID, for example ABCDEFGHI1JKLMNOPQ23R.
      • For an IAM role, returns the IAM role ID, for example ABCDE1F2GH3I4JK5LMNOP:i-a123b456.
    • SentTimestamp - Returns the time the message was sent to the queue (epoch time in milliseconds).
    • MessageDeduplicationId - Returns the value provided by the producer that calls the SendMessage action.
    • MessageGroupId - Returns the value provided by the producer that calls the SendMessage action. Messages with the same MessageGroupId are returned in sequence.
    • SequenceNumber - Returns the value provided by Amazon SQS.
  • MessageAttributeNames — (Array)

    • The name of the message attribute, where N is the index.
      • The name can contain alphanumeric characters and the underscore (_), hyphen (-), and period (.).
      • The name is case-sensitive and must be unique among all attribute names for the message.
      • The name must not start with AWS-reserved prefixes such as AWS. or Amazon. (or any casing variants).
      • The name must not start or end with a period (.), and it should not have periods in succession (..).
      • The name can be up to 256 characters long.
      • When using ReceiveMessage, you can send a list of attribute names to receive, or you can return all of the attributes by specifying All or . in your request. You can also use all message attributes starting with a prefix, for example bar..
  • MaxNumberOfMessages — (Integer)

    • The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 1.
  • VisibilityTimeout — (Integer)

    • The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
  • WaitTimeSeconds — (Integer)

    • The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than WaitTimeSeconds. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages.

Library Usage

CommonJS:

const Test = require('typescript-library-starter-kit').Test

Test.test(payload)

ES:

import { Test } from 'typescript-library-starter-kit'

Test.test(payload)

Return Object Structure:

{
  "Type": "Notification",
  "MessageId": "9e5d7ae3-6de5-5526-8333-2e8731158baa",
  "TopicArn": "arn:aws:sns:ap-southeast-2:791031460737:bluedot-config-events-dev-1",
  "Message": "{\"referenceId\":\"0d80f8c4-b725-4bde-8347-3967bf30c01c\",\"customerEmail\":\"hello@bluedotinnovation.com\",\"customerId\":\"436953cd-c0e5-43fb-851a-72a86a9f0da9\",\"actionName\":\"zoneUpdate\",\"creationTime\":\"2018-10-21T23:37:37.993+0000\"}",
  "Timestamp": "2018-11-13T05:01:36.304Z",
  "SignatureVersion": "1",
  "Signature": "Gs16+10GeN/0yt7AiKEwHQ8oq/oU3RzlOd924AeWpP4Qs6OIiNPin9NVEGKLKnCfUvF+6kX/vRQxhUo1wCNsDyigRZGsGPATqr/WVNoGSIRNXHnpAxmxuxP8oiSzh+uvAUIpSJKTmg7Jt9AdA4iAb8/ijr0qLKMHFUslbpPUhVhNT0c4ZJAmThimyZryHPX/JkrRRo3HxCVnv4bf43i6XNFyi5T1Z58P+zKIDulCHaIN0wzmDLDfbnp4m24ODjJIjKm7Q+6Iwd1pbFmOZMeXfUOdBZd4x49KuMyYvwPPAEmTXYrnZe7soWUrLv0W9Ct5DwW/bH+jZgrLIIiPRkBqgA==",
  "SigningCertURL": "https://sns.ap-southeast-2.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem",
  "UnsubscribeURL": "https://sns.ap-southeast-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-southeast-2:791031460737:bluedot-config-events-dev-1:7528f821-0d59-4163-95f9-cf58f149b4c1"
}

Sample Implementations:

Premade Queues

require('dotenv').config()

import { ISubscriberOptions, Subscriber } from './src/Subscriber'

(async () => {
    const options = {} as ISubscriberOptions

    if (!process.env.SNS_ARN || !process.env.SQS_ARN || !process.env.SQS_URL || !process.env.REGION){
        throw new Error('Environmental variables not set')
    } else {
        options.snsArn = process.env.SNS_ARN
        options.sqsArn = process.env.SQS_ARN
        options.sqsUrl = process.env.SQS_URL
        options.region = process.env.REGION
        options.batchSize = 3
    }

    const handler = (messageObj) => {
        console.log(`Logging from messageHandler: ${JSON.stringify(messageObj)}`)
    }

    options.messageHandler = handler

    try {
        const subscriber = new Subscriber(options)

        subscriber.on('messages processed', () => {
            console.log('EVENT: messages processed')
        })

        subscriber.on('processing message', (message) => {
            console.log(`EVENT: processing message: ${JSON.stringify(message)}`)
        })

        subscriber.on('processing error', (err, message) => {
            console.log(`EVENT: processing error ${err.message}`)
        })

        subscriber.on('error', (err, message) => {
            console.log(`EVENT: error ${err.message}`)
        })

        subscriber.on('stopped', () => {
            console.log(`EVENT: stopped `)
        })

        await subscriber.subscribe()
    } catch (err) {
        console.log(`err: ${JSON.stringify(err)}`)

    }

})()

Generated queue:

require('dotenv').config()

import { ISubscriberOptions, Subscriber } from './src/Subscriber'

(async () => {
    const options = {} as ISubscriberOptions

    if (!process.env.SNS_ARN || !process.env.REGION || !process.env.NEW_SQS_NAME) {
        throw new Error('Environmental variables not set')
    } else {
        options.snsArn = process.env.SNS_ARN
        // options.sqsArn = process.env.SQS_ARN
        // options.sqsUrl = process.env.SQS_URL
        options.newSqsName = process.env.NEW_SQS_NAME
        options.region = process.env.REGION
        options.batchSize = 3
    }

    const handler = (messageObj) => {
        console.log(`Logging from messageHandler: ${JSON.stringify(messageObj)}`)
    }

    options.messageHandler = handler

    try {
        const subscriber = new Subscriber(options)

        subscriber.on('messages processed', () => {
            console.log('EVENT: messages processed')
        })

        subscriber.on('processing message', (message) => {
            console.log(`EVENT: processing message: ${JSON.stringify(message)}`)
        })

        subscriber.on('processing error', (err, message) => {
            console.log(`EVENT: processing error ${err.message}`)
        })

        subscriber.on('error', (err, message) => {
            console.log(`EVENT: error ${err.message}`)
        })

        subscriber.on('stopped', () => {
            console.log(`EVENT: stopped `)
        })

        await subscriber.subscribe()
        // await subscriber.createQueue()
    } catch (err) {
        console.log(`err: ${JSON.stringify(err)}`)

    }

})()

License

This project is released under The Unlicense, your free to copy this and do what you like.

0.0.2

5 years ago