1.1.1 • Published 8 years ago

metatasks v1.1.1

License
MIT
Repository
github

Metatasks

Metatasks is a FIFO queue for Promise-based tasks. It can optionally persist tasks so that long-running, fault-prone tasks survive reboots. It supports concurrency limiting, plus metadata and state tracking for statistics or debugging. It relies on the ES7 decorators proposal and, optionally, the ES6 Proxy object.

Installation

$ npm install metatasks --save

This library uses the ES7 decorators proposal, which is supported by both Babel and TypeScript. To use it with Babel, enable the experimental es7.decorators feature. To use it with TypeScript, enable experimentalDecorators and emitDecoratorMetadata in tsconfig.json.

Usage

The simplest way to use the metatasks queue system is to decorate an asynchronous method (one that returns a Promise).

import {useQueue} from 'metatasks';

class Example {
  @useQueue(3)
  async exampleMethod(retVal) {
    return retVal;
  }
}

Now, however many times you invoke exampleMethod, at most 3 invocations will be running at the same time. You can still await the result: the returned Promise resolves with it once the task has had its turn to start and has completed.

This is great if you want to queue all the calls to the method using the same queue. If you'd like to make queueing optional, you can use the experimental method proxying described below or generate a queue manually.
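To make the behaviour described above concrete, here is a minimal, self-contained sketch of a concurrency-limited FIFO wrapper for Promise-returning functions. It is not the metatasks implementation; limitConcurrency and everything inside it are hypothetical names used only to illustrate how at most N calls run at once while every caller still gets a Promise for its own result.

```javascript
// Hypothetical stand-in for the behaviour of @useQueue(3); not metatasks' code.
function limitConcurrency(fn, concurrency) {
  const waiting = []; // FIFO list of calls that have not started yet
  let running = 0;    // number of calls currently in flight

  function startNext() {
    if (running >= concurrency || waiting.length === 0) return;
    const { self, args, resolve, reject } = waiting.shift();
    running += 1;
    // Promise.resolve().then(...) also turns synchronous throws into rejections.
    Promise.resolve()
      .then(() => fn.apply(self, args))
      .then(resolve, reject)
      .then(() => {
        running -= 1;
        startNext(); // a slot is free, start the oldest waiting call
      });
  }

  return function queued(...args) {
    return new Promise((resolve, reject) => {
      waiting.push({ self: this, args, resolve, reject });
      startNext();
    });
  };
}

// Usage: ten calls are made, but no more than three run at the same time.
let active = 0;
let peak = 0;
const work = limitConcurrency(async (value) => {
  active += 1;
  peak = Math.max(peak, active);
  await new Promise((done) => setTimeout(done, 10));
  active -= 1;
  return value;
}, 3);

const results = Promise.all([1, 2, 3, 4, 5, 6, 7, 8, 9, 10].map((n) => work(n)));
```

Each call is parked in the waiting list until a slot frees up, which is why the caller can simply await the returned Promise regardless of how long the call spends queued.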

Persistence

Persistence is achieved through an interface class of your own design, so any database can be used for task storage.

class Example {
  @useQueue({
      concurrency: 3,
      queueId: 'storedExample',
      injectTask: true,
      taskNameResolver: function(someParam) { return `${this.somethingFromThisObject} : ${someParam}` },
      persistence: {
        interface: PersistentTaskDatabase,
        thisRestorer: async function (objectStoredInTheDatabase) { return new Example(objectStoredInTheDatabase) },
        thisStorer: async function (target) { return { ... }}
      }
  })
  async exampleMethod(someParam, task) {
    return someParam;
  }
}

Persistence interface

The interface must be a class that implements static methods as in the following example:

export default class PersistentTaskDatabaseExample {
  static async taskEnqueue(queueId, taskJson:Object) {
  }

  static async taskSetStarted(queueId, taskId, metadata) {
  }

  static async taskSetCompleted(queueId, taskId, value, metadata) {
  }

  static async taskSetFailed(queueId, taskId, error, metadata) {
  }

  static async taskSetMetadata(queueId, taskId, key, value) {
  }
  
  static async taskMergeMetadata(queueId, taskId, obj) {
  }

  static async setAllStartedAsInterrupted(queueId) {
  }

  static async getTasksToQueue(queueId, includeInterrupted = true):Array<Object> {
    return [];
  }
}
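As a rough illustration of what an implementation of this interface might look like, the sketch below keeps everything in a Map instead of a real database. The method names and parameters follow the skeleton above; the `id` field on taskJson, the record shape and the status strings ('queued', 'started', and so on) are assumptions made for this example, since the README does not specify them.

```javascript
// Illustrative in-memory store. The `id` field on taskJson, the record shape and
// the status strings are assumptions of this sketch, not part of metatasks.
const store = new Map(); // queueId -> Map(taskId -> record)

function recordsFor(queueId) {
  if (!store.has(queueId)) store.set(queueId, new Map());
  return store.get(queueId);
}

// Looks up a record, applies the given changes and returns the record.
function update(queueId, taskId, changes) {
  const record = recordsFor(queueId).get(taskId);
  if (!record) throw new Error(`Unknown task ${taskId} in queue ${queueId}`);
  return Object.assign(record, changes);
}

class InMemoryTaskDatabase {
  static async taskEnqueue(queueId, taskJson) {
    recordsFor(queueId).set(taskJson.id, { task: taskJson, status: 'queued', metadata: {} });
  }

  static async taskSetStarted(queueId, taskId, metadata) {
    update(queueId, taskId, { status: 'started', metadata: { ...metadata } });
  }

  static async taskSetCompleted(queueId, taskId, value, metadata) {
    update(queueId, taskId, { status: 'completed', value, metadata: { ...metadata } });
  }

  static async taskSetFailed(queueId, taskId, error, metadata) {
    update(queueId, taskId, { status: 'failed', error: String(error), metadata: { ...metadata } });
  }

  static async taskSetMetadata(queueId, taskId, key, value) {
    update(queueId, taskId, {}).metadata[key] = value;
  }

  static async taskMergeMetadata(queueId, taskId, obj) {
    Object.assign(update(queueId, taskId, {}).metadata, obj);
  }

  static async setAllStartedAsInterrupted(queueId) {
    for (const record of recordsFor(queueId).values()) {
      if (record.status === 'started') record.status = 'interrupted';
    }
  }

  static async getTasksToQueue(queueId, includeInterrupted = true) {
    const wanted = includeInterrupted ? ['queued', 'interrupted'] : ['queued'];
    // Map iteration follows insertion order, so tasks come back first-in, first-out.
    return [...recordsFor(queueId).values()]
      .filter((record) => wanted.includes(record.status))
      .map((record) => record.task);
  }
}
```

A store like this loses its contents on restart, so it is only useful for tests or for checking the call sequence; a real implementation would issue the equivalent writes against a database.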

Available configuration options for useQueue

  • concurrency: how many instances of this method are allowed to run simultaneously
  • queueId: the unique identifier for this queue, can be used to access the queue from the script and by the database to identify the stored tasks
    defaults to: automatically generated name from filename, class and method name
  • injectTask: if true, the task is passed to the method as its last parameter (note: when this is enabled, always pass the right number of parameters to the method, even when some of them are 'undefined')
    defaults to: false
  • taskNameResolver: a method returning a String with the name of the task, which makes it possible to identify the instance of the task; invoked when the task is enqueued with the same parameters as the actual method and on the same target object (can access this)
    defaults to: name automatically generated from the class name, method name and a sequential number (or random when using persistence)
  • metadataResolver: a method returning an Object like { name: 'taskName', ... }; it works like taskNameResolver, and is useful if you want to store more information about the given task in the database
    when used: overrides taskNameResolver and requires 'name' property to be present
  • persistence: an object containing:
    • interface: a class (the class itself, not an instance) implementing static async methods that store or retrieve information about tasks in the database
    • thisStorer: a method that converts the target object - execution context of the task (this) - into an object that can be stored in the database
    • thisRestorer: a method that recreates the context (this), reversing the above
  • queueModifier: a function that is invoked on the queue immediately after it is created, before any tasks are added
    mostly useful for adding additional callbacks for queue events
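The two resolver options are easiest to understand side by side. The sketch below shows one possible pair, written as plain functions so that `this` refers to the object the queued method was called on; accountId, param and requestedAt are made-up fields for illustration, and only the required name property comes from the option descriptions above.

```javascript
// Hypothetical resolvers; `accountId`, `param` and `requestedAt` are invented
// for this example. Only the `name` property is required by metadataResolver.
function taskNameResolver(someParam) {
  return `${this.accountId} : ${someParam}`;
}

function metadataResolver(someParam) {
  return {
    name: `${this.accountId} : ${someParam}`, // required when metadataResolver is used
    param: someParam,
    requestedAt: new Date().toISOString(),
  };
}

// Options object as it could be passed to useQueue.
const resolverOptions = {
  concurrency: 3,
  queueId: 'storedExample',
  metadataResolver, // takes the place of taskNameResolver
};
```

Both resolvers must be regular functions rather than arrow functions if they need access to `this`.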

Example queueModifier

function(queue) {
  // ...
  queue.on('started', onStarted);
  queue.on('finished', onFinished);
  queue.on('drained', onDrained);
  queue.on('added', onAdded);
}
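A slightly fuller sketch of the same idea: a queueModifier that collects simple statistics from the four events. Only queue.on and the event names are taken from this README; createStatsModifier and the stats object are names invented for the example.

```javascript
// Sketch of a queueModifier that gathers simple statistics.
// `createStatsModifier` and `stats` are hypothetical names for this example.
function createStatsModifier(stats) {
  return function queueModifier(queue) {
    queue.on('added', () => { stats.added += 1; });
    queue.on('started', () => { stats.running += 1; });
    queue.on('finished', () => { stats.running -= 1; stats.finished += 1; });
    queue.on('drained', () => { stats.drainedAt = Date.now(); });
  };
}

const stats = { added: 0, running: 0, finished: 0, drainedAt: null };

// Options object as it could be passed to useQueue.
const statsOptions = { concurrency: 3, queueModifier: createStatsModifier(stats) };
```

Because the modifier runs before any task is added, the counters see every event from the first task onwards.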

Method proxying

Method proxying will only work under V8 if you run your application with the --harmony_proxies flag (as of October 2015, subject to change).

import {enableQueuedCalls} from 'metatasks';

@enableQueuedCalls
class Example {
  async exampleMethod(retVal) {
    // ...
    return retVal;
  }
}

let example = new Example();
example.useQueue({concurrency: 1, queueId: 'my-queue'}).exampleMethod(123);

Adding metadata during task execution

This is useful for creating super-tasks: long-running tasks that embed other tasks and, after a system failure, continue execution from a certain point instead of starting over from the beginning.

class Example {
  @useQueue({
      concurrency: 3,
      injectTask: true,
      persistence: { ... }
  })
  async exampleMethod(someParam, task) {
    if (task.metadata.status !== 'part-1-completed') {
      // doing something...
      // finished part 1 of task
      await task.setMetadata('status', 'part-1-completed');
    }
    // no need to redo what was already done...
    // ...
    return someParam;
  }
}
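The resume behaviour can be tried out without a queue or database. In the sketch below, createFakeTask is hypothetical scaffolding that mimics only the two task members used above (metadata and setMetadata); the function body follows the same checkpoint pattern as exampleMethod.

```javascript
// Stand-in for the injected task. Only `metadata` and `setMetadata` mirror the
// README; `createFakeTask` itself is hypothetical scaffolding for this sketch.
function createFakeTask(initialMetadata = {}) {
  const task = {
    metadata: { ...initialMetadata },
    async setMetadata(key, value) {
      task.metadata[key] = value;
    },
  };
  return task;
}

const log = [];

async function importAndIndex(someParam, task) {
  if (task.metadata.status !== 'part-1-completed') {
    log.push('part 1');
    // Checkpoint: a restart after this point skips part 1.
    await task.setMetadata('status', 'part-1-completed');
  }
  log.push('part 2');
  return someParam;
}

// A fresh task runs both parts; a task restored with the checkpoint runs only part 2.
const firstRun = importAndIndex('a', createFakeTask());
const resumedRun = firstRun.then(() =>
  importAndIndex('a', createFakeTask({ status: 'part-1-completed' }))
);
```

After both runs, log contains 'part 1' once and 'part 2' twice, which is the saving the checkpoint is meant to provide.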

Accessing queues in memory and the states of their tasks

import {Queues} from 'metatasks';

let queueId = 'storedExample';
let queue = Queues.get(queueId);

Available properties and actions on a TaskQueue:

  • queue.whenDrained - a Promise that resolves once all tasks have finished running and their states have been synced with the persistent database
  • queue.empty - Boolean, true when empty
  • queue.queue - Array<TaskInstance> waiting to be started
  • queue.runningTasks - Map<processID, TaskInstance>
  • queue.pause() - pause the queue (won't interrupt tasks which are already running)
  • queue.start() - resume the queue
  • queue.scheduleTask(metadata:Object|string, target:Object, ...params:Array)
  • queue.asyncEventPromises - pending database writes; await them to ensure all up-to-date information is stored in the database, e.g. during a graceful shutdown: queue.pause(); await Promise.all(queue.asyncEventPromises)
  • queue.gracefulShutdown() - halts the system while making sure all tasks have their states synced
  • queue.happyShutdown() - halts the system while making sure all running tasks complete and have their states synced
  • queue can be iterated with for-of to go through all currently running and queued tasks
  • queue is an EventEmitter and emits the following events:
    • added => function(task)
    • started => function(task)
    • finished => function(task)
    • drained => function(tasks[])
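Several of the members listed above combine naturally into a small shutdown helper. The sketch below uses only pause(), asyncEventPromises, for-of iteration and task.name as described in this README; pauseAndSync is a hypothetical helper name, and the code assumes asyncEventPromises is an iterable of Promises, as the Promise.all usage above suggests.

```javascript
// Sketch of a shutdown helper built from members listed above.
// `pauseAndSync` is a hypothetical name, not part of metatasks.
async function pauseAndSync(queue) {
  queue.pause();                               // stop starting new tasks
  await Promise.all(queue.asyncEventPromises); // wait for pending database writes
  const remaining = [];
  for (const task of queue) {                  // running and queued tasks
    remaining.push(task.name);
  }
  return remaining;                            // names of tasks that have not finished
}
```

The returned list can be logged before exiting, so that it is clear which tasks will be picked up again on the next start.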

Available properties and actions on a TaskInstance:

  • task.metadata - the object that contains the task's metadata
  • task.setMetadata(key, value) - returns Promise
  • task.mergeMetadata(obj) - returns Promise
  • task.promise - promise that is fulfilled once the task completes
  • task.generatedPromise - the actual promise of the method, once it is started
  • task.setCancelMethod(method) - to be used from inside of the task - pass a method that will cancel (fail) the task prematurely
  • task.name - returns the task's name

Creating a queue without using decorators

See this test for an example of how to do that.

Alternatives

I have not tested these, however they look like they could be used to achieve similar results.