1.3.1 • Published 8 years ago

kued v1.3.1

Weekly downloads
1
License
MIT
Repository
github
Last release
8 years ago

kued

Extensions for Kue (Daemonization, Checkpointing, etc.)

Build Status

Workers

Kued simplifies worker creation by organizing Job handling around classes:

const Worker = require('kued').Worker;

class MyWorker extends Worker {

  // This is where you can register handlers.
  init(){
    this.process('my-task', 'myTaskHandler');
    this.process('my-task-2', this.myTask2Handler.bind(this));
    // Retrieve up to 50 messages at a time (default is 1).
    this.process('my-task-3', 50, this.myTask3Handler.bind(this));
    this.process('my-task-4', (job, context, done) -> done());
  }

  myTaskHandler(job, context, done){
    done();
  }

  myTask2Handler(job, context, done){
    done();
  }

  myTask3Handler(job, context, done){
    done();
  }
}

The queue is available to Workers, so the outcome of one job can be another:

const Worker = require('kued').Worker;

class MyWorker extends Worker {

  // This is where you can register handlers.
  init(){
    this.process('my-task', 'myTaskHandler');
  }

  myTaskHandler(job, context, done){
    this.queue.create('new-task', { foo: 'bar' }).save(done);
  }
}

Time-based Intervals.

Sometimes you just need tasks executed at a regular interval. For this usecase, we have a special CronWorker:

const CronWorker = require('kued').CronWorker;

class Synchronizer extends CronWorker {

  constructor(config, logger, _queue){
    const cron = { "cronTime": "10 * * * * *", "runOnInit": true };
    super(cron, config, logger, _queue);
  }

  tick(){
    this.queue.create('new-task', { foo: 'bar' }).save(done);
  }
}

Checkpoints.

With distributed workers, sometimes it's important to not process a record for some reason, like if it's older than a last "modified" point. For this use case, Kued provides a "checkpointing" feature that applies a predicate test before processing a record.

Checkpoints wrap a typical subscription (i.e. use of Worker.process()):

const Worker = require('kued').Worker;
const moment = require('moment');

class CheckpointedWorker extends Worker {

  // This is where you can register handlers.
  init(){
    this.checkpoint()
        .topic('stock-quotes')
        .concurrency(1)
        .keyFactory((quote) => {
          // dynamically determining the checkpoint key
          // allows checkpoints to be scoped to practically
          // anything.
          return `quotes:${quote.symbol}`;
        })
        .iff((quote, checkpoint, callback) => {
          return moment(quote.datetime).valueOf() > moment(checkpoint.lastseen).valueOf();
        })
        .process('myTaskHandler');
  }

  myTaskHandler(job, context, done){
    const quote = job.data;
    this.queue.create('stock-update', quote).save((err) => {
      // Second argument is the new checkpoint value.
      done(err, { lastseen: quote.datetime });
    });
  }
}

Bridges.

Another common requirement of a worker is to receive messages from sources other than Kue. A bridge is a mechanism to take messages off of a provider (like IronMQ) and forward it to a Worker.

const BridgeFactory = require('kued').BridgeFactory;
const bridgeFactory = new BridgeFactory(config, logger);

// Specific options for the Bridge
const opts = {};

// This is not implemented yet.
const bridge = bridgeFactory.create('imq:queue', 'kue:task-worker-topic', opts);

TaskManager

TaskManager is simply a tiny wrapper around Kue used to build and submit Jobs. It's more of a convenience mechanism for managing the Kue connection outside of using a worker.

const moment = require('moment');
const TaskManager = require('kued').TaskManager;
const taskManager = new TaskManager(config, logger);


const stockQuote = { symbol: 'ABCD', value: 35.2, datetime: moment().toISOString() };

// Simplest form of enqueuing a task
taskManager.enqueue('stock-quotes', stockQuote,(err) => {
  if (err) console.error(err);
});

// Omit the Callback and get the Kue job (make further adjustments as necessary,
// but don't forget to call `save()` to submit the job.
taskManager.enqueue('stock-quotes', stockQuote).priority('high').attempts(2).save((err) => {
  if (err) console.error(err);
  // Close the connection when your done!
  taskManager.close();
});

Daemonizing

Worker config file: workers.json

{
  "providers": [
    {
      "provides": "kue",
      "connection": {
        "prefix": "myservice",
          "redis": {
          "port": 6379,
            "host": "localhost",
            "auth": "alright_alright_alright"
        }
      }
    },
    {
      "provides": "imq",
      "token": "abcde12345",
      "project_id": "asdfadfadsf"
    },
    {
      "provides": "checkpointer",
      "name": "redis-checkpointer",
      "require": "kued/lib/checkpointers/redis"
    }
  ],
  "workers": [
    {
      "name": "MyWorker",
      "require": "./lib/workers/myworker",
      "options": {
        "mongo": "mongodb://blah"
      }
    },
    {
      "name": "YourWorker",
      "require": "./lib/workers/yourworker",
      "options": {
        "db": "mysql://blah"
      }
    }
  ],
  "bridges": [
    {
      "name": "GoldenGate",
      "to": "imq:queue",
      "from": "kue:task-worker-topic",
      "options": {}
    }
  ],
  "workgroups": [
    {
      "workers": ["MyWorker", "YourWorker"],
      "instances": 3
    },
    {
      "workers": ["GoldenGate"],
      "instances": 1
    }
  ]
}

And to start that workers, just use the workers.js entrypoint:

# Your project
npm install
./node_modules/.bin/kued workers --config=workers.json

Evergreen Integration

Evergreen is side project of ours for making Node configuration easier and more powerful. We created a simple wrapper to allow a Workgroup to be spawned after importing an Evergreen compatible configuration file.

# Get the range of options by specifying the --help flag
everkued --help

# Example of spawning a Workgroup:
everkued --config config.json -workgroup ImageProcessor \
         --workers-path foo.bar.workers --modules trbl-evergreen-mongo

Keep in mind, this is used to spawn only one workgroup.

Task Enqueuing from Cli

# Your project
npm install
./node_modules/.bin/kued task --config=tasks.json taskname --params
1.3.1

8 years ago

1.3.0

8 years ago

1.2.4

8 years ago

1.2.3

8 years ago

1.2.2

8 years ago

1.2.1

8 years ago

1.2.0

8 years ago

1.1.0

8 years ago

1.0.1

8 years ago

1.0.0

8 years ago