2.1.4 • Published 19 days ago

pipelane v2.1.4

License
GNU

PipeLane

A library for orchestrating a set of tasks, where each task can have multiple variants. Supports resuming from checkpoints.

Installation

npm install pipelane

Basic Usage

// Implement your task as a class that implements the `PipeTask` interface.
// Register your task and its variants in the variant config.
const variantConfig = {
            [SimplePipeTask.TASK_TYPE_NAME]: [new SimplePipeTask('simplevar1'), new SimplePipeTask('simplevar2'), new SimplePipeTask('simplevar3')]
};

const pipeLane = new PipeLane(variantConfig).enableCheckpoints('test')
            .pipe({
                type: SimplePipeTask.TASK_TYPE_NAME,
                uniqueStepName: 'Step1'
            })
            .pipe({
                type: SimplePipeTask.TASK_TYPE_NAME,
                uniqueStepName: 'Step2',
                variantType: 'simplevar3'
            })
            .sleep(1000)
            .pipe({
                type: SimplePipeTask.TASK_TYPE_NAME,
                uniqueStepName: 'Step3'
            })
            .checkpoint()
            .parallelPipe({
                type: SimplePipeTask.TASK_TYPE_NAME,
                uniqueStepName: 'Step4',
                variantType: 'simplevar2'
            }).parallelPipe({
                type: SimplePipeTask.TASK_TYPE_NAME,
                uniqueStepName: 'Step5'
            }).shardedPipe({
                type: SimplePipeTask.TASK_TYPE_NAME,
                uniqueStepName: 'Step6',
                numberOfShards: 2
            }).pipe({
                type: SimplePipeTask.TASK_TYPE_NAME,
                uniqueStepName: 'Step7'
            })
            .clearCheckpoint()
            .pipe({
                type: SimplePipeTask.TASK_TYPE_NAME,
                uniqueStepName: 'Step8'
            }).start();

Functions

enableCheckpoints

Enables checkpoint support. Pass the name of the pipe as the parameter.

checkpoint

Creates a checkpoint with the current state of the pipe. When start() is called again, the tasks resume from where they left off.

clearCheckpoint

Clears the checkpoints, if any exist.
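
To make the resume behaviour concrete, here is a minimal sketch of the idea and not pipelane's actual implementation. The `checkpointStore` map, the `runWithCheckpoints` function and the `kind` field on each step are all hypothetical names introduced for illustration; the sketch keeps checkpoints in memory, whereas a real pipe would need persistent storage.

```javascript
// Illustrative only: an in-memory stand-in for checkpoint storage, keyed by pipe name.
const checkpointStore = new Map();

// Hypothetical runner: executes steps in order, honouring checkpoint markers.
async function runWithCheckpoints(pipeName, steps) {
  // Resume after the last checkpoint, or from the beginning if none exists.
  const startIndex = checkpointStore.get(pipeName) ?? 0;
  const executed = [];
  for (let i = startIndex; i < steps.length; i++) {
    const step = steps[i];
    if (step.kind === 'checkpoint') {
      checkpointStore.set(pipeName, i + 1); // remember where to resume
    } else if (step.kind === 'clearCheckpoint') {
      checkpointStore.delete(pipeName); // next start() begins from scratch
    } else {
      await step.run();
      executed.push(step.name);
    }
  }
  return executed;
}

// Step2 fails on its first attempt to simulate an interrupted run.
let failOnce = true;
const steps = [
  { kind: 'task', name: 'Step1', run: async () => {} },
  { kind: 'checkpoint' },
  { kind: 'task', name: 'Step2', run: async () => {
      if (failOnce) { failOnce = false; throw new Error('transient failure'); }
    } },
  { kind: 'clearCheckpoint' },
];

async function demo() {
  try {
    await runWithCheckpoints('test', steps);
  } catch (err) {
    console.log('first run failed:', err.message);
  }
  // The second run skips Step1 because the checkpoint sits after it.
  const resumed = await runWithCheckpoints('test', steps);
  console.log('second run executed:', resumed);
  return resumed;
}

const demoResult = demo();
```

In this sketch the second run executes only Step2, and the final clearCheckpoint step removes the stored position so that a later run would start again from Step1.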

pipe

Adds a sequential task to the pipeline.

  .pipe({
         type: SimplePipeTask.TASK_TYPE_NAME, // Mandatory
         variantType: 'simplevar2' // Optional; if absent, the first variant registered for the given type is picked
       })
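
The variant fallback described in the comment above can be sketched as follows. This is an illustration of the rule, not pipelane's code: `resolveVariant` is a hypothetical helper, and the sketch assumes each registered variant exposes its name as a `variantName` property, which the original text does not confirm.

```javascript
// Hypothetical helper, not part of pipelane: resolves which variant handles a step.
// Assumes each registered variant exposes its name as `variantName`.
function resolveVariant(variantConfig, step) {
  const variants = variantConfig[step.type] || [];
  if (variants.length === 0) {
    throw new Error(`No variants registered for task type "${step.type}"`);
  }
  if (step.variantType === undefined) {
    return variants[0]; // no variantType: fall back to the first registered variant
  }
  const match = variants.find((v) => v.variantName === step.variantType);
  if (!match) {
    throw new Error(`Unknown variant "${step.variantType}" for task type "${step.type}"`);
  }
  return match;
}

// Plain objects stand in for task instances in this sketch.
const variantConfig = {
  simple: [{ variantName: 'simplevar1' }, { variantName: 'simplevar2' }],
};

console.log(resolveVariant(variantConfig, { type: 'simple' }).variantName);
// prints "simplevar1"
console.log(resolveVariant(variantConfig, { type: 'simple', variantType: 'simplevar2' }).variantName);
// prints "simplevar2"
```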

parallelPipe

Adds a parallel task to the pipeline.

shardedPipe

Similar to parallelPipe, with one key difference: the input to this task is split into numberOfShards groups, and each group is fed to its own task. The shards are executed in parallel and their outputs are collected.

  .shardedPipe({
         type: SimplePipeTask.TASK_TYPE_NAME, // Mandatory
         numberOfShards: 2 // Mandatory
       })
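
As a rough picture of what sharding means here, the sketch below splits an input array into numberOfShards groups, runs a task on each group concurrently, and merges the outputs. `splitIntoShards`, `runSharded` and `double` are hypothetical names, and the round-robin split is an assumption; the original text does not say how pipelane assigns items to shards.

```javascript
// Illustrative only: distribute `items` round-robin into `numberOfShards` groups.
function splitIntoShards(items, numberOfShards) {
  const shards = Array.from({ length: numberOfShards }, () => []);
  items.forEach((item, index) => shards[index % numberOfShards].push(item));
  return shards;
}

// Run `task` (a hypothetical async function) on every shard concurrently
// and merge the per-shard outputs into a single array.
async function runSharded(items, numberOfShards, task) {
  const shards = splitIntoShards(items, numberOfShards);
  const outputs = await Promise.all(shards.map((shard) => task(shard)));
  return outputs.flat();
}

// Example task: doubles every number in its shard.
const double = async (shard) => shard.map((n) => n * 2);

runSharded([1, 2, 3, 4, 5], 2, double).then((result) => console.log(result));
// shards are [1, 3, 5] and [2, 4], so the merged result is [2, 6, 10, 4, 8]
```

Because the split is round-robin, the merged output is grouped by shard rather than kept in the original input order; a contiguous split would preserve order at the cost of slightly more bookkeeping.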

Load Balancing

Load balancing is useful when you want to distribute tasks evenly among the variants. If variantType is not specified when piping, the first variant whose load is below cutoffLoadThreshold is selected. If variantType is specified and that variant is overloaded, pipelane stops with an error.

.pipe({
        type: SimplePipeTask.TASK_TYPE_NAME,
        uniqueStepName: 'Step8',
        variantType: 'simplevar1',
        cutoffLoadThreshold: 99
      })

Make sure to override getLoad() in your task class.

async getLoad() {
    let currentLoad = ...; // calculate your load
    return currentLoad;
}
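
The selection rule described in this section can be sketched as below. This illustrates the stated behaviour and is not pipelane's internal code: `FakeTask` and `pickVariant` are hypothetical, the `variantName` property is an assumed way of identifying a variant, treating a load equal to the threshold as overloaded is an assumption, and so is throwing when every variant is overloaded (the original text does not cover that case).

```javascript
// Illustrative stand-in for a task class that reports its own load.
class FakeTask {
  constructor(variantName, load) {
    this.variantName = variantName; // assumed property name
    this.load = load;
  }
  async getLoad() {
    return this.load;
  }
}

// Hypothetical selection logic following the rule described above.
async function pickVariant(variants, cutoffLoadThreshold, variantType) {
  if (variantType !== undefined) {
    const requested = variants.find((v) => v.variantName === variantType);
    if (!requested) {
      throw new Error(`Unknown variant "${variantType}"`);
    }
    // An explicitly requested variant that is overloaded is an error.
    if ((await requested.getLoad()) >= cutoffLoadThreshold) {
      throw new Error(`Variant "${variantType}" is overloaded`);
    }
    return requested;
  }
  // No variantType: take the first variant whose load is below the threshold.
  for (const variant of variants) {
    if ((await variant.getLoad()) < cutoffLoadThreshold) {
      return variant;
    }
  }
  throw new Error('All variants are overloaded'); // assumption, see lead-in
}

const variants = [
  new FakeTask('simplevar1', 100),
  new FakeTask('simplevar2', 40),
  new FakeTask('simplevar3', 10),
];

pickVariant(variants, 99).then((v) => console.log(v.variantName));
// prints "simplevar2": simplevar1 is at or above the threshold, so it is skipped
```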