@blackglory/consumer v3.0.1
- @blackglory/consumer ** Install #+BEGIN_SRC sh
Install as a library
npm install @blackglory/consumer
or
yarn add @blackglory/consumer
Install as a CLI program
npm install --global @blackglory/consumer
or
yarn global add @blackglory/consumer #+END_SRC
** API #+BEGIN_SRC typescript import { Runner, IRunnable } from 'extra-runnable'
type RunnableConsumer = IRunnable<void, Params>
type RunnableConsumerFactory = () => Awaitable<RunnableConsumer>
type Consumer = (signal: AbortSignal, params: Params) => Awaitable
interface IConsumerModule { /**
- 被执行的消费者本身.
- 消费者结束的定义:
- 返回非PromiseLike值
- 返回PromiseLike值, 且PromiseLike到达resolved或rejected状态.
- 抛出错误.
- 当消费者结束并非主动造成时, 会根据情况采取不同的重启方式:
- 如果结束是因为函数返回, 则会立即重启任务.
- 如果结束是因为抛出错误, 则会根据指数退避策略重启任务.
@param {AbortSignal} signal 当要求结束任务时, 该参数会发出中止信号, 任务此时需要自行让消费者结束. */ default: Consumer
/**
初始化函数, 返回值是default函数的params.
该函数返回后, 任务才会启动. */ init?: () => Awaitable
/**
一个钩子, 当程序退出时, 会调用它执行清理工作.
- 只有在成功执行init之后, 才有可能调用final, 如果程序在init之前就退出, 则final不会被调用.
- @param {error} 如果导致程序退出的原因是错误, 则提供此参数. */ final?: (error?: Error) => Awaitable }
interface IAPI { getId(): string setLabel(val: string): null getLabel(): string getState(): OrchestratorState getConcurrency(): number
/*
, 调整并发数, 这会导致任务被创建启动或关闭销毁.
, 将并发数设为0会关闭销毁所有任务, 但不会导致程序退出.
,
, 当concurrency是一个字符串时, 支持以下格式:
, - n
, 整数的字符串表示.
, - max
, 最大逻辑核心数, 相当于 100%
和 1/1
.
, - half
, 一半逻辑核心数, 相当于 50%
和 1/2
.
, - -n
, 最大逻辑核心数减去n.
, - n/m
, 按分数分配逻辑核心数.
, - n%
, 按百分比分配逻辑核心数.
, 除 0
, 0/m
, 0%
外, 其他非整数情况都会向上取整.
,*/
scale(concurrency: number | string): null
terminate(): null } #+END_SRC
*** Orchestrator #+BEGIN_SRC typescript export class Orchestrator extends Emitter<{ terminated: []
/**
用于在出错时提供错误信息, 主要为打印或记录日志而设计, 也可以用来在出错时停止Daemon. */ error: error: Error }> { constructor( createRunnableConsumer: RunnableConsumerFactory , params: Params )
getState(): OrchestratorState getNumberOfInstances(): number terminate(): Promise scale(target: number): Promise } #+END_SRC
* Adapters ** RunnableConsumerFromFunction #+BEGIN_SRC typescript class RunnableConsumerFromFunction implements RunnableConsumer { constructor(fn: Consumer) } #+END_SRC
** RunnableConsumerFromModule #+BEGIN_SRC typescript class RunnableConsumerFromModule implements RunnableConsumer { /
- @param filename export
IConsumerModule<Params>
*/ constructor(filename: string) } #+END_SRC
** RunnableConsumerFromModuleAsThread #+BEGIN_SRC typescript class RunnableConsumerFromModuleAsThread implements RunnableConsumer { /
- @param filename export
IConsumerModule<Params>
*/ constructor(filename: string) } #+END_SRC
** RunnableConsumerFromModuleAsProcess #+BEGIN_SRC typescript class RunnableConsumerFromModuleAsProcess implements RunnableConsumer { /
- @param filename export
IConsumerModule<Params>
*/ constructor(filename: string) } #+END_SRC
*** API #+BEGIN_SRC typescript class API implements ImplementationOf { constructor( orchestrator: Orchestrator , options: { id: string label: string } ) } #+END_SRC
CLI * =run-consumer-module ...options = 将 =IConsumerModule= 作为一个Orchestrator运行.
#+BEGIN_SRC Usage: run-consumer-module options
Options: -V, --version output the version number --id --label --mode (default: "async") --concurrency (default: "1") --port --registry -h, --help display help for command #+END_SRC
**** =--id string= (可选, 默认为随机生成的UUID) 该Orchestrator的id.
**** =--label string= (可选) 该Orchestrator的label.
**** =--mode = (可选, 默认为 =async=)
- =async=: 适用于I/O密集型的任务, 如果脚本包含CPU密集型代码, 则会导致阻塞.
- =thread=: 适用于CPU密集型的任务, 采用线程模型, 使用worker_threads模块. init函数的返回值必须是可序列化的.
- =process=: 适用于CPU密集型的任务, 采用进程模型, 使用child_process模块. init函数的返回值必须是可序列化的. 该模式是为了弥补thread模式下Worker无法使用部分Native模块的不足而出现的. 由于thread模式的资源开销更小, 应优先使用thread模式.
**** =--concurrency <number | string>= concurrency(可选, 默认为1) 该Orchestrator的并发数.
当concurrency是一个字符串时, 支持以下格式:
- =n=, 整数的字符串表示.
- =max=, 最大逻辑核心数, 相当于 =100%= 和 =1/1=.
- =half=, 一半逻辑核心数, 相当于 =50%= 和 =1/2=.
- =-n=, 最大逻辑核心数减去n.
- =n/m=, 按分数分配逻辑核心数.
- =n%=, 按百分比分配逻辑核心数. 除 =0=, =0/m=, =0%= 外, 其他非整数情况都会向上取整.
**** =--port = (可选) RPC服务器的端口号. 若提供此项, 则会开启RPC服务器.
**** =--registry = (可选) 需要连接的远程registry地址, 例如 =ws://localhost:8080=.