1.0.2 • Published 6 years ago

async-task-queue2 v1.0.2

Weekly downloads
2
License
MIT
Repository
github
Last release
6 years ago

AsyncTaskQueue2

通常我们会使用 Promise.all 来执行异步任务队列(并发操作),但是它有一个致命的如果某个任务 reject 了会导致整个 Promise.all 立刻 reject。 AsyncTaskQueue2 是一个异步任务队列类,主要用于处理异步任务,它解决 Promise.all 的疼点。并且基于事件驱动,提供串行与并行两种方式。

PS:Nodejs v7.0+

安装

npm install async-task-queue2

API

  • constructor 构造函数接收一个数组用于初始化队列
  • run(options) options 配置项。
    • async 默认每个项之间不需要等待前一个完成。
    • promise 默认使用原生 Promise。
  • add(promise) 增加一个任务
  • size() 获取队列大小。
  • get(index) 根据索引获取 item。
  • isRun() 返回布尔值,指明队列是否在运行中。

  • on('start') 开始时,触发事件。

  • on('success') 每个任务项执行成功时,触发事件。
  • on('error') 某个任务项执行错误时,触发事件。
  • on('add') 添加某个任务时,触发事件。
  • on('complete') 整个任务队列执行完成时,触发事件。回调参数是 (successs,errors)

返回结果信息的结构:

{
    func: '',    // 执行的任务
    result: '',  // 结果数据
    index: '',   // 索引
}

返回错误信息的结构:

{
    func: '',    // 执行的任务
    error: '',   // 错误数据
    index: '',   // 索引
}

基本使用

const AsyncTaskQueue = require('./');
const tasks = [
    () => new Promise(rs => setTimeout(() => rs(true), 300)),
    () => new Promise(rs => setTimeout(() => rs(true), 200)),
    () => new Promise(rs => setTimeout(() => rs(true), 100)),
    () => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
    () => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
    () => new Promise(rs => setTimeout(() => rs(true), 1000)),
    () => new Promise(rs => setTimeout(() => rs(true), 1000)),
];

new AsyncTaskQueue(tasks)
    .on('start', _ => {
        // console.log('start');
    })
    .on('add', (...arg) => {
        // console.log(arg);
    })
    .on('success', (...arg) => {
        // console.log(arg);
    })
    .on('error', (...arg) => {
        // console.log(arg);
    })
    .on('complete', (ress, errs) => {
        console.log(JSON.stringify(ress, null, 3));
        console.log(JSON.stringify(errs, null, 3));
    })
    .run()
;

Promise

AsyncTaskQueue 是基于事件驱动的,不过你可以把它定义为 Promise。

const AsyncTaskQueue = require('./');
const tasks = [
    () => new Promise(rs => setTimeout(() => rs(true), 300)),
    () => new Promise(rs => setTimeout(() => rs(true), 200)),
    () => new Promise(rs => setTimeout(() => rs(true), 100)),
    () => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
    () => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
    () => new Promise(rs => setTimeout(() => rs(true), 1000)),
    () => new Promise(rs => setTimeout(() => rs(true), 1000)),
];

function All(tasks) {
    return new Promise((resolve, reject) => (
        new AsyncTaskQueue(tasks)
            .on('complete', (ress, errs) => {
                return resolve([ress, errs]);
            })
            .run()
    ));
}

(async () => {
    const [ress, errs] = await All(tasks);
    console.log(JSON.stringify(ress, null, 3));
    console.log(JSON.stringify(errs, null, 3));
})();

串行与并行

AsyncTaskQueue 提供串行与并行的执行顺序,实际上它们都是异步进行的。 串行是指每个任务项都需要等前一个完成,并行是指它们不需要等待别人。

const AsyncTaskQueue = require('./');
const tasks = [
    () => new Promise(rs => setTimeout(() => rs(true), 300)),
    () => new Promise(rs => setTimeout(() => rs(true), 200)),
    () => new Promise(rs => setTimeout(() => rs(true), 100)),
    () => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
    () => new Promise((rs, rj) => setTimeout(() => rj(false), 1000)),
    () => new Promise(rs => setTimeout(() => rs(true), 1000)),
    () => new Promise(rs => setTimeout(() => rs(true), 1000)),
];
// 并行
new AsyncTaskQueue(tasks)
    .on('complete', (ress, errs) => {
        console.log(JSON.stringify(ress, null, 3));
        console.log(JSON.stringify(errs, null, 3));
    })
    .run({ async: true })
;
// 串行
new AsyncTaskQueue(tasks)
    .on('complete', (ress, errs) => {
        console.log(JSON.stringify(ress, null, 3));
        console.log(JSON.stringify(errs, null, 3));
    })
    .run({ async: !true })
;

动态添加任务

AsyncTaskQueue 支持在运行期间可以动态添加任务。一旦运行完成后(触发 complete 事件),就会重置整个队列为初始化状态(空)。

const AsyncTaskQueue = require('./');
const tasks = [
    () => new Promise(rs => setTimeout(() => rs(true), 100)),
];

const atq = new AsyncTaskQueue(tasks);
const start = Date.now();

atq
    .on('success', (...arg) => {
        // 完成一个任务后,立刻添加一个新的任务到异步任务队列里。
        console.log(arg[2] + ', ' + (Date.now() - start) + ' ms');
        if (atq.size() < 15) {
            atq.add(() => new Promise(rs => setTimeout(() => rs(true), 100)));
        }
    })
    .on('add', (...arg) => {
        console.log('当前有 ' + atq.size() + ' 个');
    })
    .on('complete', (ress, errs) => {
        console.log('全部完成了', atq.size());   // 15
        setTimeout(() => {
            console.log(atq.size())             // 0
        });
    })
    .run()
;

多维度嵌套

AsyncTaskQueue 支持多维度嵌套,你可以尽情的嵌套使用。

const AsyncTaskQueue = require('./');

function All(tasks, id = 0) {
    console.time('run ' + id);
    return new Promise((resolve, reject) => (
        new AsyncTaskQueue(tasks)
            .on('complete', (ress, errs) => {
                console.timeEnd('run ' + id);
                return resolve([ress, errs]);
            })
            .run()
    ));
}

const tasks = [...new Array(10)].map(i =>
    new Promise(rs => setTimeout(() => rs(true), 1000))
);

All([
    All(tasks, 1).then(_ => console.log('async task queue 1 完成了')),
    All(tasks, 2).then(_ => console.log('async task queue 2 完成了')),
    All(tasks, 3).then(_ => console.log('async task queue 3 完成了')),
    All(tasks, 4).then(_ => console.log('async task queue 4 完成了')),
    All(tasks, 5).then(_ => console.log('async task queue 5 完成了')),
]).then(res => {
    console.log('全部 async task queue 完成了');
});
1.0.2

6 years ago

1.0.1

6 years ago

1.0.0

6 years ago