1.0.1 • Published 3 years ago

@simplyhexagonal/recurring-task-queue v1.0.1

License
Apache-2.0

Recurring Task Queue (RTQ)

Compatible with TypeScript 4+ and Node 14+.

Compatible with Chrome 60+, Firefox 60+, Safari 12+, and Edge 18+.

Versatile type-safe queueing library for a finite set of recurring user-editable tasks.

Open source notice

This project is open to updates by its users, and I make sure that PRs are relevant to the community. In other words, if you find a bug or want a new feature, please help us by becoming one of the contributors ✌️! See the contributing section.

Abstract

Let's say you have been put in charge of developing a recurring task which:

  • runs every five minutes,
  • loads all images uploaded in the past five minutes into memory and processes them,
  • lets users edit the path to the location of the images,
  • displays the status of each five-minute run to measure performance and catch any failed attempts.

The first two items on the list are easy to solve: set up a CloudWatch Event that calls an end-point on your REST API every five minutes, and have that end-point perform the task.

For the other two items, your intuition says that to let users edit the path to the location of the images, you should store the task definitions in the app's database. It would also make sense to store the status there, and maybe even a log of each task run.

You also want to make sure that, in the future, when multiple recurring tasks are defined, they can be queued and processed without any additional development.

With this in mind, you end up with a design similar to this:

[Architecture diagram]

Now you can specify the following sub-requirements to complete the task:

  • define the structure of task definitions which will be stored
  • track and store the status of each task
  • make sure that no matter how many times the REST end-point is called, only one instance of the task will run at a time
  • handle errors properly at every step, to avoid a situation where the app dies every five minutes due to an unforeseen error

Some nice-to-haves would be:

  • retrying tasks when they fail
  • setting a maximum number of retries
  • when more than one task is defined, re-running any completed task on each call to the end-point while leaving any task that is still running as is
  • specifying a wait period between runs in the task definition, so that a task which depends on a third-party API with strict rate limits avoids hitting them
  • sending notifications when a task reaches the maximum number of retries and is flagged as FAILED

The good news is, recurring-task-queue (RTQ) handles all of the above for you!
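
To make the wait-period and retry rules from the list above concrete, here is a minimal sketch. The field names (lastRun, waitTimeBetweenRuns, retryCount, maxRetries) match the task definition used in the Setup section, but the functions isDue and canRetry are hypothetical, the millisecond unit is an assumption, and none of this is RTQ's actual implementation:

```typescript
// Illustrative only: field names mirror the task definition in this README,
// but the rules below are assumptions, not RTQ's internal code.
interface TaskTiming {
  lastRun: Date;
  waitTimeBetweenRuns: number; // assumed to be milliseconds
  retryCount: number;
  maxRetries: number;
}

// A task is due once the wait period since its last run has elapsed.
function isDue(task: TaskTiming, now: Date): boolean {
  return now.getTime() - task.lastRun.getTime() >= task.waitTimeBetweenRuns;
}

// A failed run may be retried until retryCount reaches maxRetries.
function canRetry(task: TaskTiming): boolean {
  return task.retryCount < task.maxRetries;
}
```

With these rules, a task whose lastRun is new Date(0) is due on the first call, which is why that value is a convenient default for a task that has never run.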

Setup

Install:

pnpm i @simplyhexagonal/recurring-task-queue

# or
yarn add @simplyhexagonal/recurring-task-queue

# or
npm install @simplyhexagonal/recurring-task-queue

Define a task handler:

import { RTQTaskHandler } from '@simplyhexagonal/recurring-task-queue';

interface imgProcTaskOptions {
  imgLocation: string;
}

// loadImages and processImages stand in for your own image-handling functions
const imgProcTaskHandler: RTQTaskHandler<imgProcTaskOptions> = async (taskOptions) => {
  const rawImages = await loadImages(taskOptions.imgLocation);

  return await processImages(rawImages);
};

Store a task in a data source:

import {
  RTQTask,
  RTQStatus,
} from '@simplyhexagonal/recurring-task-queue';

// NOTE: in real-world scenarios this object would be
// generated from user input
const imgProcTaskDefinition: RTQTask<imgProcTaskOptions> = {
  id: uid(),
  status: RTQStatus.NEW,
  waitTimeBetweenRuns: 200,
  taskName: 'Image Processing',
  maxRetries: 1,
  retryCount: 0,
  // since the task has never run, simply set
  // the lastRun date to 1970-01-01T00:00:00.000Z
  lastRun: new Date(0),
  taskOptions: {
    imgLocation: 'some/image/location/path',
  },
};

// This would be your custom function which handles
// saving tasks in your data source
createTask(imgProcTaskDefinition);
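
If you just want to experiment before wiring up a real database, the custom createTask function could be backed by an in-memory map. The sketch below is one possible shape, not part of the library: StoredTask is a local stand-in for RTQTask (with id assumed to be a string and status kept as a plain string), and taskStore is hypothetical. The fetchTasks and updateTask names match the RTQ options used later in this README:

```typescript
// Local stand-in for the task shape shown above, so the sketch runs without
// the library installed; in a real app use RTQTask from the package instead.
interface StoredTask<T> {
  id: string; // assumption: ids are strings
  status: string;
  waitTimeBetweenRuns: number;
  taskName: string;
  maxRetries: number;
  retryCount: number;
  lastRun: Date;
  taskOptions: T;
}

interface ImgOptions {
  imgLocation: string;
}

// Hypothetical in-memory "database", keyed by task id.
const taskStore = new Map<string, StoredTask<ImgOptions>>();

const createTask = async (task: StoredTask<ImgOptions>): Promise<void> => {
  if (taskStore.has(task.id)) {
    throw new Error(`Task ${task.id} already exists`);
  }
  taskStore.set(task.id, { ...task });
};

// Returns copies so callers cannot mutate the stored tasks by accident.
const fetchTasks = async (): Promise<StoredTask<ImgOptions>[]> =>
  Array.from(taskStore.values()).map((task) => ({ ...task }));

const updateTask = async (task: StoredTask<ImgOptions>): Promise<void> => {
  taskStore.set(task.id, { ...task });
};
```

An in-memory store loses everything on restart, so it is only suitable for tests and local experiments.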

Now you will need to instantiate RTQ with the appropriate options to access your stored task/queue data, the task handler you defined, an event handler, and your custom error handling:

import RTQ, {
  RTQOptions,
  RTQTask,
  RTQQueueEntry,
  RTQTaskHandler,
  RTQEvent,
} from '@simplyhexagonal/recurring-task-queue';

const options: RTQOptions = {
  fetchTasks: async () => { /* return RTQTask<imgProcTaskOptions>[] */ },
  updateTask: async (task: RTQTask<imgProcTaskOptions>) => { /* ... */},
  createQueueEntry: async (queueEntry: RTQQueueEntry) => { /* ... */},
  fetchQueueEntries: async () => { /* return RTQQueueEntry[] */ },
  removeQueueEntry: async (queueEntry: RTQQueueEntry) => { /* return RTQQueueEntry[] */ },
  taskHandlers: [
    imgProcTaskHandler,
  ],
  eventHandler: async (event: RTQEvent) => { /* ... */ },
  errorHandler: (error: any) => { /* ... */ },
  maxConcurrentTasks: 10, // leave undefined to have no limit
}

const recurring = new RTQ(options);
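
The queue-related callbacks can be prototyped the same way as the task callbacks. The following is a sketch under assumptions: QueueEntryLike is a local stand-in for RTQQueueEntry whose fields are inferred from the MODIFY_QUEUE event data described under Event handling, the field types are guesses, and the return type of removeQueueEntry is not documented here, so this version returns nothing:

```typescript
// Local stand-in for RTQQueueEntry. The fields are assumed from the
// MODIFY_QUEUE event data described in this README (id, taskId, queuedAt).
interface QueueEntryLike {
  id: string;
  taskId: string;
  queuedAt: Date;
}

// Hypothetical in-memory queue; a real app would use its database.
const queueEntries: QueueEntryLike[] = [];

const createQueueEntry = async (entry: QueueEntryLike): Promise<void> => {
  queueEntries.push({ ...entry });
};

// Returns copies so callers cannot mutate the stored entries by accident.
const fetchQueueEntries = async (): Promise<QueueEntryLike[]> =>
  queueEntries.map((entry) => ({ ...entry }));

// Assumption: nothing needs to be returned; check the library's types.
const removeQueueEntry = async (entry: QueueEntryLike): Promise<void> => {
  const index = queueEntries.findIndex((candidate) => candidate.id === entry.id);
  if (index !== -1) {
    queueEntries.splice(index, 1);
  }
};
```

Removing an entry that is no longer present is treated as a no-op, so repeated calls cannot throw.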

Ticking

Based on the setup described in the previous section we ended up with the following RTQ instance:

const recurring = new RTQ(options);

It is important to remember that RTQ handles a queue of tasks, which it processes using the task handlers you define, and nothing more. As such, to begin queuing and processing the tasks you must call RTQ's tick() method.

We do NOT recommend using loops or intervals within your app to tick your task queue. Instead, set up an end-point which can be called periodically by another process, for example:

import fastify from 'fastify';

const server = fastify();

server.route({
  method: 'GET',
  url: '/api/process-images',
  handler: async () => {
    recurring.tick(); // <= This is where the magic happens

    return 'Processing images...';
  },
});
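
The "other process" can be anything that issues an HTTP request on a schedule. As one hedged example, a small script run by cron or a cloud timer could call the end-point once per invocation. The names triggerTick, FetchLike and TickResult are hypothetical, and the host and port in the usage comment are placeholders:

```typescript
// Minimal shape of the fetch function this sketch needs, so it can be
// swapped for a fake in tests. The global fetch in Node 18+ satisfies it.
type FetchLike = (url: string) => Promise<{ ok: boolean; status: number }>;

interface TickResult {
  ok: boolean;
  detail: string;
}

// Calls the tick end-point once and reports the outcome.
// Meant to be run by an external scheduler, not from a loop inside the app.
async function triggerTick(url: string, fetchFn: FetchLike): Promise<TickResult> {
  try {
    const response = await fetchFn(url);
    return { ok: response.ok, detail: `HTTP ${response.status}` };
  } catch (error) {
    // Network errors are reported instead of crashing the scheduler job.
    return { ok: false, detail: String(error) };
  }
}

// Example (hypothetical host and port):
// triggerTick('http://localhost:3000/api/process-images', fetch);
```

Injecting the fetch function keeps the script testable without a running server.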

Event handling

There are two types of events defined by the actions RTQ performs on tasks and the queue:

enum RTQAction {
  MODIFY_TASK_STATUS = 'MODIFY_TASK_STATUS',
  MODIFY_QUEUE = 'MODIFY_QUEUE',
}

The event itself carries a lot more information about the action performed:

interface RTQEvent {
  timestamp: Date;
  action: RTQActionEnum;
  message: string;
  reason: string;
  additionalData: {[k: string]: any};
  triggeredBy: string;
}

The additionalData varies depending on the action:

// if (action === RTQAction.MODIFY_TASK_STATUS)
additionalData = {
  taskId,
  taskName,
  prevStatus,
  status,
}

// if (action === RTQAction.MODIFY_QUEUE)
additionalData = {
  id,
  taskId,
  queuedAt,
}

For example, to send a notification when a task's status changes to FAILED, you could define your event handler like this:

const eventHandler = async (event: RTQEvent) => {
  const {
    action,
    additionalData,
  } = event;

  if (
    action === RTQAction.MODIFY_TASK_STATUS
    && additionalData.status === RTQStatus.FAILED
  ) {
    makeSureTowelIsAtHand();
    dontPanic();
    notifyEveryLastOneOfUs();
  }
}
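
Because additionalData changes shape with the action, it helps to branch on action before reading any of its fields. The sketch below turns an event into a single log line. RTQEventLike and summarizeEvent are hypothetical local names that mirror the shapes documented above, so the example runs without the library installed:

```typescript
// Local stand-ins mirroring the event shape documented in this README.
type ActionLike = 'MODIFY_TASK_STATUS' | 'MODIFY_QUEUE';

interface RTQEventLike {
  timestamp: Date;
  action: ActionLike;
  message: string;
  reason: string;
  additionalData: { [k: string]: any };
  triggeredBy: string;
}

// Builds a one-line summary, reading only the fields that the given
// action is documented to carry in additionalData.
function summarizeEvent(event: RTQEventLike): string {
  const { action, additionalData: data } = event;
  const time = event.timestamp.toISOString();

  if (action === 'MODIFY_TASK_STATUS') {
    return `${time} task "${data.taskName}" (${data.taskId}): ${data.prevStatus} -> ${data.status}`;
  }

  return `${time} queue entry ${data.id} for task ${data.taskId}`;
}
```

A handler passed to RTQ as eventHandler could forward the returned string to whatever logger the app already uses.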

WIP

  • Documentation

In the meantime, you can see more detailed use cases in the examples and the jest tests.

Development

pnpm
pnpm dev
pnpm test
pnpm build
pnpm release

Contributing

Yes, thank you! This plugin is community-driven; most of its features come from different authors. Please update the tests and don't forget to add your name to the package.json file.

Contributors ✨

Thanks goes to these wonderful people (emoji key):

License

Copyright (c) 2021-Present RTQ Contributors. Licensed under the Apache License 2.0.