1.1.0 • Published 9 months ago

fsqu v1.1.0

License
MIT

FSqu: job queues on top of the filesystem

This is a small library to implement job queues on top of a regular filesystem, where a queue is a directory and each element is a file.

The main goal is to allow building job pipelines where the job units are arbitrarily large files, but all the processing happens on the same machine. For example, a pipeline to process audio files.

Features

  • supports many simultaneous consumers and producers (as long as all of them use the same root FSqu object and therefore live in the same process)
  • constant-time operations: queue operations do not degrade as the queue grows, so queues with many thousands of elements can be easily managed
  • delay/schedule: entries can be pushed with a not-before timestamp, and won't be eligible for a pop() until then. This does not hamper performance, nor does it affect the rest of the elements
  • supports at-most-once (pop()) and at-least-once (reserve-commit-rollback) models
  • supports direct addition of files (as in mv fff.mp3 queue_dir/)
  • queues survive process restarts: files left in a queue will be picked up again after the queue is re-created
  • There are no FIFO guarantees: insertion order is mostly kept, but not guaranteed
  • uses debug for debugging (DEBUG=fsqu:* )

Planned features

  • associated deadletter queues

Quick Start

const FSqu =  require ('fsqu');
const async = require ('async');
const fs =    require ('fs');

const fsqu = new FSqu.fsqu ({
  name:    'a-test-queue',
  path:    '/tmp/fsqu/test-queues-with-fsqu',
  landing: '/tmp/fsqu/landing'
});

async.series ([
  cb => fsqu.init (cb),
  cb => fsqu.pushData ('contents of file #1', cb),
  cb => fsqu.pushData ('contents of file #2', cb),
  cb => fsqu.pop(cb),
  cb => fsqu.pop(cb),
], (err, res) => {
  if (err) return console.error (err); // res may be incomplete on error, so stop here

  console.log(fs.readFileSync (res[3].loc, { encoding: 'utf8' })); // normally 'contents of file #1' (insertion order is not strictly guaranteed)
  console.log(fs.readFileSync (res[4].loc, { encoding: 'utf8' })); // normally 'contents of file #2'

  fsqu.end(err => console.log ('all done', err));
});

More examples can be found here

API

Queue lifecycle

  • const q = new FSqu.fsqu(opts): queue creation. Creates a queue from a directory. The directory is created if nonexistent

    opts can be:

    • path: directory where the queue will be created
    • name: name of the directory hosting the queue. The full queue directory will therefore be <path>/<name>
    • landing: directory where to move entries after a call to pop(). It will be created if nonexistent
  • q.init(cb): Initializes a queue. This must be called before it can be used

  • q.end(cb): terminates a queue. It cancels any pending consumer
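
The lifecycle above (create, init, use, end) can be wrapped so that end() is always called once the work finishes. This is only a sketch: withQueue is a hypothetical helper, not part of fsqu, and it assumes init(cb) and end(cb) use node-style callbacks as in the Quick Start.

```javascript
// Hypothetical helper (not part of fsqu): wraps the init/end lifecycle
// around a unit of work. `q` is assumed to expose init(cb) and end(cb)
// with node-style callbacks, as in the Quick Start.
function withQueue (q, work, cb) {
  q.init (err => {
    if (err) return cb (err);

    // run the caller's work, then end the queue whether it failed or not
    work (q, (workErr, result) => {
      q.end (endErr => cb (workErr || endErr || null, result));
    });
  });
}
```

It would be called as withQueue (new FSqu.fsqu (opts), (q, done) => q.pushData ('some data', done), cb).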

Insert-into-queue operations

  • q.pushData (data, opts, cb): pushes raw data into the queue.

    • data: can be a string or a Buffer and will be written in a file that will be added to the queue
    • opts can contain:
      • ts: a Date to be used as not-before timestamp for the entry. Defaults to now. The entry won't be eligible for pop() until this time has come
      • delay: milliseconds to add to the not-before timestamp. May be negative
  • q.pushStream (stream, opts, cb): pushes data from a stream into the queue.

    • stream: a readable stream; it will be piped into a file (and therefore closed) that will be added to the queue
    • opts can contain:
      • ts: a Date to be used as not-before timestamp for the entry. Defaults to now. The entry won't be eligible for pop() until this time has come
      • delay: milliseconds to add to the not-before timestamp. May be negative
  • q.pushFile (filename, opts, cb): pushes an existing file into the queue.

    • filename: existing file; it will be moved, not copied, into the queue
    • opts can contain:
      • ts: a Date to be used as not-before timestamp for the entry. Defaults to now. The entry won't be eligible for pop() until this time has come
      • delay: milliseconds to add to the not-before timestamp. May be negative
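
As a rough illustration of how ts and delay are described as combining, the sketch below computes the resulting not-before time and pushes a delayed entry. Both notBefore and pushLater are hypothetical helpers written for this example; the way fsqu computes the timestamp internally may differ.

```javascript
// Hypothetical helper: mirrors the description above (ts defaults to now,
// delay is added on top). fsqu's internal computation may differ.
function notBefore (opts = {}, now = Date.now ()) {
  const base = opts.ts ? opts.ts.getTime () : now;
  return new Date (base + (opts.delay || 0));
}

// Hypothetical helper: pushes data that should not be eligible for pop()
// for at least `ms` milliseconds. `q` is assumed to be an initialized queue.
function pushLater (q, data, ms, cb) {
  q.pushData (data, { delay: ms }, cb);
}
```

For instance, pushLater (q, 'retry me', 60000, cb) would ask for the entry to be held back for about one minute.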

Get-from-queue operations

  • q.pop (cb): get a file from the queue. It will not return until a file is available (that is, present and with a not-before in the past)

    cb will then be called with the usual cb(err, res) form, where res.loc will be the file path, already extracted from the queue and left in the landing directory. After this, the file is no longer under the queue's control, and it is the caller's responsibility to remove it once it has been processed

  • q.reserve (opts, cb): reserves a file from the queue. It will not return until a file is available (that is, present and with a not-before in the past)

    cb will then be called with the usual cb(err, res) form, where res.loc will be the file path, still under queue control. The caller should not do anything with it besides reading

    opts can contain:

    • delta: time in milliseconds the element will remain reserved. After that, it will be available for pop() or reserve() again
  • q.commit (id|reserve-res, cb): commits a previously-reserved file. First parameter can be the element id (res.id in the reserve() call) or directly the whole res returned by reserve(). After this call, the file will be removed from the queue and deleted from disk
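
The two consumption models above could be used along these lines. This is a minimal sketch: popAndProcess, reserveAndProcess and the handler callback are hypothetical names, the 30000 ms delta is an arbitrary value, and no rollback call is shown because its signature is not documented in this section.

```javascript
const fs = require ('fs');

// At-most-once: pop() hands the file over, so the caller owns it afterwards
// and removes it from the landing directory once the handler is done.
function popAndProcess (q, handler, cb) {
  q.pop ((err, res) => {
    if (err) return cb (err);
    handler (res.loc, handlerErr => {
      fs.unlink (res.loc, unlinkErr => cb (handlerErr || unlinkErr || null));
    });
  });
}

// At-least-once: the file stays under queue control until commit().
function reserveAndProcess (q, handler, cb) {
  q.reserve ({ delta: 30000 }, (err, res) => {
    if (err) return cb (err);
    handler (res.loc, handlerErr => {
      // on failure, do not commit: the reservation lapses after `delta`
      // and the element becomes available for pop() or reserve() again
      if (handlerErr) return cb (handlerErr);
      q.commit (res, cb);
    });
  });
}
```

With the reserve-based variant a failed handler leaves the file in the queue, so the handler should tolerate seeing the same file more than once.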

Direct add/remove of files

  • Files can be added to a queue by directly moving them (as in mv) into the queue's directory. This operation is equivalent to a q.pushFile() with no options. For this to work the file name can't match the format used internally (^([0-9]+)-([0-9a-z]{16})-([0-9]+)$)

    It is important to move them and not copy them: most filesystems can't reliably tell when a file that's opened for writing is closed (or worse: depending on who's producing the file you may see a series of open-append-close operations) so it is not possible to determine when a file can be acted upon

  • However, files can NOT be safely removed (as in rm) or moved away (as in mv) from the queue's directory
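
A direct add could be scripted as follows. This is a sketch only: dropIntoQueue is a hypothetical helper, and it assumes the source file and the queue directory live on the same filesystem, since fs.renameSync fails with EXDEV otherwise.

```javascript
const fs =   require ('fs');
const path = require ('path');

// Pattern quoted above for the file names fsqu uses internally
const INTERNAL_NAME = /^([0-9]+)-([0-9a-z]{16})-([0-9]+)$/;

// Hypothetical helper: moves `src` into `queueDir`, keeping its base name.
function dropIntoQueue (src, queueDir) {
  const name = path.basename (src);
  if (INTERNAL_NAME.test (name)) {
    throw new Error (`'${name}' clashes with the internal name format`);
  }

  const dest = path.join (queueDir, name);
  // rename is a move, not a copy: the file appears in the queue complete
  fs.renameSync (src, dest);
  return dest;
}
```

If the file is produced on another filesystem, one option is to write it first to a staging directory next to the queue and then move it in with the helper.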

1.1.0

9 months ago

1.0.0

10 months ago

0.0.0

10 months ago