1.0.2 • Published 4 years ago

@jesuarva/js-pipeline v1.0.2

License
MIT

A Pipeline implementation in JavaScript with an API that mimics the Array iteration methods.

It is an approach to processing large blocks of data without blocking the main thread, which avoids the performance problems of long synchronous loops. The Pipeline is implemented with JS generators.
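As a rough illustration of the generator idea, here is a minimal sketch of lazy filter and map stages chained over an iterable. The helper names `filterStage` and `mapStage` are hypothetical and are not taken from this package's source; the sketch only shows how generators let one item at a time flow through every stage.

```javascript
// Hypothetical stage helpers (assumption: NOT this package's internals).
function* filterStage(source, predicate) {
  for (const item of source) {
    if (predicate(item)) yield item; // only matching items move on
  }
}

function* mapStage(source, mapFn) {
  for (const item of source) {
    yield mapFn(item); // transform each item as it passes through
  }
}

const numbers = [0, 5, 10, 15, 20];
const stages = mapStage(
  filterStage(numbers, n => n % 10 === 0),
  n => `item ${n}`
);

// Nothing has been processed yet. Each next() pulls one item through all stages.
const first = stages.next(); // { value: 'item 0', done: false }
const rest = [...stages];    // [ 'item 10', 'item 20' ]

console.log(first, rest);
```

Because the work happens only when `next()` is called, a runner can decide when to process the next item, which is what makes non-blocking execution possible.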

There are currently two proposals to introduce a new pipeline operator in JavaScript: tc39/proposal-pipeline-operator

Install

npm install @jesuarva/js-pipeline

How to initialize a Pipeline

const Pipeline = require('path/to/Pipeline');

const iterable = new Array(10000); // Any `iterable` object

for (let i = iterable.length - 1; i >= 0; i--) {
  iterable[i] = i;
}

const pipeline = Pipeline(iterable);

Usage example

Run example1.js:

node src/examples/example1.js

// src/examples/example1.js
const Pipeline = require('../Pipeline');

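const arr = new Array(100);

// The loop starts at arr.length (100), so arr[100] is set too
// and the array ends up holding 0..100 (101 items).
for (let i = arr.length; i >= 0; i--) {
  arr[i] = i;
}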

Pipeline(arr)
  .filter(item => item % 2 === 0 && item % 10 === 0)
  .map(item => `Pipeline-1 ${item}`)
  .effect(value => console.log(value))
  .runAsync(response => console.log('Pipeline-1 last callbackFn', response));

Pipeline(arr)
  .filter(item => item % 3 === 0 && item % 30 === 0)
  .map(item => `Pipeline-2 ${item}`)
  .effect(value => console.log(value))
  .runAsync(response => console.log('Pipeline-2 last callbackFn', response));

Pipeline(arr)
  .filter(item => item % 3 === 0 && item % 30 === 0)
  .map((item, index) => `Pipeline-3 ${item}. index-${index}`)
  .effect(value => console.log(value))
  .runAsync(response => console.log('Pipeline-3 last callbackFn', response));

The console output looks like this:

$ node src/examples/example1.js
Pipeline-1 0
Pipeline-2 0
Pipeline-3 0. index-0
Pipeline-1 10
Pipeline-2 30
Pipeline-1 20
Pipeline-3 30. index-1
Pipeline-2 60
Pipeline-1 30
Pipeline-3 60. index-2
Pipeline-2 90
Pipeline-1 40
Pipeline-2 last callbackFn { done: true,
  value:
   [ 'Pipeline-2 0',
     'Pipeline-2 30',
     'Pipeline-2 60',
     'Pipeline-2 90' ] }
Pipeline-3 90. index-3
Pipeline-1 50
Pipeline-1 60
Pipeline-3 last callbackFn { done: true,
  value:
   [ 'Pipeline-3 0. index-0',
     'Pipeline-3 30. index-1',
     'Pipeline-3 60. index-2',
     'Pipeline-3 90. index-3' ] }
Pipeline-1 70
Pipeline-1 80
Pipeline-1 90
Pipeline-1 100
Pipeline-1 last callbackFn { done: true,
  value:
   [ 'Pipeline-1 0',
     'Pipeline-1 10',
     'Pipeline-1 20',
     'Pipeline-1 30',
     'Pipeline-1 40',
     'Pipeline-1 50',
     'Pipeline-1 60',
     'Pipeline-1 70',
     'Pipeline-1 80',
     'Pipeline-1 90',
     'Pipeline-1 100' ] }

The output shows the three pipelines running asynchronously: each one processes a single unit of data at a time and then yields control, so their steps interleave and none of them blocks the main thread.
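One way such interleaving can arise is sketched below: a runner advances a generator by one step and then schedules the next step with Node's `setImmediate`, so other pending work gets a turn in between. The `runInTurns` function and the `label` generator are hypothetical names for illustration; this is an assumption about the general technique, not a copy of how `runAsync` is implemented.

```javascript
// Records the order in which values are produced across all runs.
const order = [];

// Hypothetical runner (assumption: NOT the package's runAsync).
function runInTurns(generator, onDone) {
  const results = [];
  function step() {
    const { value, done } = generator.next();
    if (done) return onDone(results);
    order.push(value);
    results.push(value);
    setImmediate(step); // hand control back to the event loop before the next item
  }
  setImmediate(step);
}

function* label(name, items) {
  for (const item of items) yield `${name} ${item}`;
}

// Start two runs back to back; neither waits for the other to finish.
const finished = Promise.all([
  new Promise(resolve => runInTurns(label('A', [1, 2, 3]), resolve)),
  new Promise(resolve => runInTurns(label('B', [1, 2, 3]), resolve)),
]).then(() => order);

finished.then(result => console.log(result));
// [ 'A 1', 'B 1', 'A 2', 'B 2', 'A 3', 'B 3' ]
```

The two runs alternate because each step is queued behind the other run's pending step.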

API Reference

The public API emulates the Array's iteration methods (map, filter).

pipeline.map(mapFn)

Emulates Array.map. mapFn is a function that is invoked with each element of the iterable. Each time mapFn executes, the returned value is passed through the pipeline.

interface pipelineAPI {
  map(mapFn: <T, S>(currentData: T, index: number) => S | T): pipelineAPI;
}
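For comparison, here is the filter-then-map chain from the Pipeline-3 run in the usage example, written with the built-in Array methods. It yields the same values that run reports, including an index that counts the items reaching map rather than their position in the source array. The names `input` and `viaArray` are illustrative only, and the sketch does not use this package.

```javascript
// Same data as the usage example: the numbers 0..100.
const input = Array.from({ length: 101 }, (_, i) => i);

// Built-in Array equivalent of the Pipeline-3 chain (runs synchronously).
const viaArray = input
  .filter(item => item % 3 === 0 && item % 30 === 0)
  .map((item, index) => `Pipeline-3 ${item}. index-${index}`);

console.log(viaArray);
// [ 'Pipeline-3 0. index-0',
//   'Pipeline-3 30. index-1',
//   'Pipeline-3 60. index-2',
//   'Pipeline-3 90. index-3' ]
```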

pipeline.filter(filterFn)

Emulates Array.filter. filterFn is a predicate that tests each element of the iterable. Return true to keep the element, or false to drop it.

interface pipelineAPI {
  filter(filterFn: <T>(currentData: T, index: number) => boolean): pipelineAPI;
}

pipeline.effect(effectFn)

Performs a side effect with the current chunk of data, outside the pipeline's data flow. As the usage example shows, the values collected in the final response are unchanged by effectFn.

interface pipelineAPI {
  effect(effectFn: <T>(currentValue: T, index: number) => any): pipelineAPI;
}

pipeline.getPipeLine()

Returns the underlying generator instance.

Useful if you want to run the generator pipeline manually.

interface iteratorResult {
  value: any;
  done: boolean;
}

interface generatorObject {
  next(value?: any): iteratorResult;
  throw(value?: any): iteratorResult;
  return(value?: any): iteratorResult;
}

interface pipelineAPI {
  getPipeLine(): generatorObject;
}
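Running a generator manually means calling `next()` until `done` is true. The sketch below uses a stand-in generator (`standIn`, a hypothetical name) in place of the object returned by `getPipeLine()`. What the real generator yields at each step is not documented here, so the yielded values are assumptions.

```javascript
// Stand-in for the generator returned by pipeline.getPipeLine() (assumption).
function* standIn() {
  yield 'a';
  yield 'b';
  return ['a', 'b']; // final value, reported together with done: true
}

const generator = standIn();
const seen = [];

// Drive the generator by hand, one step per next() call.
let result = generator.next();
while (!result.done) {
  seen.push(result.value);
  result = generator.next();
}

console.log(seen);   // [ 'a', 'b' ]
console.log(result); // { value: [ 'a', 'b' ], done: true }
```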

pipeline.stopPipeLine()

Stops the pipeline.

interface pipelineAPI {
  stopPipeLine(): void;
}
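How `stopPipeLine()` works internally is not described here. One standard way to stop a generator early is its `return()` method, sketched below on a hypothetical `endless` generator. Treat it as an illustration of the language feature, not of this package's code.

```javascript
let cleanedUp = false;

// Hypothetical generator that would run forever if never stopped.
function* endless() {
  let i = 0;
  try {
    while (true) yield i++;
  } finally {
    cleanedUp = true; // runs when the generator is stopped early
  }
}

const gen = endless();
const a = gen.next();                  // { value: 0, done: false }
const b = gen.next();                  // { value: 1, done: false }
const stopped = gen.return('stopped'); // { value: 'stopped', done: true }
const after = gen.next();              // { value: undefined, done: true }

console.log(a, b, stopped, after, cleanedUp);
```

After `return()` the generator is finished, so any later `next()` call reports `done: true` without producing more values.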

pipeline.runAsync(callback)

Runs the pipeline asynchronously. callback is called once the pipeline has finished processing all data chunks. It receives a response object whose value is an array of the processed data.

type pipeLineResponse = {
  done: true;
  value: any[]; // Array of processed data.
};
interface pipelineAPI {
  runAsync(callback: (response: pipeLineResponse) => any): void;
}

pipeline.runAsSaga(options)

A version of runAsync adapted to run within a redux-saga task.

interface sagaFnOptions {
  callbackEffect: <T>(response: pipeLineResponse, ...others: T[] | []) => any;
  callbackArguments: any[];
}
interface pipelineAPI {
  runAsSaga(options: sagaFnOptions): void;
}

Types & Interfaces

interface iter {
  length: number;
}

interface generatorObject {
  next(value?: any): iteratorResult;
  throw(value?: any): iteratorResult;
  return(value?: any): iteratorResult;
}

interface iteratorResult {
  value: any;
  done: boolean;
}

type pipeLineResponse = {
  done: true;
  value: any[];
};

interface sagaFnOptions {
  callbackEffect: <T>(response: pipeLineResponse, ...others: T[] | []) => any;
  callbackArguments: any[];
}

interface pipelineAPI {
  runAsync(callback: (response: pipeLineResponse) => any): void;
  runAsSaga(options: sagaFnOptions): void;
  map(mapFunction: <T, S>(currentValue: T, index: number) => S | T): pipelineAPI;
  filter(filterFunction: <T>(currentValue: T, index: number) => boolean): pipelineAPI;
  effect(effectFn: <T>(currentValue: T, index: number) => any): pipelineAPI;
  getPipeLine(): generatorObject;
  stopPipeLine(): void;
}