0.0.1 • Published 6 months ago

parallel-async-queue v0.0.1

Weekly downloads
-
License
ISC
Repository
-
Last release
6 months ago

Async Queue

A package to handle high number of asynchronous functions when time is a problem and to fail is not an option.

Sometimes we have specific time sensitive problems that are hard to handle asynchronously, as code refuses to wait. Imagine this situation: you need to get a lot of information from an API, but this API have a rate limiter. This API does not allows you to take all the information in one single request, so you need to send hundreds of requests to get everything that you need. But it's limiter is not allowing you to do more than a handful of requests before returning 429 (HTTP "Too Many Requests" error code).

This rate limiter is time sensitive. 20 requests every 10 seconds. How to handle that? Waiting 10 seconds using a settimeout wouldn't be a good practice. And what about the failed requests?

What about the third party library is simply not reliable? How would you retry failing / inconsistent requests?

That's what Async Queue is here to solve!

Setup

npm i parallel-async-queue

How to use it

Instantiate a new Queue

const asyncQueue = new Queue()

Add asynchronous functions to it.

    asyncQueue.add(() => Promise.resolve("Hello World"));

Run the queue, setting how many asynchronous functions should run in parallel and wait for the result.

Using async/await:

    const FUNCTIONS_IN_PARALLEL = 100;
    const result = await asyncQueue.run(FUNCTIONS_IN_PARALLEL);

Using promises:

    const FUNCTIONS_IN_PARALLEL = 100;
    asyncQueue.run(FUNCTIONS_IN_PARALLEL)
        .then((result) => {
            //...
        });

Using events

Instead of waiting for the entire batch of results, you can listen for the result of every function you add.

asyncQueue.add(
    () => Promise.resolve("Hello World"),
    'description',
    'uniqueIdentifier'
    (error, data) => {
        // Handle the return for this specific function
    }
);

Queue Options

Using the parallel-async-queue package allows you to leverage from different utilitary options. You can set those options when instantiating a new Queue:

    const asyncQueue = new Queue({
        // ...options
    });
OptiontypedescriptionDefault value
reAddAbortedItemsbooleanWhether or not manually aborted items should be readded to the queue.false
rejectedFirstbooleanWhether or not rejected or aborted items should be readded in the beggining of the queue.false
retriesnumberThe number of times a rejected item should be retried. Aborted items do not count as rejected for the number of retries.0
timeBetweenRetriesnumberThe time in milliseconds to wait between retries for failed items.0
endWhenSettledbooleanWhether or not the scheduler should wait for new items when currently set items are settled. When this option is set to false, the Queue will need to be stopped using the stop method.true

Queue Methods

The parallel-async-queue package allows you to control the flow of your asynchronous functions.

MethodParametersReturnsDescription
addasyncAction (required) description - description saved in the item object for user reference identifier - identifier to replace automatically set ids onReturn - callback function called when asyncAction is settled shouldRetry - function that receives an error as parameter and should return whether or not a rejected asyncAction should be retriedThe queued item in this format: { id: string, action: Function, retries: number, description: string, error: Error, data: any }Adds an asynchronous function to the queue. New items are added to the end of the queue.
removeItem identifier (id)The count of removed items from the queueRemoves all items with provided item Id from the queue.
clearN/AThe count of removed items from the queueRemoves all items from the queue. Running items are still going to settle.
pauseabort - Whether or not running items should be aborted. The result of aborted asyncActions are ignored.voidPauses the execution of new asyncActions. If aborted, also ignores the result of running items (i.e. they will not settle). If reAddAbortedItems option is set to true, aborted items are readded to be executed when the queue is resumed.
stopN/AThe count of removed items from the queuePauses and aborts the queue execution, clears the queue and kills the keep alive loop if endWhenSettled is set to false.
destroyN/AvoidStops, but make sure that the keep alive loop is destroyed.
resumeresumeCount - The number of parallel asyncActions that should be resumed. The maximum number of parallel asyncActions are still set by the maxParallelProcessors parameter in the run method.voidUnpauses / resumes the queue execution.
runmaxParallelProcessors - The number of maximum parallel asyncActions to run at a time.A promise of settled, resolved and rejected async actions. Each object is organized by the queued item id.Starts the queue execution and returns a promise that resolves when all asyncActions are settled and the keep alive loop is destroyed.

Aborted vs rejected items

Aborted items are asynchronous functions that had their execution stopped by the user. An aborted item never settles. The parallel-async-queue package will send an abort signal, which can be used by the provided callback function. If the callback function does not aborts its execution, asynchronous functions will continue to run after aborted, but it's result will be ignored. Aborted functions that are not readded to the queue will not appear in the settledItems object in the result.

Rejected items are asynchronous functions (promises) that were rejected within it's execution. Rejected functions will save their last error message (if different within retries). Rejected items that should retry (using the retries option from Queue's constructor and the shouldRetry callback function from the add method), will keep trying after a delimited number of milliseconds (using the timeBetweenRetries option from Queue's constructor). If a rejected item shouldn't retry or all retries failed, the item will be saved in the settled and the rejected objects.

Future development
  • Add support to streams
  • Add better error handling when aborting items

Any PR is welcomed!

0.0.1

6 months ago