0.2.3 • Published 4 years ago

piperunner v0.2.3

Weekly downloads
1
License
MIT
Repository
-
Last release
4 years ago

Piperunner

Manage complex application logic using in-memory pipelines.

Composed of three objects:

  1. Pipeline: define a series of functions in order to accomplish a job
  2. Runner: run Pipeline against a series of data
  3. Scheduler: run Runners with start/execution/sharing policies

## Install

npm install --save piperunner

Usage

You can use any of the objects independently from the others. See the examples folder for complex examples.

Complete example

let PipeRunner = require('piperunner')
let scheduler = new PipeRunner.Scheduler()

/**
*	Define a scheduler pipeline named
*	'processing_video', and a pipeline
*	*step*.
*/
scheduler.pipeline('processing_video')
.step('first step', (pipeline, job) => {
	console.log('acquiring video ->', job)
	pipeline.next()
})

/**
*	Add another step to the pipeline
*/
scheduler.pipeline('processing_video')
.step('second step', (pipeline, job) => {
	console.log('> publish video ->', job, pipeline.env.videoFrameRate)
	pipeline.next()
})

/**
*	Configure the scheduler in order
*	to run the pipeline at the *start.processing.video*
*	event and every 1000 milliseconds
*/
scheduler.run({
	name: 'processing_video',
	run: {
		onEvent: 'start.processing.video',
		everyMs: 1000
	}
})

/**
*	Assign data to the pipeline's runner (internal
*	to the scheduler)
*/
scheduler.feed({
	name: 'processing_video', 
	data: [{name: 'video1'}, {name: 'video2'}, {name: 'video3'}] 
})

scheduler.log(true)
scheduler.emit('start.processing.video')

/** Output
*
* Emitting start.processing.video
* Running pipeline processing_video with 1 running process
* acquiring video -> { name: 'video1' }
* > publish video -> { name: 'video1' }
* acquiring video -> { name: 'video2' }
* > publish video -> { name: 'video2' }
* acquiring video -> { name: 'video3' }
* > publish video -> { name: 'video3' }
* 
*/

Scheduler example options

scheduler.run({
	name: 'processing_video',
	run: {
		// will run when this event is emitted
		onEvent: 'start.processing.video',
		// will run when these events are emitted 
		onEvents: ['event1', 'event2'],
		// run every N milliseconds (here, every 1000 ms)
		everyMs: 1000
	},
	on: {
		end: {
			// emit this series of events when the pipeline ends
			emit: ['end.event'],
			// exec these functions when the pipeline ends
			exec: [
				// Pass the data to another pipeline
				(scheduler, pipeline) => { 
					scheduler.assignData('processing_audio', 'audio_frames', pipeline.data().processedAudio)
				}
			]
		}
	}
})

How To

Stop a pipeline

scheduler.stop({name: 'processing_video'})

Override the internal EventEmitter

scheduler.emitter(new EventEmitter)

End a job before reaching the pipe end

let pipe = scheduler.pipeline('processing_video')

pipe.step('maybe', (pipe, job) => {
	if (job.name == 'api1-step1') {
		pipe.end() // Ending 
	} else {
		pipe.next()
	}
})

End the entire runner early, without processing the remaining jobs

let pipe = scheduler.pipeline('processing_video')

pipe.step('maybe', (pipe, job) => {
	if (job.name == 'api1-step1') {
		pipe.endRunner() // Exit the runner 
	} else {
		pipe.next()
	}
})

Pass data between pipe functions

let pipe = scheduler.pipeline('processing_video')

pipe.step('compute data', (pipe, job) => {
	pipe.next('Pass data')
})

pipe.step('need data', (pipe, job, incomingData) => {
	pipe.next('Received data ->', incomingData)
})

Store internal data

let pipe = scheduler.pipeline('processing_video')

pipe.step('compute data', (pipe, job) => {
	pipe.data['customdata'] = 'Yeah'
	pipe.next()
})

pipe.step('need data', (pipe, job,) => {
	pipe.next('Received data ->', pipe.data['customdata'])
})

Set the end callback

let runner = new Runner(jobs, pipe, () => {
	console.log('All Jobs finished')
})
0.2.3

4 years ago

0.2.2

4 years ago

0.2.1

4 years ago

0.2.0

4 years ago

0.1.9

4 years ago

0.1.8

4 years ago

0.1.7

4 years ago

0.1.6

4 years ago

0.1.4

4 years ago

0.1.5

4 years ago

0.1.3

4 years ago

0.1.2

4 years ago

0.1.1

4 years ago

0.1.0

4 years ago