0.2.0 • Published 2 years ago

concurrent-runner v0.2.0

Weekly downloads
-
License
MIT
Repository
github
Last release
2 years ago

concurrent-runner

NPM version NPM downloads Build Status

run cancelable async function concurrently by concurrency and priority using heap

API

export interface Task<T = any> {
    run: () => {
        promise: Promise<T>;
        cancel?: () => void;
    };
}

export type ExtractTaskResult<T> = T extends Task<infer U> ? U : never;

// -1 means higher priority
export type Comparator<T extends Task = Task> = (t1: T, t2: T) => -1 | 0 | 1;

export type TaskHandle<T> = {
    promise: Promise<T>;
    cancel: () => void;
};

export interface RunnerOptions<T extends Task = Task> {
    concurrency: number;
    comparator: Comparator<T>;
    onEmpty?: () => void;
    onTaskStart?: (o: {
        task: T;
    }) => void;
    onTaskEnd?: (o: {
        task: T;
        result: any;
    }) => void;
}

export default class CocurrentRunner<T extends Task> {
    private options;
    private heap;
    private running;
    private started;
    private paused;
    constructor(options: RunnerOptions<T>);
    setOptions(options: Partial<RunnerOptions<T>>): void;
    start(): void;
    pause(): void;
    resume(): void;
    stop(): void;
    private endAndSchedule;
    private checkAndSchedule;
    addTask<TT extends T, R = ExtractTaskResult<TT>>(task: TT): TaskHandle<R>;
}

Usage

import CoRunner, { Task } from 'concurrent-runner';

describe("concurrent runner", () => {
  interface TimeTask<U = any> extends Task<U> {
    time: number;
  }
  
  function timeout(d: number): Promise<number> {
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve(d);
      }, d);
    });
  }
  
  function eq(r1: number, r3: number) {
    return Math.abs(r1 - r3) < 20;
  }

  function getRunner() {
    const r = new CoRunner<TimeTask>({
      concurrency: 2,
      comparator(t: TimeTask, t2: TimeTask) {
        return t.time === t2.time ? 0 : t.time > t2.time ? 1 : -1;
      },
    });
    return r;
  }
  function getRunnerWithTasks() {
    const r = getRunner();
    const ret: number[][] = [];
    const taskHandles = [];
    const times = [300, 100, 500, 100];
    for (let time = 0; time < times.length; time++) {
      const p = r.addTask({
        run() {
          return {
            promise: timeout(times[time]),
          };
        },
        time,
      });
      taskHandles.push(p);
      p.promise.then(
        (r) => {
          ret.push([time, r]);
        },
        (r) => {
          ret.push([time, r]);
        }
      );
    }
    return { r, ret, taskHandles };
  }

  it("works for concurrency", (done) => {
    const { r, ret } = getRunnerWithTasks();
    const startTime: number[][] = [];
    const start = Date.now();

    r.setOptions({
      onTaskStart(info) {
        startTime.push([info.task.time, Date.now() - start]);
      },
      onEmpty() {
        expect(eq(startTime[2][1], 100)).toBe(true);
        expect(eq(startTime[3][1], 300)).toBe(true);
        expect(startTime.map((r) => r[0])).toEqual([0, 1, 2, 3]);
        expect(ret.map((r) => r[0])).toEqual([1, 0, 3, 2]);

        done();
      },
    });

    r.start();
  });

  it("can cancel running", (done) => {
    const { r, ret, taskHandles } = getRunnerWithTasks();
    const startTime: number[][] = [];
    const start = Date.now();

    setTimeout(() => {
      taskHandles[2].cancel();
    }, 200);

    r.setOptions({
      onTaskStart(info) {
        startTime.push([info.task.time, Date.now() - start]);
      },
      onEmpty() {
        expect(eq(startTime[2][1], 100)).toBe(true);
        expect(eq(startTime[3][1], 200)).toBe(true);
        expect(startTime.map((r) => r[0])).toEqual([0, 1, 2, 3]);
        expect(ret.map((r) => r[0])).toEqual([1, 2, 0, 3]);
        expect((ret[1][1] as any).name).toEqual("ConcurrentRunnerAbortError");
        done();
      },
    });

    r.start();
  });

  it("can cancel waiting", (done) => {
    const { r, ret, taskHandles } = getRunnerWithTasks();
    const startTime: number[][] = [];
    const start = Date.now();

    setTimeout(() => {
      taskHandles[3].cancel();
    }, 200);

    r.setOptions({
      onTaskStart(info) {
        startTime.push([info.task.time, Date.now() - start]);
      },
      onEmpty() {
        expect(eq(startTime[2][1], 100)).toBe(true);
        expect(startTime.map((r) => r[0])).toEqual([0, 1, 2]);
        expect(ret.map((r) => r[0])).toEqual([1, 3, 0, 2]);
        expect((ret[1][1] as any).name).toEqual("ConcurrentRunnerAbortError");
        done();
      },
    });

    r.start();
  });

  function runWithCancel(fn: (...args: any) => Generator, ...args: any[]): { promise: Promise<any>; cancel: () => void; } {
    const gen = fn(...args);
    let cancelled: boolean, cancel: () => void = function () { };
    const promise = new Promise((resolve, reject) => {
      cancel = () => {
        cancelled = true;
      };

      onFulfilled();

      function onFulfilled(res?: any) {
        if (!cancelled) {
          let result: any;
          try {
            result = gen.next(res);
          } catch (e) {
            return reject(e);
          }
          next(result);
          return null;
        }
      }

      function onRejected(err: any) {
        var result: any;
        try {
          result = gen.throw(err);
        } catch (e) {
          return reject(e);
        }
        next(result);
      }

      function next({ done, value }: any) {
        if (done) {
          return resolve(value);
        }
        return value.then(onFulfilled, onRejected);
      }
    });

    return { promise, cancel };
  }

  function getCancelableRunner() {
    const r = getRunner();
    const ret: any[] = [];
    const ret2: any[] = [];

    const p = r.addTask({
      run() {
        return runWithCancel(function* r() {
          ret2.push(1);
          yield timeout(200);
          ret2.push(2);
          yield timeout(300);
          ret2.push(3);
          return 4;
        });
      },
      time: 1,
    });
    p.promise.then(
      (r) => {
        ret.push(r);
      },
      (r) => {
        ret.push(r);
      }
    );
    return { r, p,ret, ret2 };
  }

  it('do not call cancel', (done) => {
    const { r, ret, ret2 } = getCancelableRunner();
    r.setOptions({
      onEmpty() {
        expect(ret).toEqual([4]);
        expect(ret2).toEqual([1,2,3]);
        done();
      }
    })
    r.start();
  });

  it('call task cancel', (done) => {
    const { r,p, ret, ret2 } = getCancelableRunner();
    r.setOptions({
      onEmpty() {
        expect(ret.length).toEqual(1);
        expect(ret[0].name).toEqual('ConcurrentRunnerAbortError');
        expect(ret2).toEqual([1,2]);
        done();
      }
    })
    r.start();
    setTimeout(() => {
      p.cancel();
    }, 300);
  });
});

HISTORY

0.2.0 - 2022/02/10

  • change CancelablePromise return type to TaskHandle