1.0.1 • Published 4 months ago

cascade-q v1.0.1

Weekly downloads
-
License
MIT
Repository
github
Last release
4 months ago

简体中文 | English

CascadeQ

CascadeQ 是一个多优先级异步任务调度器,专为需要精细任务优先级控制和并发管理的 JavaScript/TypeScript 应用设计。

特性

  • 多级优先级队列 - 根据阈值自动分配任务到不同优先级队列
  • 优先级衰减机制 - 任务优先级会随时间自动提升,避免低优先级任务无限期等待
  • 灵活并发控制 - 全局并发限制和队列级别并发配置
  • 事件系统 - 完整的任务生命周期事件
  • 任务管理 - 支持任务取消、暂停/恢复和超时清理
  • 强类型支持 - 完整的 TypeScript 类型定义

安装

npm install cascade-q

基本用法

import { CascadeQ } from 'cascade-q';
import type { TaskHandle } from 'cascade-q/types';

// 创建一个多优先级队列实例
const queue = new CascadeQ({
  thresholds: [0, 10], // 两个优先级级别:高(<=0)和低(<=10)
  maxConcurrency: 3 // 最大同时运行3个任务
});

// 添加高优先级任务
queue.add(async (): Promise<unknown> => {
  const response = await fetch('/api/important-data');
  return response.json();
}, 0); // 优先级0(高)

// 添加低优先级任务并获取类型化的句柄
const handle: TaskHandle = queue.add(async (): Promise<unknown> => {
  const response = await fetch('/api/less-important-data');
  return response.json();
}, 5); // 优先级5(低)

// 等待任务完成并获取结果
const result = await queue.add(async () => {
  return await fetchData();
});

// 或使用Promise链
queue
  .add(async () => fetchData())
  .then(result => console.log(result))
  .catch(error => console.error(error));

// 取消特定任务
handle.cancel();
import { CascadeQ } from 'cascade-q';
import type { CascadeQState } from 'cascade-q/types';

// 带有命名的三级优先级队列
const queue = new CascadeQ({
  thresholds: [
    { value: 0, level: 'critical' }, // 关键任务
    { value: 10, level: 'normal' }, // 普通任务
    { value: 20, level: 'background' } // 后台任务
  ]
});

// 获取命名队列的状态
const state: CascadeQState = queue.getState();
const criticalQueueState = state.queues.find(q => q.level === 'critical');

核心概念

优先级阈值

优先级阈值定义了任务根据优先级值被分配到哪个队列:

// 示例:三级优先级队列
const queue = new CascadeQ({
  thresholds: [0, 10, 20]
});

// 优先级 <= 0 的任务进入第一个队列(最高优先级)
// 优先级 <= 10 的任务进入第二个队列
// 优先级 <= 20 的任务进入第三个队列

优先级衰减

任务优先级会随时间自动提升(数值降低):

import { CascadeQ } from 'cascade-q';
import type { DecayCurve } from 'cascade-q/types';

// 配置任务优先级如何随时间衰减
const queue = new CascadeQ({
  baseDecay: 0.5, // 每单位时间优先级提升0.5
  decayCurve: (n: number): number => n, // 线性衰减
  priorityDecayInterval: 60000 // 每分钟计算一次衰减
});

// 指数衰减(优先级提升加速)示例
const exponentialDecay: DecayCurve = (n: number): number => Math.pow(n, 2);
const queue2 = new CascadeQ({
  decayCurve: exponentialDecay
});

并发控制机制

CascadeQ 提供两层级的并发管理:

  • 全局并发控制 - 通过 maxConcurrency 限制总并发数
  • 队列级并发分配 - 通过 calcConcurrency 在不同优先级队列间分配并发额度
import { CascadeQ } from 'cascade-q';
import type { CalcConcurrency, CascadeQState } from 'cascade-q/types';

// 自定义并发分配策略
// *注意:这是简化示例,实际应用中应考虑总并发限制和更多队列的情况
const customConcurrencyStrategy: CalcConcurrency = (index: number, state: CascadeQState): number => {
  if (state.pending === 0 || state.queues[index].pending === 0) return 0; // 正确
  // 高优先级队列(index=0)获得更多并发额度
  if (index === 0) return Math.min(8, state.queues[0].pending);
  // 低优先级队列每个最多2个并发
  return Math.min(2, state.queues[index].pending);
};

const queue = new CascadeQ({
  thresholds: [0, 10]
  maxConcurrency: 10, // 全局最大并发数
  calcConcurrency: customConcurrencyStrategy
});

默认并发策略

CascadeQ 的默认并发分配策略采用"两阶段加权分配"原则,通过队列优先级和任务数量动态调整并发资源,确保系统资源高效利用:

// 默认并发计算策略
const DEFAULT_CALC_CONCURRENCY: CalcConcurrency = (index: number, { max, pending, queues }: CascadeQState): number => {
  // 第一阶段: 基础计算
  if (pending === 0 || queues[index].pending === 0) return 0;

  const totalLevels = queues.length;
  const levelWeight = (totalLevels - index) / totalLevels;
  let levelShare = Math.ceil((max * levelWeight * queues[index].pending) / pending);
  levelShare = Math.min(levelShare, queues[index].pending);

  // 第二阶段: 确保高优先级队列获得足够资源
  if (index === 0 && levelShare < queues[0].pending) {
    levelShare = Math.min(Math.ceil(max * 0.6), queues[0].pending);
  }

  return levelShare;
};

策略特点

  • 优先级加权 - 优先级越高的队列获得更高权重,如三级队列中最高优先级队列权重为100%,中等优先级队列权重为66.7%,低优先级队列权重为33.3%
  • 按需分配 - 考虑各队列任务数量,根据任务分布按比例分配并发资源
  • 高优先级保障 - 确保高优先级队列获得至少60%的并发资源(如有需要)
  • 动态调整 - 随着任务执行和添加,自动重新计算最优并发分配
  • 资源利用最大化 - 使用向上取整而非向下取整,确保并发资源得到充分利用
  • 精细控制 - 精确计算每个队列的实际需求,不强制分配不需要的并发额度

配置选项

选项类型默认值描述
maxConcurrencynumber10最大并发任务数
thresholdsArray<number\|ThresholdOption>[0, 10]优先级队列的阈值配置 完整配置对象
baseDecaynumber0.5基础优先级衰减率
decayCurveDecayCurven => n优先级衰减曲线函数
priorityDecayIntervalnumber60000优先级衰减计算间隔(毫秒)
calcConcurrencyCalcConcurrency默认并发策略队列并发额度分配算法
taskTTLnumber60000任务最大生存时间(毫秒)
cleanupIntervalnumber60000过期任务清理间隔(毫秒)
priorityCheckIntervalnumber10000优先级检查间隔(毫秒)

ThresholdOption

属性类型描述
valuenumber优先级阈值,任务的 basePriority ≤ value 将被分配到对应队列
levelstring\|number可选的队列名称,用于标识队列,便于状态查询和日志记录

API 参考

队列操作方法

方法参数返回值描述
add<T>task: () => Promise<T>, priority?: numberTaskHandle<T>添加异步任务到队列,返回任务控制句柄
pausevoid暂停队列调度,已执行的任务继续运行,新任务不会启动
resumevoid恢复队列调度
canceltaskId: symbolboolean取消特定任务,成功返回 true
clearvoid清空所有待执行任务
getStateCascadeQState获取队列当前状态信息
disposevoid释放队列资源,清理定时器,队列不再可用

事件监听方法

方法参数返回值描述
onevent: QueueEvent, handler: (task: TaskItem, error?: Error) => voidvoid添加事件监听器
offevent: QueueEvent, handler: (task: TaskItem, error?: Error) => voidvoid移除事件监听器

TaskHandle 方法

方法参数返回值描述
cancelboolean取消任务(仅限 pending 状态),成功返回 true
getStatusTaskStatus获取当前任务状态
thenonfulfilled?, onrejected?Promise<unknown>Promise 接口,支持等待任务完成
catchonrejectedPromise<unknown>Promise 接口,捕获任务错误
finallyonfinallyPromise<unknown>Promise 接口,无论任务成功或失败都执行

QueueEvent

事件名称回调参数触发时机
enqueueTaskItem任务被添加到队列时
startTaskItem任务开始执行时
successTaskItem任务成功完成时
failTaskItem, Error任务执行失败时
completeTaskItem任务完成时(无论成功或失败)
cancelTaskItem任务被取消时

状态定义

TaskStatus

状态描述
Pending任务在队列中等待
Running任务正在执行中
Success任务已成功完成
Failed任务执行失败
Cancelled任务已被取消

CascadeQState

属性类型描述
runningnumber当前正在执行的任务总数
pendingnumber当前等待执行的任务总数
maxnumber最大并发任务数
queuesarray各优先级队列的详细状态

state.queues[index]

属性类型描述
levelnumber\|string队列的优先级阈值或标识符
concurrencynumber该队列当前的并发额度
runningnumber该队列中正在执行的任务数
pendingnumber该队列中等待执行的任务数

许可证

MIT