1.0.0 • Published 4 years ago

jolt-retry-queue v1.0.0

License
ISC

@jolt-us/retry-queue Library README

Architecture diagram for CommandRetryWrapper - message flow

First Steps

See the architecture diagram.

Our first use of RabbitMQ provides an auto-retry mechanism for async commands.

For example, assume we have a command called saySomething, and assume this command is likely to fail.

Until now, when a command failed for any reason, we had to find these failures manually and handle them one by one.

Now we can wrap this command with CommandRetryWrapper and configure an auto-retry mechanism. In case of failure:

  • saySomething failed on the first execution
  • after 10s → retry saySomething
  • if it failed again → after 1m → retry saySomething
  • and so on...

We don't want to keep retrying endlessly:

  • → after the number of retries we have configured
  • → the message is dead → we run the onDeadMessage handler we received as a parameter (onFailure in the example below)

    Handling example (implementation details):

    • → save a record in a dedicated DB table so we know what failed and when
    • → wait / fix the issue if needed → re-queue that message
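
The flow above can be sketched in memory, without RabbitMQ. This is only an illustration of the described behaviour, not the library's implementation: runWithRetries, the Handlers type and the injectable wait parameter are hypothetical names introduced here, and the assumption is that each delay is counted from the previous failure.

```typescript
// Hypothetical in-memory sketch of the retry flow; not the library's code.
type Handlers<T> = {
    command: (args: T) => Promise<void>;
    onError: (error: unknown, args: T, tryAttempt: number) => Promise<void>;
    onFailure: (errors: unknown[], args: T) => Promise<void>;
};

// Default wait: a real timer. Tests can inject a fake one.
const sleep = (ms: number): Promise<void> =>
    new Promise<void>((resolve) => setTimeout(resolve, ms));

async function runWithRetries<T>(
    args: T,
    retries: number[],
    handlers: Handlers<T>,
    wait: (ms: number) => Promise<void> = sleep,
): Promise<boolean> {
    const errors: unknown[] = [];
    // One initial attempt plus one attempt per configured retry delay.
    for (let attempt = 0; attempt <= retries.length; attempt++) {
        if (attempt > 0) {
            // Assumption: the delay is counted from the previous failure.
            await wait(retries[attempt - 1]);
        }
        try {
            await handlers.command(args);
            return true; // the command succeeded, stop retrying
        } catch (error) {
            errors.push(error);
            await handlers.onError(error, args, attempt + 1);
        }
    }
    // All retries are used up: the message is dead.
    await handlers.onFailure(errors, args);
    return false;
}
```

With retries set to [10000, 60000] this makes at most three attempts: the first execution, one retry after 10s, and one retry a minute after that.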

How do we do that?

This guide is written in TypeScript.

TL;DR

// index.ts (/test/example.ts)

import { QueueManager } from "../src/QueueManager/QueueManager";
import { Channel, Message } from "amqplib";
import { CommandRetryWrapper } from "../src/CommandRetryWrapper/CommandRetryWrapper";
import { SaySomething, SaySomethingArgs } from "./SaySomething";

main();

async function main() {
    const queueManager: QueueManager = new QueueManager({
        connection: "amqp://localhost",
        appId: "test",
    });

    const { command } = await CommandRetryWrapper.create({
        queueManager,
        commandName: "say-hello",
        retries: [10000, 60000], // 10s & 1m
        handlers: {
            command: SaySomething.say,
            validate: async () => {},
            onError: async (error, message, tryAttempt) =>
                console.error("ERROR #", tryAttempt),
            onFailure: async (errors, message) =>
                console.error(`Failed after ${errors.length} errors`),
        },
    });

    const message = { sayWhat: `message ${Math.random()}`, sayTo: "Eli" };
    await command(message);

    setTimeout(async () => {
        SaySomething.mouthClosed = false;
    }, 15000);
}

Let's break it down. First, create the SaySomething command we want to wrap. This command simply logs the message provided. It rejects or resolves based on the mouthClosed static property, which we will use to demonstrate failure and trigger retries.

// SaySomething.ts

export interface SaySomethingArgs {
    sayWhat: string;
    sayTo: string;
    tryAttempt?: number;
}

export class SaySomething {
    public static mouthClosed = true;

    static say(args: SaySomethingArgs): Promise<void> {
        return new Promise((resolve, reject) => {
            if (!SaySomething.mouthClosed) {
                console.log(`${args.sayWhat} ${args.sayTo} :)`);
                resolve();
            } else {
                reject("say no more");
            }
            return;
        });
    }
}

Before wrapping our command, we create a QueueManager to manage our connections to the RabbitMQ server and create channels.

const queueManager = new QueueManager({
    connection: "amqp://localhost",
    appId: "test",
});

The command wrapper takes four arguments: queueManager, commandName, retries, and handlers.

queueManager is the queue manager we have just created. commandName is the base for the names of the command's queue and exchange. retries is an array in which each element represents one retry and its value is that retry's delay (ms). The only missing piece is the handlers.
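
To make the meaning of the retries array concrete, here is a small sketch that spells out a schedule. describeRetries and totalAttempts are hypothetical helpers written for this guide, not part of the library, and they assume each delay is counted from the previous failure.

```typescript
// Hypothetical helpers for reading a retries array; not part of the library.

// One line of text per retry: its position and its delay in seconds.
function describeRetries(retries: number[]): string[] {
    return retries.map(
        (delayMs, index) =>
            `retry #${index + 1} runs ${delayMs / 1000}s after the previous failure`,
    );
}

// The first execution is not a retry, so it is counted separately.
function totalAttempts(retries: number[]): number {
    return retries.length + 1;
}

console.log(describeRetries([10000, 60000]));
console.log(`total attempts: ${totalAttempts([10000, 60000])}`);
```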

Let's go over the handlers:

  • command handler is our command, obviously.
  • validate handler runs before the command and checks whether executing the command is still valid.

    For example, a user changed their details, which triggered a command to sync the details to a 3rd party (HubSpot), but the sync failed the 1st time, so it is now in the retry loop. Then the user changed their details once again. We don't want the retry of the 1st update to override the 2nd update. Another example: when trying to charge a Soji, make sure they have not been charged already.

  • onError handler runs every time the command fails (after each try).

  • onFailure handler runs after all retries have failed and the message is dead.

    Here we should define a protocol for handling dead messages, for example a DB table that stores all dead messages so that we can re-queue them later.
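
The stale-update example could be guarded by a validate handler along these lines. This is a sketch under assumptions: SyncDetailsArgs, latestUpdateByUser and validateSync are hypothetical names, and it assumes that validate signals "do not execute" by rejecting, which this README does not specify.

```typescript
// Hypothetical payload for a "sync details to a 3rd party" command.
interface SyncDetailsArgs {
    userId: string;
    details: { email: string };
    updatedAt: number; // when the user made this change (ms since epoch)
}

// Stand-in for a DB lookup of each user's most recent change time.
const latestUpdateByUser = new Map<string, number>();

// Assumption: rejecting here means the command should not run.
async function validateSync(args: SyncDetailsArgs): Promise<void> {
    const latest = latestUpdateByUser.get(args.userId);
    if (latest !== undefined && latest > args.updatedAt) {
        // A newer update exists, so retrying this one would override it.
        throw new Error(`stale update for user ${args.userId}`);
    }
}
```

Comparing timestamps is only one option; a version counter on the user record would serve the same purpose.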

const { command } = await CommandRetryWrapper.create({
    queueManager,
    commandName: "say-hello",
    retries: [10000, 60000], // 10s & 1m
    handlers: {
        command: SaySomething.say,
        validate: async () => {},
        onError: async (error, message, tryAttempt) =>
            console.error("ERROR #", tryAttempt),
        onFailure: async (errors, message) =>
            console.error(`Failed after ${errors.length} errors`),
    },
});

That's it: we have wrapped our command and can now execute it. Earlier we configured the retries to 10s (10000ms) and 1m (60000ms). Let's execute the command, let it fail on the 1st try, and have it succeed on a retry. The mouthClosed property lets us control this: SaySomething.mouthClosed is true by default, so the command is rejected at first, and the 2nd try (the 1st retry) runs 10 seconds later. The timer below sets mouthClosed to false after 15s, which is later than the 10s retry, so use a timer delay shorter than 10s if you want the 1st retry itself to resolve:

const message = { sayWhat: `message ${Math.random()}`, sayTo: "Eli" };
await command(message);

setTimeout(async () => {
    SaySomething.mouthClosed = false;
}, 15000);

Before running our program, we need to start a RabbitMQ server on our local machine (we use Docker for that).

# start RabbitMQ server (localhost)
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# run index.ts
nodemon --exec ts-node

Enjoy :)


Next Steps

  • Start wrapping commands (from the candidates list below) with CommandRetryWrapper.
    • scheduler - scheduled tasks
    • disenroll from engagement/module
    • handle session is over
    • sync hubspot properties
    • send email
    • send notification

Once one of the commands above needs to re-queue its messages:

  • create a dead_queue_messages DB table & set a policy
  • create a mechanism (API) to re-queue messages from dead_queue_messages
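
As a starting point for that work, the table and the re-queue mechanism could look roughly like the in-memory sketch below. Everything here is hypothetical: the record fields, the DeadMessageStore class and its methods are illustrative names, and a real version would be backed by the dead_queue_messages table rather than an array.

```typescript
// Hypothetical shape of one row in dead_queue_messages.
interface DeadMessageRecord<T> {
    id: number;
    commandName: string;
    payload: T;
    errors: string[];
    failedAt: Date;
    requeuedAt: Date | null;
}

// In-memory stand-in for the table plus a re-queue operation.
class DeadMessageStore<T> {
    private records: DeadMessageRecord<T>[] = [];
    private nextId = 1;

    // Intended to be called from an onFailure handler.
    save(commandName: string, payload: T, errors: unknown[]): DeadMessageRecord<T> {
        const record: DeadMessageRecord<T> = {
            id: this.nextId++,
            commandName,
            payload,
            errors: errors.map((error) => String(error)),
            failedAt: new Date(),
            requeuedAt: null,
        };
        this.records.push(record);
        return record;
    }

    // Dead messages that have not been re-queued yet.
    pending(): DeadMessageRecord<T>[] {
        return this.records.filter((record) => record.requeuedAt === null);
    }

    // Sends the payload through the wrapped command again and marks the row.
    async requeue(id: number, command: (payload: T) => Promise<void>): Promise<boolean> {
        const record = this.records.find((r) => r.id === id && r.requeuedAt === null);
        if (!record) return false;
        await command(record.payload);
        record.requeuedAt = new Date();
        return true;
    }
}
```

The command passed to requeue would be the wrapped command returned by CommandRetryWrapper.create, so a re-queued message gets the full retry schedule again.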

Next Next Steps

In the next steps, we will change our business-logic commands to event-driven handlers.

We'll need to plan the architecture and create another implementation for that.