tasks-runner v1.4.0

License
MIT
Last release
8 years ago

Tasks Runner

Tasks Runner covers the following needs:

  1. Schedule a task to be executed at a specified time.
  2. Schedule a task to be executed repeatedly with a specified period.
  3. Schedule tasks within a specified group to be executed one by one, in the order they were scheduled.
  4. Graceful shutdown. On shutdown, all started runners have 30 seconds to finish their current tasks.

Installation

npm install --save tasks-runner

How it works

The runner works in periodic scanning iterations. In every iteration it executes up to tasksPerScanning tasks. A new iteration is not scheduled until the previous one has finished. When a worker picks a task, the task is marked with the current date and locked. In other words, the task is excluded from the queue, so no other worker will touch it. It can be picked for processing again only if both conditions hold: the task has not been processed yet, and its previous lock was set more than lockInterval seconds ago. So you need to make sure that every scheduled task finishes within lockInterval seconds.
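
The re-pick rule above can be sketched as a small predicate. This is only an illustration under stated assumptions, not the library's actual code: canBePicked is a hypothetical helper, it treats a task that was never locked as free to pick, and it ignores other conditions such as startAt.

```javascript
// Hypothetical helper that mirrors the re-pick rule described above.
// It is NOT part of tasks-runner; field names follow the task model.
function canBePicked(task, now, lockIntervalSeconds) {
  // A task that was already processed is never picked again.
  if (task.processedAt) return false;
  // Assumption: a task that was never locked is free to pick.
  if (!task.lockedAt) return true;
  // Otherwise the previous lock must be older than lockInterval seconds.
  const lockAgeMs = now.getTime() - task.lockedAt.getTime();
  return lockAgeMs > lockIntervalSeconds * 1000;
}

const now = new Date('2020-01-01T00:02:00Z');
const staleLock = { lockedAt: new Date('2020-01-01T00:00:00Z'), processedAt: null };
const freshLock = { lockedAt: new Date('2020-01-01T00:01:30Z'), processedAt: null };

console.log(canBePicked(staleLock, now, 60)); // true: the lock is 120 seconds old
console.log(canBePicked(freshLock, now, 60)); // false: the lock is 30 seconds old
```

A task that runs longer than lockInterval would satisfy this predicate while it is still running, which is why every task should finish within lockInterval seconds.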

A task is marked as failed if it throws an error, and as processed in all other cases.

Graceful shutdown

The runner supports graceful shutdown. On shutdown, all started runners have 30 seconds to finish their current tasks.

Task processor

A task processor can be a function or an object that implements a .run() method. Write it as a generator function or as a regular function that returns a Promise. Every task processor receives three arguments:

  1. The data that it needs to process.
  2. The result of the previous task, if both tasks are members of the same group. Otherwise null is passed.
  3. Extended information with the most recent error (if the previous processing failed) and the creation date of the task.

The code sample below shows how the task runner executes a task processor:

extendedInfo = {
    failedAt: task.failedAt,  // date of previous error
    errorMsg: task.errorMsg,  // message of previous error
    retries: task.retries,    // count of failed executions
    createdAt: task.createdAt // creation date of task
};
taskResult = yield taskProcessor(task.data, previousTaskResult, extendedInfo);
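
As a rough sketch of the two processor forms described above, the following shows a Promise-returning function and an object with a .run() method. The names sumProcessor and sumProcessorObject and the data.amount field are made up for illustration, and the sketch assumes that .run() receives the same three arguments as a function processor.

```javascript
// Form 1: a regular async function, which returns a Promise.
async function sumProcessor(data, previousResult, extendedInfo) {
  // previousResult is null unless the previous task is in the same group.
  const base = previousResult === null ? 0 : previousResult;
  if (extendedInfo.retries > 0) {
    // The previous attempt failed; its error message is available here.
    console.log(`retry #${extendedInfo.retries}: ${extendedInfo.errorMsg}`);
  }
  return base + data.amount;
}

// Form 2: an object with a .run() method.
// Assumption: .run() is called with the same three arguments.
const sumProcessorObject = {
  run(data, previousResult, extendedInfo) {
    return sumProcessor(data, previousResult, extendedInfo);
  },
};
```

Throwing inside either form (or rejecting the returned Promise) is what marks the task as failed.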

How to use group of tasks

Several tasks can be assigned to the same group. In this case the tasks are executed in the order they were scheduled. If a task in the group fails for any reason, the other tasks in the group are postponed. The result of the previous task is passed to the current one as the second argument. Every task processor also receives extended information about the task as the third argument.
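
A minimal sketch of scheduling a group, using only the .schedule(name, data, options) call documented in the API section. The function scheduleOrderPipeline, the task names, and the group naming scheme are hypothetical; runner stands for the tasks-runner module and is passed in as a parameter so the sketch has no hard dependency.

```javascript
// `runner` is assumed to expose .schedule(name, data, options) as documented.
async function scheduleOrderPipeline(runner, orderId) {
  const group = `order-${orderId}`; // hypothetical group naming scheme
  // Each call is awaited so the scheduling order is unambiguous.
  await runner.schedule('reserve-stock', { orderId }, { group });
  await runner.schedule('charge-card', { orderId }, { group });
  await runner.schedule('send-receipt', { orderId }, { group });
}
```

With this setup, the processor for charge-card would receive the result of reserve-stock as its second argument.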

Task model

By default, a new task is stored in the "tasks" collection (this is configurable).

| Field | Type | Description |
| --- | --- | --- |
| _id | ObjectId | Mongo id. |
| taskId | string | Specified unique id or auto-generated uuid. |
| name | string | Specified name that will be passed to taskProcessorFactory. See .run() for details. |
| data | * | Specified data that will be passed to the task processor. |
| group | string | Specified group that is used to execute tasks one by one in a queue. If a task fails, the other tasks in the same group are postponed. |
| startAt | Date | Specified date when the task should be executed. |
| repeatEvery | number | Specified period in seconds that is used as the basis for a repeatable task. |
| retryStrategy | string | Specified strategy to apply to a failed task. See .schedule() for details. |
| lockedAt | Date | Date when a worker picked the task for processing. |
| processedAt | Date | Date when the task finished successfully. For repeatable tasks it is always null. |
| failedAt | Date | Date of the most recent error. |
| errorMsg | string | Message of the most recent error. |
| retries | number | Count of failed executions of this task. It is increased after every failed processing. |
| createdAt | Date | Date when the task was created. |

API

.connect(url, options)

Returns Promise. The Promise is resolved as soon as the connection is created. You do not need to wait for it to resolve, because the connection is established as soon as any query needs it.

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| url | string | required | Connection url for mongodb |
| options | Object | optional | |
| options.collection | string | optional | Collection name that should be used by default |
| options.logger | Object | optional | Logger that should be used. It should implement .debug() and .error() methods |
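
A small sketch of a .connect() call with a custom logger, based on the parameters listed above. The helper connectRunner, the "jobs" collection name, and the log prefix are illustrative assumptions; runner stands for the tasks-runner module and is passed in as a parameter.

```javascript
// `runner` is assumed to expose .connect(url, options) as documented.
function connectRunner(runner, url) {
  // The logger only needs .debug() and .error() methods.
  const logger = {
    debug: (...args) => console.log('[tasks]', ...args),
    error: (...args) => console.error('[tasks]', ...args),
  };
  // 'jobs' is a hypothetical collection name replacing the default.
  return runner.connect(url, { collection: 'jobs', logger });
}
```

Assuming the package exports these methods directly, a call could look like connectRunner(require('tasks-runner'), 'mongodb://localhost:27017/app'), where the url is a placeholder.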

See examples

.schedule(name, data, options)

Returns Promise that resolves with the scheduled task. Schedules the task but does not execute it. To execute it you need a worker (see .run()).

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | required | Task name, will be passed as an argument into taskProcessorFactory |
| data | mixed | required | Task data, will be passed as an argument into taskProcessor |
| options | Object | optional | |
| options.taskId | string | optional | Unique identifier, by default a uuid will be generated |
| options.startAt | Date | optional | Defines when the task should be executed. By default - current date. |
| options.repeatEvery | number | optional | Period of a repeatable task in seconds. By default - 0 (disabled) |
| options.group | string | optional | Group of the task. By default - null (disabled) |
| options.retryStrategy | string | optional | When the task should be rescheduled in case of an error. By default it uses "pow1". The value must match one of the following patterns (N - any integer): none - don't reschedule it, the task will be run in every scanning iteration; powN - retries count raised to the power N; Nm - in N minutes; Nh - in N hours; Nd - in N days |
| options.collection | string | optional | Collection where the task should be stored. By default - "tasks". |
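
The retryStrategy patterns listed above can be checked before scheduling with a small parser. This is a sketch, not the library's own parsing code: parseRetryStrategy and its return shape are hypothetical, and because the unit of the powN delay is not stated here, the sketch only extracts the exponent.

```javascript
// Hypothetical validator for the documented retryStrategy patterns.
function parseRetryStrategy(value) {
  if (value === 'none') return { kind: 'none' };

  // powN: delay grows with the retries count raised to the power N.
  const pow = /^pow(\d+)$/.exec(value);
  if (pow) return { kind: 'pow', exponent: Number(pow[1]) };

  // Nm / Nh / Nd: fixed delay of N minutes, hours, or days.
  const fixed = /^(\d+)([mhd])$/.exec(value);
  if (fixed) {
    const unitMs = { m: 60 * 1000, h: 3600 * 1000, d: 86400 * 1000 }[fixed[2]];
    return { kind: 'fixed', delayMs: Number(fixed[1]) * unitMs };
  }

  throw new Error(`Unknown retry strategy: ${value}`);
}

console.log(parseRetryStrategy('5m'));   // { kind: 'fixed', delayMs: 300000 }
console.log(parseRetryStrategy('pow2')); // { kind: 'pow', exponent: 2 }
```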

See examples

.run(options)

Returns Promise that is resolved after the first scanning iteration. Executes scheduled tasks.

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| options | Object | required | |
| options.scanInterval | number | optional | Period of scanning for ready to execute tasks, in seconds. By default - 60 seconds. |
| options.lockInterval | number | optional | Max time for task to be finished or failed, in seconds. By default - 60 seconds. |
| options.groupInterval | number | optional | Time that should be used for rescheduling tasks within one group if the previous task is not processed yet, in seconds. By default - 5 seconds. |
| options.taskProcessorFactory | function | optional | Should return task processor for provided task name (via first argument). Task processor could be a function or an object with .run() method. As an argument task processor receives task data. Also it can receive result of previous task execution if both tasks are members of the same group. |
| options.tasksPerScanning | number | optional | Count of tasks that should be executed per scanning iteration. By default - 1000. Iteration should be finished in scanInterval seconds. |
| options.collection | string | optional | Collection that should be managed by this worker. By default - "tasks". |
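
One possible shape for taskProcessorFactory is a lookup by task name, sketched below. The task names, the processors map, and startWorker are made up for illustration; throwing for an unknown name is this sketch's choice, not documented library behavior. runner stands for the tasks-runner module and is passed in as a parameter.

```javascript
// Hypothetical registry of processors keyed by task name.
const processors = new Map([
  // A function processor that returns a Promise.
  ['send-email', async (data) => `sent to ${data.to}`],
  // An object processor with a .run() method.
  ['cleanup', { run: async () => 'cleaned' }],
]);

// Receives the task name and returns the matching processor.
function taskProcessorFactory(name) {
  const processor = processors.get(name);
  if (!processor) throw new Error(`No processor registered for task "${name}"`);
  return processor;
}

// `runner` is assumed to expose .run(options) as documented.
function startWorker(runner) {
  return runner.run({
    scanInterval: 30,      // scan every 30 seconds
    lockInterval: 120,     // tasks must finish within 120 seconds
    tasksPerScanning: 100, // at most 100 tasks per iteration
    taskProcessorFactory,
  });
}
```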

See examples

.stop()

Returns Promise. Performs a graceful shutdown. After this function is called, all started runners have 30 seconds to finish their current tasks.

See examples
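
A common way to trigger .stop() is from a process signal handler. The sketch below is one possible wiring, not something the library prescribes: stopOnSignal is a hypothetical helper, the choice of signals and exit codes is an assumption, and runner stands for the tasks-runner module passed in as a parameter.

```javascript
// `runner` is assumed to expose .stop() returning a Promise, as documented.
// `proc` defaults to the Node.js process object; it is a parameter so the
// wiring can be exercised with a fake in tests.
function stopOnSignal(runner, signals = ['SIGINT', 'SIGTERM'], proc = process) {
  for (const signal of signals) {
    proc.once(signal, () => {
      runner.stop().then(
        () => proc.exit(0),
        (err) => {
          console.error('Shutdown failed:', err);
          proc.exit(1);
        }
      );
    });
  }
}
```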

.findTask(query, collection)

Returns Promise or null. Finds one task that matches a mongo query.

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| query | Object | required | Mongo query for find operation. |
| collection | string | optional | Collection that should be used. By default - "tasks". |

.remove(query, collection)

Returns Promise. Removes tasks that match a mongo query.

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| query | Object | required | Mongo query for remove operation. |
| collection | string | optional | Collection that should be used. By default - "tasks". |
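
As an illustration of combining .findTask() and .remove(), the sketch below cancels a task that has not been processed yet. The helper cancelTask and its boolean return value are hypothetical; runner stands for the tasks-runner module passed in as a parameter, and the query uses the taskId and processedAt fields from the task model.

```javascript
// `runner` is assumed to expose .findTask(query, collection) and
// .remove(query, collection) as documented.
async function cancelTask(runner, taskId, collection = 'tasks') {
  // `await` also handles the case where findTask returns null directly.
  const task = await runner.findTask({ taskId }, collection);
  if (!task) return false;            // nothing to cancel
  if (task.processedAt) return false; // already finished, keep the record
  await runner.remove({ taskId }, collection);
  return true;
}
```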

.close()

Returns Promise, or undefined if no connection exists. Forces the connection to mongo to close. Usually you don't need to do this manually, but you may need it in some tests.
