1.3.0 • Published 5 years ago

wtsqs v1.3.0


Simplified SQS Wrapper and Async Worker manager.


  • Simple interface. ✅
  • Promise based. ✅
  • ES6. ✅
  • Optimized async worker. ✅


# Using npm
$ npm install wtsqs --save

# Or using yarn
$ yarn add wtsqs




WTSQS

A simplified SQS wrapper with an interface similar to a normal queue data structure.

Kind: global class

new WTSQS(options)

Constructs WTSQS object.

  • options (Object): Options object.
  • options.url (String): SQS queue url.
  • options.accessKeyId (String): AWS access key id.
  • options.secretAccessKey (String): AWS secret access key.
  • options.region (String, default: us-east-1): AWS region where the queue exists.
  • options.defaultMessageGroupId (String): FIFO queues only. Default tag assigned to a message that specifies it belongs to a specific message group. If not provided, a random UUID is assigned to each message, which does not guarantee order but allows parallelism.
  • options.defaultVisibilityTimeout (Integer, default: 60): Default duration (in seconds) that the received messages are hidden from subsequent retrieve requests.
  • options.defaultPollWaitTime (Integer, default: 10): Default duration (in seconds) for which read calls wait for a message to arrive in the queue before returning.
  • options.sqsOptions (Object): Additional options to extend/override the underlying SQS object creation.


const { WTSQS } = require('wtsqs')

// The simplest way to construct a WTSQS object
const wtsqs = new WTSQS({
  url: '//queue-url',
  accessKeyId: 'AWS_ACCESS_KEY_ID',
  secretAccessKey: 'AWS_SECRET_ACCESS_KEY'
})
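
Credentials are rarely hard-coded in practice. The following is a minimal sketch, assuming the settings live in environment variables. The helper name `buildWtsqsOptions` and the variable names `QUEUE_URL`, `QUEUE_REGION`, and `QUEUE_GROUP_ID` are hypothetical and not part of wtsqs; only the option keys documented above are used.

```javascript
// Hypothetical helper: maps environment variables onto documented WTSQS options.
function buildWtsqsOptions (env = process.env) {
  if (!env.QUEUE_URL) {
    throw new Error('QUEUE_URL is required')
  }

  const options = {
    url: env.QUEUE_URL,
    accessKeyId: env.AWS_ACCESS_KEY_ID,
    secretAccessKey: env.AWS_SECRET_ACCESS_KEY,
    region: env.QUEUE_REGION || 'us-east-1' // documented default region
  }

  // FIFO queues only: pin every message to one group to preserve ordering.
  if (env.QUEUE_GROUP_ID) {
    options.defaultMessageGroupId = env.QUEUE_GROUP_ID
  }

  return options
}

// Usage (requires the wtsqs package and real credentials):
// const { WTSQS } = require('wtsqs')
// const wtsqs = new WTSQS(buildWtsqsOptions())
```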

wtsqs.size() ⇒ Promise.<integer>

Get approximate total number of messages in the queue.

Kind: instance method of WTSQS

const size = await wtsqs.size()
console.log(size) // output: 2
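
Because `size()` returns a promise, it can be polled, for example to wait for consumers to catch up. This is a minimal sketch with a hypothetical helper name (`waitUntilEmpty`) and hypothetical option names (`intervalMs`, `maxChecks`). The count is approximate, so a reported 0 is a hint rather than a guarantee that the queue is empty.

```javascript
// Hypothetical helper: polls size() until it reports 0 or the attempts run out.
// `queue` is any object exposing the size() method documented above.
async function waitUntilEmpty (queue, { intervalMs = 1000, maxChecks = 30 } = {}) {
  for (let check = 0; check < maxChecks; check++) {
    const size = await queue.size()
    if (size === 0) return true
    // Wait before asking again so the queue is not hammered with requests.
    await new Promise(resolve => setTimeout(resolve, intervalMs))
  }
  return false
}

// Usage (assumes a WTSQS instance named wtsqs):
// const empty = await waitUntilEmpty(wtsqs, { intervalMs: 5000 })
```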

wtsqs.enqueueOne(payload, options, sqsOptions) ⇒ Promise

Enqueue single payload in the queue.

Kind: instance method of WTSQS
See: SQS#sendMessage

  • payload (Object): JSON serializable object.
  • options (Object): Options object.
  • options.messageGroupId (String): Message group id to override default id.
  • sqsOptions (Object, default: {}): Additional options to extend/override the underlying SQS sendMessage request.


const myObj = { a: 1 }
await wtsqs.enqueueOne(myObj)

wtsqs.enqueueMany(payloads, options, sqsOptions) ⇒ Promise

Enqueue batch of payloads in the queue.

Kind: instance method of WTSQS
See: SQS#sendMessageBatch

  • payloads (Array.<Object>): Array of JSON serializable objects.
  • options (Object): Options object.
  • options.messageGroupId (String): Message group id to override default id.
  • sqsOptions (Object, default: {}): Additional options to extend/override the underlying SQS sendMessageBatch request.


const myObjList = [{ a: 1 }, { b: 3 }]
await wtsqs.enqueueMany(myObjList)
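
SQS itself accepts at most 10 entries per sendMessageBatch request. Whether `enqueueMany` splits larger arrays internally is not stated here, so a cautious caller could chunk the payloads first. The helper name `enqueueInChunks` below is hypothetical; this is a sketch, not part of the wtsqs API.

```javascript
// SQS accepts at most 10 entries per sendMessageBatch request.
const SQS_BATCH_LIMIT = 10

// Hypothetical helper: splits payloads into chunks and enqueues them in order.
// `queue` is any object exposing the enqueueMany() method documented above.
async function enqueueInChunks (queue, payloads, options = {}, chunkSize = SQS_BATCH_LIMIT) {
  let sent = 0
  for (let i = 0; i < payloads.length; i += chunkSize) {
    const chunk = payloads.slice(i, i + chunkSize)
    await queue.enqueueMany(chunk, options)
    sent += chunk.length
  }
  return sent // total number of payloads handed to enqueueMany
}

// Usage (assumes a WTSQS instance named wtsqs):
// await enqueueInChunks(wtsqs, bigListOfObjects)
```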

wtsqs.peekOne(options, sqsOptions) ⇒ Promise.<(Message|null)>

Retrieve single message without deleting it.

Kind: instance method of WTSQS
Returns: Promise.<(Message|null)> - Message object or null if queue is empty.

  • options (Object): Options object.
  • options.pollWaitTime (Integer): Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages.
  • options.visibilityTimeout (Integer): Duration (in seconds) that the received messages are hidden from subsequent retrieve requests.
  • sqsOptions (Object, default: {}): Additional options to extend/override the underlying SQS receiveMessage request.


const myMessage = await wtsqs.peekOne()
console.log(myMessage)
// output:
// {
//   id: 'messageId',
//   receiptHandle: 'messageReceiptHandle',
//   md5: 'messageMD5',
//   body: { a: 1 }
// }

wtsqs.peekMany(maxNumberOfMessages, options, sqsOptions) ⇒ Promise.<Array.<Message>>

Retrieve batch of messages without deleting them.

Kind: instance method of WTSQS
Returns: Promise.<Array.<Message>> - Array of retrieved messages.
See: SQS#receiveMessage

  • maxNumberOfMessages (Number, default: 10): Maximum number of messages to retrieve. Must be between 1 and 10.
  • options (Object): Options object.
  • options.pollWaitTime (Integer): Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages.
  • options.visibilityTimeout (Integer): Duration (in seconds) that the received messages are hidden from subsequent retrieve requests.
  • sqsOptions (Object, default: {}): Additional options to extend/override the underlying SQS receiveMessage request.


const myMessageList = await wtsqs.peekMany(2)
console.log(myMessageList)
// output:
// [
//   {
//     id: 'messageId',
//     receiptHandle: 'messageReceiptHandle',
//     md5: 'messageMD5',
//     body: { a: 1 }
//   },
//   {
//     id: 'messageId',
//     receiptHandle: 'messageReceiptHandle',
//     md5: 'messageMD5',
//     body: { b: 3 }
//   }
// ]

wtsqs.deleteOne(message) ⇒ Promise

Delete single message from queue.

Kind: instance method of WTSQS
See: SQS#deleteMessage

  • message (Message): Message to be deleted.


const myMessage = await wtsqs.peekOne()
await wtsqs.deleteOne(myMessage)
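
Peeking and deleting are usually combined so that a message is removed only after it has been handled. The sketch below uses a hypothetical helper name (`processOne`) and relies only on `peekOne` and `deleteOne` as documented above. If the handler throws, the message is left in the queue and becomes visible again once its visibility timeout expires.

```javascript
// Hypothetical helper: peek, handle, then delete only if the handler succeeded.
// `queue` is any object exposing peekOne() and deleteOne() as documented above.
async function processOne (queue, handler, options = {}) {
  const message = await queue.peekOne(options)
  if (message === null) return false // queue was empty

  // A throwing handler skips the delete, so the message can be retried later.
  await handler(message.body)
  await queue.deleteOne(message)
  return true
}

// Usage (assumes a WTSQS instance named wtsqs):
// const handled = await processOne(wtsqs, async (body) => console.log(body))
```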

wtsqs.deleteMany(messages) ⇒ Promise

Delete batch of messages from queue.

Kind: instance method of WTSQS
See: SQS#deleteMessageBatch

  • messages (Array.<Message>): Messages to be deleted.


const myMessageList = await wtsqs.peekMany(2)
await wtsqs.deleteMany(myMessageList)

wtsqs.deleteAll() ⇒ Promise

Delete ALL messages in the queue.

NOTE: Can only be called once every 60 seconds.

Kind: instance method of WTSQS
See: SQS#purgeQueue

await wtsqs.deleteAll()

wtsqs.popOne(options, sqsOptions) ⇒ Promise.<(Message|null)>

Retrieve single message and immediately delete it.

Kind: instance method of WTSQS
Returns: Promise.<(Message|null)> - Message object or null if queue is empty.

  • options (Object): Options object.
  • options.pollWaitTime (Integer): Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages.
  • options.visibilityTimeout (Integer): Duration (in seconds) that the received messages are hidden from subsequent retrieve requests.
  • sqsOptions (Object, default: {}): Additional options to extend/override the underlying SQS receiveMessage request.


const myMessage = await wtsqs.popOne()
// The message no longer exists in queue
console.log(myMessage)
// output:
// {
//   id: 'messageId',
//   receiptHandle: 'messageReceiptHandle',
//   md5: 'messageMD5',
//   body: { a: 1 }
// }

wtsqs.popMany(maxNumberOfMessages, options, sqsOptions) ⇒ Promise.<Array.<Message>>

Retrieve batch of messages and immediately delete them.

Kind: instance method of WTSQS
Returns: Promise.<Array.<Message>> - Array of retrieved messages.

  • maxNumberOfMessages (Number, default: 10): Maximum number of messages to retrieve. Must be between 1 and 10.
  • options (Object): Options object.
  • options.pollWaitTime (Integer): Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages.
  • options.visibilityTimeout (Integer): Duration (in seconds) that the received messages are hidden from subsequent retrieve requests.
  • sqsOptions (Object, default: {}): Additional options to extend/override the underlying SQS receiveMessage request.


const myMessageList = await wtsqs.popMany(2)
// Messages no longer exist in queue
console.log(myMessageList)
// output:
// [
//   {
//     id: 'messageId',
//     receiptHandle: 'messageReceiptHandle',
//     md5: 'messageMD5',
//     body: { a: 1 }
//   },
//   {
//     id: 'messageId',
//     receiptHandle: 'messageReceiptHandle',
//     md5: 'messageMD5',
//     body: { b: 3 }
//   }
// ]
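
A simple drain loop can be built on `popMany`. This is a sketch with a hypothetical helper name (`drain`), and it treats an empty batch as "done", which is only an approximation of an empty queue. Because popped messages are deleted immediately, a handler that throws loses the message; the peek-then-delete methods or WTSQSWorker are a better fit when a message must not be lost.

```javascript
// Hypothetical helper: pops batches until popMany() returns an empty array.
// `queue` is any object exposing the popMany() method documented above.
async function drain (queue, handler, batchSize = 10) {
  let handled = 0
  while (true) {
    const messages = await queue.popMany(batchSize)
    if (messages.length === 0) break // nothing received within the poll wait time
    for (const message of messages) {
      await handler(message.body)
      handled++
    }
  }
  return handled // number of messages passed to the handler
}

// Usage (assumes a WTSQS instance named wtsqs):
// const count = await drain(wtsqs, async (body) => console.log(body))
```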


WTSQSWorker

WTSQS worker job manager.

WTSQSWorker takes care of asynchronously fetching jobs from SQS while processing other jobs concurrently. It also deletes each job from the queue after the job has been processed successfully.

Kind: global class

new WTSQSWorker(options)

Constructs WTSQSWorker object.

  • options (Object): Options object.
  • options.wtsqs (WTSQS): WTSQS instance to use for connecting to SQS.
  • options.maxConcurrency (Integer, default: 20): Maximum number of concurrent jobs.
  • options.pollWaitTime (Integer, default: 5): Duration (in seconds) for which read calls wait for a job to arrive in the queue before returning.
  • options.visibilityTimeout (Integer, default: 30): Duration (in seconds) that the received jobs are hidden from subsequent retrieve requests.
  • options.logger (Object | String): Object with debug, info, warn, error methods to use for logging. Or a string with log level to use default internal logger.
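
The logger option accepts any object with debug, info, warn, and error methods. A minimal sketch of such an object follows; the helper name `createPrefixedLogger` and the `sink` parameter are hypothetical and only illustrate the expected shape.

```javascript
// Hypothetical helper: builds an object with the four methods the worker expects.
// `sink` is where lines end up; it defaults to the global console.
function createPrefixedLogger (prefix, sink = console) {
  const logger = {}
  for (const level of ['debug', 'info', 'warn', 'error']) {
    // Forward every call to the sink, tagging it with the prefix.
    logger[level] = (...args) => sink[level](`[${prefix}]`, ...args)
  }
  return logger
}

// Usage (assumes a WTSQS instance named wtsqs):
// const worker = new WTSQSWorker({ wtsqs, logger: createPrefixedLogger('worker') })
```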


const { WTSQS, WTSQSWorker } = require('wtsqs')

const wtsqs = new WTSQS({
  url: '//queue-url',
  accessKeyId: 'AWS_ACCESS_KEY_ID',
  secretAccessKey: 'AWS_SECRET_ACCESS_KEY'
})

const worker = new WTSQSWorker({ wtsqs })

// someAsyncFunction stands in for your own job processing logic
worker.run(async (job) => {
  await someAsyncFunction(job.body)
})


worker.run(handler)

Start fetching and processing jobs.

Kind: instance method of WTSQSWorker

  • handler (runHandler): Async function to process a single job.

worker.shutdown() ⇒ Promise

Shuts down the worker and drains active jobs.

Kind: instance method of WTSQSWorker
Returns: Promise - Resolves when all active jobs have been drained.
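
Since `shutdown()` resolves once active jobs have been drained, it pairs naturally with process signals. The sketch below assumes a Node.js process; the helper name `shutdownOnSignal` and its `proc` and `signals` options are hypothetical, and `proc` exists only so the process object can be swapped out in tests.

```javascript
// Hypothetical helper: drains the worker when the process receives a signal.
// `worker` is any object exposing the shutdown() method documented above.
function shutdownOnSignal (worker, { proc = process, signals = ['SIGINT', 'SIGTERM'] } = {}) {
  let shuttingDown = false

  const onSignal = async () => {
    if (shuttingDown) return // ignore repeated signals
    shuttingDown = true
    try {
      await worker.shutdown() // resolves once active jobs are drained
      proc.exit(0)
    } catch (err) {
      console.error(err)
      proc.exit(1)
    }
  }

  for (const signal of signals) {
    proc.on(signal, onSignal)
  }
}

// Usage (assumes a WTSQSWorker instance named worker):
// shutdownOnSignal(worker)
```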

WTSQSWorker~runHandler ⇒ Promise

Async callback function to process single job.

Kind: inner typedef of WTSQSWorker

  • job (Job): A single job to process.

Message : Object

Received SQS Message

Kind: global typedef

  • id (String): Message id.
  • receiptHandle (String): Message receipt handle.
  • md5 (String): Message body md5 hash sum.
  • body (Object): Message body containing original payload.

Job : Object

Worker Job

Kind: global typedef

  • id (String): Job id.
  • receiptHandle (String): Job receipt handle.
  • md5 (String): Job body md5 hash sum.
  • body (Object): Job body containing original payload.

