@ambassify/queue v2.0.0
Queue
This library acts as a wrapper around different queue implementations that we might end up using.
Currently implemented backends: SQS
API
The public API that each queue exposes is defined in Queue. A new backend can be implemented as a class that extends Queue. The public API is:
- `Queue.create( QueueType : class, ...args )` `args` is passed to the constructor of the `QueueType`.
- `constructor( queueName : string, options : object )` Available options are:
  - `itemPoolSize : int`
- `getName() : string`
- `getItemPool() : ItemPool` (see item-pool.js)
- `receive( count : int ) : Promise` Attempt to receive at most `count` items.
- `release( item : object, handled : boolean ) : Promise` Release the item. If not `handled`, the item will not be deleted from the queue. `handled` defaults to `false`.
- `touch( item : object, options : object ) : Promise` Touch / ping a message to keep it in use.
- `send( body : object ) : Promise` Submit a new queue item.
- `connect() : Promise`
- `start() : Promise` Start watching the queue for new items.
- `stop() : void` Stop watching the queue for new items. A final batch might still arrive after calling `stop()`.
- `lock( item : object, options : object ) : Promise` Prevents a message from re-entering the queue.
- `unlock( item : object ) : Promise` Release an earlier acquired lock.
- `on( event : string, callback : function ) : void` Attach an event handler to the queue.
  - `message` event is triggered for each queue item that arrives.
  - `error` event is triggered for errors in the `_eventLoop` or `_lock`.
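As an illustration of how `receive` and `release` fit together, the sketch below pulls one batch and releases each item with `handled` set according to whether processing succeeded. `processBatch` and `handler` are hypothetical names, and the sketch assumes that `receive` resolves to an array of items, which this README does not state explicitly.

```javascript
// Hypothetical helper, not part of the library. It works with any object
// that exposes the documented receive() and release() methods.
async function processBatch(queue, count, handler) {
    // Assumption: receive() resolves to an array of at most `count` items.
    const items = await queue.receive(count);
    let handledCount = 0;

    for (const item of items) {
        try {
            await handler(item);
            // handled = true: the item may be deleted from the queue.
            await queue.release(item, true);
            handledCount += 1;
        } catch (err) {
            // handled = false: the item is released but not deleted.
            await queue.release(item, false);
        }
    }

    return handledCount;
}
```

Passing `false` on failure leaves the item in the queue, so it can be picked up again later.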
The public API then calls into the implementation-specific methods through an internal API that each backend should implement. The required private methods are:
- `_fetch( itemsToFetch : int ) : Promise` Request `itemsToFetch` items from the queue. Do not perform any mutations on the raw object before resolving them.
- `_transform( item : object ) : object` This method will receive the items retrieved using `_fetch` one by one. You can return altered objects from this method to change the queue items.
- `_delete( item : object ) : Promise` Remove the `item` from the queue / mark as finished. This method should always receive the instance from the `_transform` step, such that you could add hidden fields to identify the item.
- `_touch( item : object, options : object ) : Promise` Touch the message to keep it from becoming visible again.
- `_send( item : object ) : Promise` Add `item` to the queue.
- `_connect() : Promise` Start to connect with the backend.
- `_lock() : Promise` Prevents a message from re-entering the queue. Default implementation uses `queue.touch`.
- `_unlock() : Promise` Releases the lock and allows the item to re-enter the queue.
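To make the split between public and private methods concrete, here is a minimal in-memory backend sketch. `SketchQueue` is a hypothetical stand-in for the library's base class, included only so the example runs on its own; the real `Queue` also handles the item pool, events and locking. `MemoryQueue` and the `_id` field are likewise invented for illustration.

```javascript
// Hypothetical stand-in for the library's Queue base class. It only shows
// how the public methods might delegate to the private ones.
class SketchQueue {
    constructor(queueName) {
        this.name = queueName;
    }

    getName() {
        return this.name;
    }

    async receive(count) {
        const rawItems = await this._fetch(count);
        return rawItems.map((raw) => this._transform(raw));
    }

    async release(item, handled = false) {
        // Only handled items are removed from the queue.
        if (handled) await this._delete(item);
    }

    send(body) {
        return this._send(body);
    }
}

// Illustrative backend that keeps items in an array.
class MemoryQueue extends SketchQueue {
    constructor(queueName) {
        super(queueName);
        this.items = [];
        this.nextId = 1;
    }

    // Resolve raw items without mutating them.
    async _fetch(itemsToFetch) {
        return this.items.slice(0, itemsToFetch);
    }

    // Return a new object carrying a hidden field that identifies the item.
    _transform(raw) {
        return { body: raw.body, _id: raw.id };
    }

    // Receives the transformed item, so the hidden _id field is available.
    async _delete(item) {
        this.items = this.items.filter((raw) => raw.id !== item._id);
    }

    async _send(body) {
        this.items.push({ id: this.nextId++, body });
    }
}
```

This sketch has no visibility timeout, so an item that was received but not yet released would be returned again by the next `receive` call. A real backend would rely on `_touch`, `_lock` and `_unlock` to prevent that.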
Libraries
- BatchOperation: utility that batches `batchSize` items unless `timeout` expires. The SQS implementation uses this to batch `delete` and `send` operations.
- ItemPool: currently only a counter which ensures that no more than the pool size number of items are in flight.
- sleep: returns a promise that resolves after a timeout.
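The batching idea described for BatchOperation can be sketched roughly as follows. This is not the library's implementation: `SketchBatcher`, `flushFn` and the option defaults are assumptions chosen for illustration. Items are collected until `batchSize` is reached, or flushed earlier when `timeout` milliseconds pass after the first pending item.

```javascript
// Hypothetical batcher, not the library's BatchOperation class.
class SketchBatcher {
    constructor(flushFn, { batchSize = 10, timeout = 1000 } = {}) {
        this.flushFn = flushFn;
        this.batchSize = batchSize;
        this.timeout = timeout;
        this.pending = [];
        this.timer = null;
    }

    add(item) {
        this.pending.push(item);
        if (this.pending.length >= this.batchSize) {
            // Batch is full: flush right away.
            this.flush();
        } else if (!this.timer) {
            // First item of a new batch: start the timeout clock.
            this.timer = setTimeout(() => this.flush(), this.timeout);
        }
    }

    flush() {
        clearTimeout(this.timer);
        this.timer = null;
        if (this.pending.length === 0) return;
        const batch = this.pending;
        this.pending = [];
        this.flushFn(batch);
    }
}
```

The timer starts with the first pending item rather than restarting on every `add`, so no item waits longer than `timeout` before being flushed.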
Runtime configuration options
Configuration is done through environment variables. The options are:
- `BATCH_SIZE` defaults to `10`
- `QUEUE_POOL_SIZE` defaults to `20`
- `SQS_AWS_REGION` defaults to the `AWS_REGION` environment variable
- `SQS_FETCH_WAIT` defaults to `20` seconds
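The defaults above could be resolved along these lines. `readQueueConfig` and the returned property names are hypothetical and only illustrate the fallback behaviour; the library may read these variables differently.

```javascript
// Hypothetical helper showing how the documented defaults could be applied.
function readQueueConfig(env = process.env) {
    // Parse an integer variable, falling back when it is unset or invalid.
    const toInt = (value, fallback) => {
        const parsed = Number.parseInt(value, 10);
        return Number.isNaN(parsed) ? fallback : parsed;
    };

    return {
        batchSize: toInt(env.BATCH_SIZE, 10),
        poolSize: toInt(env.QUEUE_POOL_SIZE, 20),
        // SQS_AWS_REGION falls back to the generic AWS_REGION variable.
        region: env.SQS_AWS_REGION || env.AWS_REGION,
        fetchWaitSeconds: toInt(env.SQS_FETCH_WAIT, 20),
    };
}
```

For example, running a worker with `BATCH_SIZE=5` set would yield `batchSize: 5` while the other values keep their defaults.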