1.0.2 • Published 3 years ago

callbag-distribute v1.0.2

Weekly downloads
5
License
MIT
Repository
github
Last release
3 years ago

callbag-distribute

Callbag operator that broadcasts a single source to multiple sinks. Does reference counting on sinks and starts the source when the first sink gets connected, similar to RxJS .share(). Works on either pullable or listenable sources. Distributes on data requests one at a time.

Usages

as depicted below, helpful for a Task Queue -> Task runner pattern. when used with fromIter and array, it can be used to push each element into it's own task runner.

makeWorker takes three arguments, a Promise (unit of work), a callback when the queue / worker is exhausted, and an optional Id for logging.

when ran as NODE_ENV=dev emits details of the queue processing.

Each worker completes it's activity then asks for the 'next' available to work. for the purposes of demonstration -- in the sample code below the task is an indeterminate SetTimeout.

This 'unit-of-work' could as well be driving a chrome browser or other computationally intensive tasks.

note - if workers exceed amount of work to be done, workers will initialize with duplicate workloads. recommend the pattern of setting the workers to length in that case

installation

npm install callbag-distribute

example

Share a pullable source to 5 pullers:

import { distribute, makeWorker } from 'callbag-distribute'
import { fromIter } from 'callbag-from-iter';


function randomIntFromInterval(min: number, max: number): number {
    // min and max included
    return Math.floor(Math.random() * (max - min + 1) + min);
}
const MAX_WORKERS = 5;

// 1, 2, 3, ... 150
const arrayOfWork = Array.from(Array(150).keys());

const source = distribute(
    fromIter(arrayOfWork),
);

const DYNAMIC_WORKERS = arrayOfWork.length < MAX_WORKERS ? arrayOfWork.length : MAX_WORKERS;

for (let step = 0; step < DYNAMIC_WORKERS; step++) {
    source(
        0,
        makeWorker(
            data => { // argument one - what to do with 'each' data value from source.  this is your business logic.
                const randomTime = randomIntFromInterval(2000, 5000);
                console.log(`start ${data} expecting to take ${randomTime}`);
                console.time(`stop  ${data}`);
                return new Promise(resolve => {
                    const id = setTimeout(() => {
                        console.timeEnd(`stop  ${data}`);
                        clearTimeout(id);
                        resolve();
                    }, randomTime);
                });
            },
            () => console.log(`worker ${step} complete`), // upon completion, invoke this callback. this could resolve a promise in conjunction of Promise.all. 
        ),
    );
}

results in:

{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'0,223164', event: 'handshake'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'0,2108004', event: 'handshake done...  asking for first '}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'0,2271832', event: 'starting', value:' 10'}
start 10 expecting to take 2442
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'0,3892', event: 'handshake'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'0,60011', event: 'handshake done...  asking for first '}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'0,111226', event: 'starting', value:' 20'}
start 20 expecting to take 3607
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'0,3151', event: 'handshake'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'0,62442', event: 'handshake done...  asking for first '}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'0,103227', event: 'starting', value:' 30'}
start 30 expecting to take 2883
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'0,6640', event: 'handshake'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'0,52010', event: 'handshake done...  asking for first '}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'0,88967', event: 'starting', value:' 40'}
start 40 expecting to take 4497
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'0,2073', event: 'handshake'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'0,44151', event: 'handshake done...  asking for first '}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'0,82840', event: 'starting', value:' 50'}
start 50 expecting to take 3686
stop  10: 2442.566ms
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'2,445532320', event: 'ending/next', value:' 10'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'2,445671245', event: 'starting', value:' 60'}
start 60 expecting to take 4210
stop  30: 2883.590ms
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'2,885258026', event: 'ending/next', value:' 30'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'2,885691943', event: 'starting', value:' 70'}
start 70 expecting to take 4283
stop  20: 3611.561ms
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'3,612048365', event: 'ending/next', value:' 20'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'3,612151050', event: 'starting', value:' 80'}
start 80 expecting to take 3960
stop  50: 3686.255ms
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'3,686604276', event: 'ending/next', value:' 50'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'3,686682518', event: 'starting', value:' 90'}
start 90 expecting to take 3349
stop  40: 4497.284ms
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'4,497598329', event: 'ending/next', value:' 40'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'4,497673581', event: 'starting', value:' 100'}
start 100 expecting to take 2215
stop  60: 4214.282ms
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'6,660369948', event: 'ending/next', value:' 60'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'6,660509683', event: 'starting', value:' 110'}
start 110 expecting to take 2133
stop  100: 2216.248ms
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'6,714199881', event: 'ending/next', value:' 100'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'6,714290982', event: 'starting', value:' 120'}
start 120 expecting to take 4390
stop  90: 3352.460ms
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'7,39403927', event: 'ending/next', value:' 90'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'7,39469828', event: 'starting', value:' 130'}
start 130 expecting to take 4979
stop  70: 4285.245ms
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'7,171343486', event: 'ending/next', value:' 70'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'7,171479174', event: 'starting', value:' 140'}
start 140 expecting to take 3710
stop  80: 3963.032ms
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'7,575604833', event: 'ending/next', value:' 80'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'7,575750841', event: 'starting', value:' 150'}
start 150 expecting to take 2744
stop  110: 2133.248ms
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'8,794234102', event: 'ending/next', value:' 110'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'8,794378890', event: 'starting', value:' 160'}
start 160 expecting to take 4429
stop  150: 2746.237ms
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'10,322319219', event: 'ending/next', value:' 150'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'10,322390166', event: 'starting', value:' 170'}
start 170 expecting to take 3706
stop  140: 3710.244ms
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'10,882026939', event: 'ending/next', value:' 140'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'10,882087033', event: 'starting', value:' 180'}
start 180 expecting to take 2475
stop  120: 4391.874ms
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'11,106455004', event: 'ending/next', value:' 120'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'11,106540831', event: 'starting', value:' 190'}
start 190 expecting to take 3592
stop  130: 4980.057ms
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'12,19704877', event: 'ending/next', value:' 130'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'12,19755390', event: 'starting', value:' 200'}
start 200 expecting to take 4229
stop  160: 4429.071ms
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'13,223731139', event: 'ending/next', value:' 160'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'13,223796726', event: 'tasks exhausted'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'13,223846271', event: 'finished'}
worker 0 complete
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'13,221035639', event: 'tasks exhausted'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'13,220764960', event: 'tasks exhausted'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'13,220577889', event: 'tasks exhausted'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'13,220420585', event: 'tasks exhausted'}
stop  180: 2476.131ms
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'13,358533569', event: 'ending/next', value:' 180'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'13,358622798', event: 'finished', value:' 180'}
worker 2 complete
stop  170: 3707.192ms
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'14,29777617', event: 'ending/next', value:' 170'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'14,29829570', event: 'finished', value:' 170'}
worker 1 complete
stop  190: 3595.368ms
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'14,702740279', event: 'ending/next', value:' 190'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'14,702915802', event: 'finished', value:' 190'}
worker 3 complete
stop  200: 4230.908ms
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'16,250910924', event: 'ending/next', value:' 200'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'16,251002813', event: 'finished', value:' 200'}
worker 4 complete

Thanks to

https://blog.krawaller.se/posts/explaining-callbags-via-typescript-definitions/

1.0.2

3 years ago

1.0.1

4 years ago

1.0.0

4 years ago