1.1.7 • Published 1 year ago

@ambassify/queue v1.1.7

License: MIT

Queue

This library acts as a wrapper around different queue implementations that we might end up using.

Currently implemented backends: SQS

API

The public API that each queue exposes is defined in Queue. A new backend can be implemented as a class that extends Queue. The public API is:

  • Queue.create( QueueType : class, ...args ) - args are passed to the constructor of the QueueType.
  • constructor( queueName : string, options : object )
      • Available options: itemPoolSize : int
  • getName() : string
  • getItemPool() : ItemPool (see item-pool.js)
  • receive( count : int ) : Promise - Attempt to receive at most count items.
  • release( item : object, handled : boolean ) : Promise - Release the item. If handled is false, the item is not deleted from the queue. handled defaults to false.
  • touch( item : object, options : object ) : Promise - Touch (ping) a message to keep it in use.
  • send( body : object ) : Promise - Submit a new queue item.
  • connect() : Promise
  • start() : Promise - Start watching the queue for new items.
  • stop() : void - Stop watching the queue for new items. A final batch might still arrive after calling stop().
  • lock( item : object, options : object ) : Promise - Prevent a message from re-entering the queue.
  • unlock( item : object ) : Promise - Release a previously acquired lock.
  • on( event : string, callback : function ) : void - Attach an event handler to the queue.
      • The message event is triggered for each queue item that arrives.
      • The error event is triggered for errors in the _eventLoop or _lock.
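
A consumer built on the public API might look like the sketch below. It relies only on on(), start(), and release() as listed above. The consume helper and the handler callback are illustrative names, not part of the library, and the sketch assumes that the message event passes the queue item as the first argument to the callback.

```javascript
// Hypothetical helper (not part of @ambassify/queue): wires a handler to any
// object exposing the documented on(), start() and release() methods.
function consume(queue, handler) {
    // Assumption: the 'message' callback receives the queue item.
    queue.on('message', async (item) => {
        try {
            await handler(item);
            // handled = true: the item may be deleted from the queue.
            await queue.release(item, true);
        } catch (err) {
            console.error('handler failed:', err.message);
            // handled = false: the item is not deleted from the queue.
            await queue.release(item, false);
        }
    });

    queue.on('error', (err) => {
        console.error('queue error:', err);
    });

    // Start watching the queue for new items.
    return queue.start();
}
```

Releasing with handled set to false on failure leaves the item in the queue, so the backend decides when it is delivered again.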

The public API calls into implementation-specific methods through an internal API that each backend must provide. The required private methods are:

  • _fetch( itemsToFetch : int ) : Promise - Request itemsToFetch items from the queue. Do not mutate the raw objects before resolving them.
  • _transform( item : object ) : object - Receives the items retrieved by _fetch one by one. Return an altered object from this method to change the queue item.
  • _delete( item : object ) : Promise - Remove the item from the queue or mark it as finished. This method always receives the instance returned by the _transform step, so you can add hidden fields there to identify the item.
  • _touch( item : object, options : object ) : Promise - Touch the message to keep it from becoming visible again.
  • _send( item : object ) : Promise - Add item to the queue.
  • _connect() : Promise - Start connecting to the backend.
  • _lock() : Promise - Prevent a message from re-entering the queue. The default implementation uses queue.touch.
  • _unlock() : Promise - Release the lock and allow the item to re-enter the queue.
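
To illustrate how the public and private methods divide the work, here is a minimal in-memory sketch. SketchQueue is a reduced stand-in for the library's base class, written only for this example, and MemoryQueue is a hypothetical backend. Neither reflects the real implementation, which also handles the item pool, the event loop, touching, and locking.

```javascript
// Stand-in for the library's Queue base class, reduced to the calls needed
// here. This is an assumption for illustration, not the real class.
class SketchQueue {
    constructor(queueName) { this.queueName = queueName; }
    getName() { return this.queueName; }
    connect() { return this._connect(); }
    send(body) { return this._send(body); }
    async receive(count) {
        const raw = await this._fetch(count);
        // Each raw item passes through _transform one by one.
        return raw.map((item) => this._transform(item));
    }
    async release(item, handled = false) {
        // Only handled items are deleted from the queue.
        if (handled) await this._delete(item);
    }
}

// Hypothetical in-memory backend implementing the private methods.
class MemoryQueue extends SketchQueue {
    constructor(queueName) {
        super(queueName);
        this.items = new Map();    // id -> raw record
        this.inFlight = new Set(); // ids handed out and not yet deleted
        this.nextId = 1;
    }
    _connect() { return Promise.resolve(); }
    _send(body) {
        const id = this.nextId++;
        this.items.set(id, { id, body: JSON.stringify(body) });
        return Promise.resolve();
    }
    _fetch(itemsToFetch) {
        const batch = [];
        for (const raw of this.items.values()) {
            if (batch.length >= itemsToFetch) break;
            if (this.inFlight.has(raw.id)) continue;
            this.inFlight.add(raw.id); // tracked separately; raw stays unmutated
            batch.push(raw);
        }
        return Promise.resolve(batch);
    }
    _transform(raw) {
        // Return a new object; the id field identifies the item for _delete.
        return { id: raw.id, body: JSON.parse(raw.body) };
    }
    _delete(item) {
        this.items.delete(item.id);
        this.inFlight.delete(item.id);
        return Promise.resolve();
    }
}
```

The sketch never makes an unhandled item visible again. A real backend would typically do that after a timeout, which is the behavior _touch, _lock, and _unlock are there to control.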

Libraries

  • BatchOperation - Utility that batches up to batchSize items unless the timeout expires first. The SQS implementation uses this to batch delete and send operations.
  • ItemPool - Currently only a counter which ensures that no more than the pool size number of items are in flight.
  • sleep - Returns a promise that resolves after a timeout.
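
The batching idea can be sketched as follows. SketchBatcher, its constructor arguments, and its add method are hypothetical and are not the library's BatchOperation API. The sketch flushes when batchSize items are pending or when the timeout has passed since the first pending item, whichever comes first. The sleep function shows the promise-based timeout described above, with the delay assumed to be in milliseconds.

```javascript
// Resolves after `ms` milliseconds.
function sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
}

// Hypothetical batcher, for illustration only.
class SketchBatcher {
    constructor(flush, { batchSize = 10, timeout = 100 } = {}) {
        this.flush = flush;         // called with an array of items
        this.batchSize = batchSize; // flush as soon as this many are pending
        this.timeout = timeout;     // or this many ms after the first item
        this.pending = [];
        this.timer = null;
    }

    add(item) {
        this.pending.push(item);
        if (this.pending.length >= this.batchSize) {
            this._flushNow();
        } else if (this.timer === null) {
            // First item of a new batch: start the timeout.
            this.timer = setTimeout(() => this._flushNow(), this.timeout);
        }
    }

    _flushNow() {
        clearTimeout(this.timer);
        this.timer = null;
        const batch = this.pending;
        this.pending = [];
        if (batch.length > 0) this.flush(batch);
    }
}
```

Batching this way trades a small delay for fewer round trips to the backend, which is the usual reason to group delete and send operations.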

Runtime configuration options

The library is configured through environment variables. The available options are:

  • BATCH_SIZE defaults to 10
  • QUEUE_POOL_SIZE defaults to 20
  • SQS_AWS_REGION defaults to the value of the AWS_REGION environment variable.
  • SQS_FETCH_WAIT defaults to 20 seconds
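
The defaults above could be resolved along the lines of the sketch below. loadQueueConfig and the returned property names are hypothetical, and the library's own parsing may differ. The sketch assumes the numeric variables are integers.

```javascript
// Hypothetical loader: reads the documented variables from an env-like
// object and applies the defaults listed above.
function loadQueueConfig(env = process.env) {
    const toInt = (value, fallback) => {
        const parsed = Number.parseInt(value, 10);
        return Number.isNaN(parsed) ? fallback : parsed;
    };

    return {
        batchSize: toInt(env.BATCH_SIZE, 10),
        poolSize: toInt(env.QUEUE_POOL_SIZE, 20),
        // Falls back to AWS_REGION when SQS_AWS_REGION is unset.
        sqsRegion: env.SQS_AWS_REGION || env.AWS_REGION,
        // Wait time in seconds, as documented.
        sqsFetchWait: toInt(env.SQS_FETCH_WAIT, 20),
    };
}
```

Passing the environment as an argument keeps the function easy to exercise with a plain object instead of the real process.env.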