@astodi/embedded-queue v0.0.3
embedded-queue
embedded-queue is a job/message queue for any platform. It does not require an external data store such as Redis or MySQL.
It currently has the following repository implementations:
- nedb embedded repository,
- vanilla in-memory repository,
- or you can implement your own repository
Installation
npm install embedded-queue
or
yarn add embedded-queue
Basic Usage
import { Queue, Event, InMemoryJobRepository } from 'embedded-queue'

(async () => {
    const queue = await Queue.createQueue(
        new InMemoryJobRepository()
        // or new NedbJobRepository({ /* you can pass the nedb options here */ })
    );

    // set up job processor for "adder" type, concurrency is 1
    queue.process(
        "adder",
        async (job) => job.data.a + job.data.b,
        1
    );

    // handle job complete event
    queue.on(
        Event.Complete,
        (job, result) => {
            console.log("Job Completed.");
            console.log(` job.id: ${job.id}`);
            console.log(` job.type: ${job.type}`);
            console.log(` job.data: ${JSON.stringify(job.data)}`);
            console.log(` result: ${result}`);
        }
    );

    // create "adder" type job
    await queue.createJob({
        type: "adder",
        data: { a: 1, b: 2 },
    });

    // shut down the queue, allowing up to 1000 ms for running processors to stop
    setTimeout(async () => { await queue.shutdown(1000); }, 1);
})();
Basic
- Create Queue
- Set Job Processor
- Set Job Event Handler
- Create Job
- Shutdown Queue
Create Queue
You can create a new queue by calling Queue.createQueue(repository). Queue.createQueue returns a Promise; await it to make sure initialization has finished.
Set Job Processor
A job processor is a function that processes a single job. A Worker calls it with a Job argument, and it must return a Promise<any>. The processor can do any kind of work (calculation, network access, and so on) and resolves the Promise with the result. Pass the data it needs through the Job.data object. Inside the processor you can also call Job.setProgress to report progress and Job.addLog to record log messages.
You can set any number of job processors. Each processor is associated with a single job type and processes only jobs of that type.
If you need to process many jobs of the same type concurrently, you can launch any number of job processors for that type.
Finally, the queue.process method signature is queue.process(type, processor, concurrency).
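As a minimal sketch of a processor that fits the description above: JobLike is a hypothetical stand-in that declares only the members this README mentions (data, setProgress, addLog), and the Promise return types of setProgress and addLog are an assumption. The sumProcessor name, the values field, and the "sum" job type are illustrative only.

```typescript
// Hypothetical stand-in for the library's Job type: only the members the
// README mentions are declared, and the Promise return types are assumed.
interface JobLike {
    data: { values: number[] };
    setProgress(completed: number, total: number): Promise<void>;
    addLog(message: string): Promise<void>;
}

// A processor receives one job and returns a Promise of the result.
async function sumProcessor(job: JobLike): Promise<number> {
    const values = job.data.values;
    let sum = 0;
    for (let i = 0; i < values.length; i++) {
        sum += values[i];
        // Report how many items have been handled so far.
        await job.setProgress(i + 1, values.length);
    }
    await job.addLog(`summed ${values.length} values`);
    return sum;
}

// Registration would follow the signature described above, for example:
// queue.process("sum", sumProcessor, 2);
```

Because the processor only depends on the members it actually uses, it can be exercised with a plain object that implements JobLike, without starting a queue.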
Set Job Event Handler
Queue implements EventEmitter. It emits an event when a job completes, fails, updates its progress, and so on. You can observe these events by registering handlers with Queue.on(Event, Handler).
| Event | Description | Handler Signature |
|---|---|---|
| Event.Enqueue | A job was added to the queue | (job) => void |
| Event.Start | A job started processing | (job) => void |
| Event.Failure | Processing of a job failed | (job, error) => void |
| Event.Complete | Processing of a job completed | (job, result) => void |
| Event.Remove | A job was removed from the queue | (job) => void |
| Event.Error | An error occurred outside of a job processor | (error, job?) => void |
| Event.Progress | A job's progress was updated | (job, progress) => void |
| Event.Log | A log message was added to a job | (job, message) => void |
| Event.Priority | A job's priority was changed | (job, priority) => void |
The Event.Complete handler is the most commonly used; it receives the result returned by the job processor.
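The handler signatures in the table can be sketched as plain functions. This example records each job's outcome by id. JobLike and Outcome are hypothetical types introduced for illustration, and typing the failure argument as Error is an assumption.

```typescript
// Hypothetical stand-in for the library's Job type; only id and type are used.
interface JobLike {
    id: string;
    type: string;
}

type Outcome =
    | { status: "complete"; result: unknown }
    | { status: "failure"; error: Error };

// Outcomes keyed by job id, filled in by the handlers below.
const outcomes: Record<string, Outcome> = {};

// Matches the Event.Complete handler signature: (job, result) => void
function onComplete(job: JobLike, result: unknown): void {
    outcomes[job.id] = { status: "complete", result };
}

// Matches the Event.Failure handler signature: (job, error) => void
function onFailure(job: JobLike, error: Error): void {
    outcomes[job.id] = { status: "failure", error };
}

// On a real queue the handlers would be registered like this:
// queue.on(Event.Complete, onComplete);
// queue.on(Event.Failure, onFailure);
```

Keeping handlers as named functions, rather than inline arrow functions, makes them easy to call directly in tests.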
Create Job
You can create a job by calling Queue.createJob(data). The data argument is an object that contains type, priority, and data.
| Field | Type | Description |
|---|---|---|
| type | string | Identifier used to select the job processor |
| priority | Priority | The Queue picks up jobs with higher priority first |
| data | object | Data used by the job processor; you can set any data |
Queue.createJob(data) returns a Promise<Job>; the resulting job is associated with the Queue.
Priority is one of Priority.LOW, Priority.NORMAL, Priority.MEDIUM, Priority.HIGH, or Priority.CRITICAL.
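To illustrate "higher priority first", here is a small self-contained sketch. The Priority enum is a local mirror of the names listed above; its numeric values are an assumption, as is the tie-break that prefers older jobs among equal priorities. PendingJob and compareJobs are hypothetical names, not part of the library.

```typescript
// Local mirror of the priority names listed above. The numeric values are an
// assumption made for this sketch, not taken from the library.
enum Priority {
    LOW = 1,
    NORMAL,
    MEDIUM,
    HIGH,
    CRITICAL,
}

interface PendingJob {
    type: string;
    priority: Priority;
    createdAt: Date;
}

// Higher priority first; among equal priorities, older jobs first
// (the tie-break is an assumption of this sketch).
function compareJobs(a: PendingJob, b: PendingJob): number {
    if (a.priority !== b.priority) {
        return b.priority - a.priority;
    }
    return a.createdAt.getTime() - b.createdAt.getTime();
}

const pending: PendingJob[] = [
    { type: "report", priority: Priority.LOW, createdAt: new Date(1000) },
    { type: "email", priority: Priority.CRITICAL, createdAt: new Date(3000) },
    { type: "resize", priority: Priority.NORMAL, createdAt: new Date(2000) },
    { type: "thumbnail", priority: Priority.NORMAL, createdAt: new Date(1500) },
];

// Sort a copy so the original list keeps its insertion order.
const pickupOrder = pending.slice().sort(compareJobs).map((job) => job.type);
console.log(pickupOrder.join(", ")); // email, thumbnail, resize, report
```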
Shutdown Queue
To stop processing jobs, call Queue.shutdown(timeoutMilliseconds, type) => Promise<void>. The Queue begins stopping the running job processors, and the Promise resolves once all of them have stopped. If a job processor takes longer than timeoutMilliseconds to stop, the Queue terminates it and sets Job.state to State.FAILURE.
To stop only the job processors of one type, pass that type as the second argument. If undefined is passed, job processors of all types are stopped.
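The "wait, then give up after a timeout" behavior can be sketched with plain promises. This is not the library's implementation; waitForStop and ShutdownResult are hypothetical names, and the sketch only reports which case happened rather than terminating anything.

```typescript
type ShutdownResult = "stopped" | "timed-out";

// Waits for every running processor to settle, but gives up after timeoutMs.
function waitForStop(
    running: Promise<unknown>[],
    timeoutMs: number
): Promise<ShutdownResult> {
    return new Promise<ShutdownResult>((resolve) => {
        // If the timer fires first, a real queue would terminate the remaining
        // processors and mark their jobs as failed.
        const timer = setTimeout(() => resolve("timed-out"), timeoutMs);

        // A processor that rejects has still stopped, so rejections are
        // converted to plain settlements here.
        const settled = running.map((p) =>
            p.then(() => undefined, () => undefined)
        );

        Promise.all(settled).then(() => {
            clearTimeout(timer);
            resolve("stopped");
        });
    });
}
```

Clearing the timer once everything has settled keeps the process from staying alive for the rest of the timeout period.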
API
Queue API
- createJob(data): Create a new Job, see above for usage.
- process(type, processor, concurrency): Set a job processor, see above for usage.
- shutdown(timeoutMilliseconds, type): Start shutting down the Queue, see above for usage.
- findJob(id): Search the queue by Job.id. Returns the Job if found, otherwise null.
- listJobs(state): List all jobs that have the specified state. If undefined is passed, returns all jobs.
- removeJobById(id): Remove the Job with the specified id from the queue.
- removeJobsByCallback(callback): Remove all jobs for which callback returns true. The callback signature is (job) => boolean.
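As an illustration of a (job) => boolean callback for removeJobsByCallback, the sketch below builds a predicate that matches jobs which failed more than a given time ago. JobLike is a hypothetical stand-in that declares only the state and failedAt getters, and failedBefore is an illustrative helper, not part of the library.

```typescript
// Hypothetical stand-in declaring only getters that the Job API lists.
interface JobLike {
    state: string;
    failedAt?: Date;
}

// Builds a (job) => boolean predicate matching jobs that are in the given
// failure state and failed more than maxAgeMs before `now`.
function failedBefore(
    now: Date,
    maxAgeMs: number,
    failureState: string
): (job: JobLike) => boolean {
    const cutoff = now.getTime() - maxAgeMs;
    return (job) =>
        job.state === failureState &&
        job.failedAt !== undefined &&
        job.failedAt.getTime() < cutoff;
}

// On a real queue this might be used as follows (assuming State is imported):
// const oneDay = 24 * 60 * 60 * 1000;
// await queue.removeJobsByCallback(failedBefore(new Date(), oneDay, State.FAILURE));
```

Passing now and the failure state as arguments keeps the predicate deterministic and independent of the library's actual state values.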
Job API
- setProgress(completed, total): Set progress. The arguments are converted to a percentage value (completed / total).
- addLog(message): Add a log message.
- save(): After this is called, the job is put into the associated Queue and waits to be processed by a job processor.
- remove(): Remove the job from the Queue; it will not be processed anymore.
- setPriority(value): Set the priority value.
- isExist(): Returns whether the Job is in the Queue. Returns false before save() is called or after remove() is called, otherwise true.
- Getters
  - id: String that identifies the Job.
  - type: String identifier used to select the job processor.
  - data: Object used by the job processor; you can set any data.
  - priority: Number that determines processing order.
  - createdAt: Date the job was created.
  - updatedAt: Date the job was last updated.
  - startedAt: Date the job processor started processing. undefined until the job starts.
  - completedAt: Date the job processor completed processing. undefined until the job completes, or if the job failed.
  - failedAt: Date the job processor encountered an error. undefined until the job fails, or if the job completed successfully.
  - state: String that represents the current Job state, one of State.INACTIVE, State.ACTIVE, State.COMPLETE, State.FAILURE.
  - duration: Number, the processing time of the Job in milliseconds. undefined until the job completes or fails.
  - progress: Number, the Job progress as a percentage. You can set it by calling Job.setProgress. When the job completes, it is set to 100 automatically.
  - logs: Array of String. You can add a log message by calling Job.addLog.
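The conversion that setProgress performs can be sketched as below. The README states only that the arguments become a percentage (completed / total); the clamping to 0..100 and the guard against a non-positive total are choices made for this sketch, and toPercentage is a hypothetical helper name.

```typescript
// Converts a completed/total pair into a percentage in the range 0..100.
// Clamping and the zero-total guard are choices made for this sketch.
function toPercentage(completed: number, total: number): number {
    if (total <= 0) {
        return 0;
    }
    const percentage = (completed / total) * 100;
    return Math.min(100, Math.max(0, percentage));
}

console.log(toPercentage(3, 12)); // 25
```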
Advanced
Unexpected Termination
If your program terminates suddenly without calling Queue.shutdown while processors are still processing jobs, those jobs remain in State.ACTIVE in the queue. The next time Queue.createQueue is called, these jobs are updated to State.FAILURE automatically.
If you want to reprocess these jobs, call Queue.createJob again with the same parameters.
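The recovery flow can be sketched over a plain array of stored jobs. This mimics the behavior described above rather than reproducing the library's code. State is a local enum whose string values are assumptions, and StoredJob, failInterruptedJobs, and toRetryRequests are hypothetical names.

```typescript
// Local stand-in for the library's State; the string values are assumptions.
enum State {
    INACTIVE = "INACTIVE",
    ACTIVE = "ACTIVE",
    COMPLETE = "COMPLETE",
    FAILURE = "FAILURE",
}

interface StoredJob {
    id: string;
    type: string;
    data: Record<string, unknown>;
    state: State;
}

// Mimics the start-up cleanup: any job still ACTIVE from a previous run is
// marked as FAILURE. Returns the jobs that were changed.
function failInterruptedJobs(jobs: StoredJob[]): StoredJob[] {
    const interrupted = jobs.filter((job) => job.state === State.ACTIVE);
    for (const job of interrupted) {
        job.state = State.FAILURE;
    }
    return interrupted;
}

// Builds createJob arguments that repeat each job's original type and data.
function toRetryRequests(
    interrupted: StoredJob[]
): { type: string; data: Record<string, unknown> }[] {
    return interrupted.map((job) => ({ type: job.type, data: job.data }));
}

// On a real queue, each request would then be passed to queue.createJob(request).
```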
License
MIT