@crabas0npm/enim-fugiat-labore v1.0.0

Shared file system queue for Node.js.
- Note: Version 12.1.0 supports direct producer-to-subscriber streams, where the data doesn't go via the filesystem. See the direct_handler option to the QlobberFSQ constructor and the direct option to publish.
- Note: Version 9 can use shared memory LMAX Disruptors to speed things up on a single multi-core server.
 - Supports pub-sub and work queues.
 - Supports local file system for multi-core use.
 - Tested with FraunhoferFS (BeeGFS) and CephFS for distributed use.
- Note: An alternative module, qlobber-pg, can be used when you need access from multiple hosts. It's API-compatible with @crabas0npm/enim-fugiat-labore and requires a PostgreSQL database.
- Highly configurable.
 - Full set of unit tests, including stress tests.
 - Use as a backend-less alternative to RabbitMQ, Redis pub-sub etc.
 - Supports AMQP-like topics with single- and multi-level wildcards.
 - Tested on Linux and Windows.
 
Example:

```js
var QlobberFSQ = require('@crabas0npm/enim-fugiat-labore').QlobberFSQ;
var fsq = new QlobberFSQ({ fsq_dir: '/shared/fsq' });
fsq.subscribe('foo.*', function (data, info)
{
    console.log(info.topic, data.toString('utf8'));
    var assert = require('assert');
    assert.equal(info.topic, 'foo.bar');
    assert.equal(data, 'hello');
});
fsq.on('start', function ()
{
    this.publish('foo.bar', 'hello');
});
```

You can publish messages using a separate process if you like:
```js
var QlobberFSQ = require('@crabas0npm/enim-fugiat-labore').QlobberFSQ;
var fsq = new QlobberFSQ({ fsq_dir: '/shared/fsq' });
fsq.stop_watching();
fsq.on('stop', function ()
{
    this.publish('foo.bar', 'hello');
});
```

Or use the streaming interface to read and write messages:
```js
const { QlobberFSQ } = require('@crabas0npm/enim-fugiat-labore');
const fsq = new QlobberFSQ({ fsq_dir: '/shared/fsq' });
function handler(stream, info)
{
    const data = [];
    stream.on('readable', function ()
    {
        let chunk;
        while ((chunk = this.read()) !== null)
        {
            data.push(chunk);
        }
    });
    stream.on('end', function ()
    {
        const str = Buffer.concat(data).toString('utf8');
        console.log(info.topic, str);
        const assert = require('assert');
        assert.equal(info.topic, 'foo.bar');
        assert.equal(str, 'hello');
    });
}
handler.accept_stream = true;
fsq.subscribe('foo.*', handler);
fsq.on('start', function ()
{
    fsq.publish('foo.bar').end('hello');
});
```

The API is described below.
Installation
```
npm install @crabas0npm/enim-fugiat-labore
```

Limitations
@crabas0npm/enim-fugiat-labore provides no guarantee that the order in which messages are given to subscribers is the same as the order in which they were written. If you want to maintain message order between readers and writers then you'll need to do it in your application (using ACKs, sliding windows etc). Alternatively, use the order_by_expiry constructor option to have messages delivered in order of the time they expire.

@crabas0npm/enim-fugiat-labore does its best not to lose messages but in exceptional circumstances (e.g. process crash, file system corruption) messages may get dropped. You should design your application to be resilient against dropped messages.

@crabas0npm/enim-fugiat-labore makes no assurances about the security or privacy of messages in transit or at rest. It's up to your application to encrypt messages if required.

@crabas0npm/enim-fugiat-labore supports Node 6 onwards.
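For example, a minimal sketch of the order_by_expiry option mentioned above (the fsq_dir path is illustrative):

```js
// A minimal sketch, assuming a shared directory at /shared/fsq.
// order_by_expiry forces a single bucket so messages can be delivered
// in expiry-time order.
var QlobberFSQ = require('@crabas0npm/enim-fugiat-labore').QlobberFSQ;

var fsq = new QlobberFSQ({
    fsq_dir: '/shared/fsq',  // example path
    order_by_expiry: true    // deliver messages in expiry-time order
});
```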
Distributed filesystems
Note: When using a distributed file system with @crabas0npm/enim-fugiat-labore, ensure that you synchronize the time and date on all the computers you're using.
FraunhoferFS (BeeGFS)
When using the FraunhoferFS distributed file system, set the following options in fhgfs-client.conf:
```
tuneFileCacheType             = none
tuneUseGlobalFileLocks        = true
```

@crabas0npm/enim-fugiat-labore has been tested with FraunhoferFS 2014.01 on Ubuntu 14.04 and FraunhoferFS 2012.10 on Ubuntu 13.10.
CephFS
@crabas0npm/enim-fugiat-labore has been tested with CephFS 0.80 on Ubuntu 14.04. Note that you'll need to upgrade your kernel to at least 3.14.1 in order to get the fix for a bug in CephFS.
How it works
Under the directory you specify for fsq_dir, @crabas0npm/enim-fugiat-labore creates the following sub-directories:
- staging: Whilst it's being published, each message is written to a file in the staging area. The filename itself contains the message's topic, when it expires, whether it should be read by one subscriber or many, and a random sequence of characters to make it unique.
- messages: Once published to the staging area, each message is moved into this directory. @crabas0npm/enim-fugiat-labore actually creates a number of sub-directories (called buckets) under messages and distributes messages between buckets according to the hash of their filenames. This helps to reduce the number of directory entries that have to be read when a single message is written.
- topics: If a message's topic is long, a separate topic file is created for it in this directory.
- update: This contains one file, UPDATE, which is updated with a random sequence of bytes (called a stamp) every time a message is moved into the messages directory. UPDATE contains a separate stamp for each bucket.
@crabas0npm/enim-fugiat-labore reads UPDATE at regular intervals to determine whether a new message has been written to a bucket. If it has then it processes each filename in the bucket's directory listing.
If the expiry time in the filename has passed then it deletes the message.
If the filename indicates the message can be read by many subscribers:
- If it's processed this filename before then stop processing this filename.
- If the topic in the filename matches any subscribers then call each subscriber with the file's content. It uses qlobber to pattern-match topics to subscribers.
- Remember that we've processed the filename.
 
If the filename indicates the message can be read by only one subscriber (i.e. work queue semantics):
- Try to lock the file using flock. If it fails to lock the file then stop processing this filename.
- If the topic in the filename matches any subscribers then call one subscriber with the file's content.
- Truncate and delete the file before unlocking it. We truncate the file in case of directory caching.
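As a sketch of how these work-queue semantics look from the API side (topic names and payloads are illustrative):

```js
// A minimal sketch of work-queue (single-subscriber) messages.
// Locking relies on the optional fs-ext dependency being installed.
var QlobberFSQ = require('@crabas0npm/enim-fugiat-labore').QlobberFSQ;
var fsq = new QlobberFSQ({ fsq_dir: '/shared/fsq' }); // example path

fsq.subscribe('jobs.#', function (data, info, done)
{
    console.log('got job', info.topic, data.toString('utf8'));
    // For info.single === true, calling done() truncates, deletes
    // and unlocks the message file so no other subscriber gets it.
    done(null);
});

fsq.on('start', function ()
{
    // single: true gives the message to at most one subscriber
    this.publish('jobs.example', 'payload', { single: true });
});
```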
 
Licence
Test
To run the default tests:
```
grunt test [--fsq-dir=<path>] [--getdents_size=<buffer size>] [--disruptor]
```

If you don't specify --fsq-dir then the default will be used (a directory named fsq in the test directory).
If you specify --getdents_size then use of getdents will be included in the tests.
If you specify --disruptor then use of shared memory LMAX Disruptors will be included in the tests.
To run the stress tests (multiple queues in a single Node process):
```
grunt test-stress [--fsq-dir=<path>] [--disruptor]
```

To run the multi-process tests (each process publishing and subscribing to different messages):
```
grunt test-multi [--fsq-dir=<path>] [--queues=<number of queues>] [--disruptor]
```

If you omit --queues then one process will be created per core (detected with os.cpus()).
To run the distributed tests (one process per remote host, each one publishing and subscribing to different messages):
```
grunt test-multi --fsq-dir=<path> --remote=<host1> --remote=<host2>
```

You can specify as many remote hosts as you like. The test uses cp-remote to run a module on each remote host. Make sure on each host:
- The @crabas0npm/enim-fugiat-labore module is installed at the same location.
- The same distributed file system is mounted on the directory you specify for --fsq-dir. FraunhoferFS and CephFS are the only distributed file systems currently supported.
Please note the distributed tests don't run on Windows.
Lint
```
grunt lint
```

Code Coverage

```
grunt coverage [--fsq-dir=<path>]
```

c8 results are available here.
Coveralls page is here.
Benchmarks
To run the benchmark:
```
grunt bench [--fsq-dir=<path>] \
            --rounds=<number of rounds> \
            --size=<message size> \
            --ttl=<message time-to-live in seconds> \
            [--disruptor] \
            [--num_elements=<number of disruptor elements>] \
            [--element_size=<disruptor element size>] \
            [--bucket_stamp_size=<number of bytes to write to UPDATE file>] \
            [--getdents_size=<buffer size>] \
            [--ephemeral] \
            [--refresh_ttl=<period between expiration checks in seconds>] \
            (--queues=<number of queues> | \
             --remote=<host1> --remote=<host2> ...)
```

If you don't specify --fsq-dir then the default will be used (a directory named fsq in the bench directory).
If you provide at least one --remote=<host> argument then the benchmark will be distributed across multiple hosts using cp-remote. Make sure on each host:
- The @crabas0npm/enim-fugiat-labore module is installed at the same location.
- The same distributed file system is mounted on the directory you specify for --fsq-dir. FraunhoferFS and CephFS are the only distributed file systems currently supported.
API
Constructor
Publish and subscribe
Lifecycle
- QlobberFSQ.prototype.stop_watching
 - QlobberFSQ.prototype.refresh_now
 - QlobberFSQ.prototype.force_refresh
 - QlobberFSQ.get_num_buckets
 
Events
- QlobberFSQ.events.start
 - QlobberFSQ.events.stop
 - QlobberFSQ.events.error
 - QlobberFSQ.events.warning
 - QlobberFSQ.events.single_disabled
 - QlobberFSQ.events.getdents_disabled
 
QlobberFSQ(options)
Creates a new QlobberFSQ object for publishing and subscribing to a file system queue.
Parameters:
- {Object} [options] Configures the file system queue. Valid properties are listed below:
  - {String} fsq_dir The path to the file system queue directory. Note that the following sub-directories will be created under this directory if they don't exist: messages, staging, topics and update. Defaults to a directory named fsq in the @crabas0npm/enim-fugiat-labore module directory.
  - {Boolean} encode_topics Whether to hex-encode message topics. Because topic strings form part of message filenames, they're first hex-encoded. If you can ensure that your message topics contain only valid filename characters, set this to false to skip encoding. Defaults to true.
  - {Integer} split_topic_at Maximum number of characters in a short topic. Short topics are contained entirely in a message's filename. Long topics are split so the first split_topic_at characters go in the filename and the rest are written to a separate file in the topics sub-directory. Obviously long topics are less efficient. Defaults to 200, which is the maximum for most common file systems. Note: if your fsq_dir is on an ecryptfs file system then you should set split_topic_at to 100.
  - {Integer} bucket_base, {Integer} bucket_num_chars Messages are distributed across different buckets for efficiency. Each bucket is a sub-directory of the messages directory. The number of buckets is determined by the bucket_base and bucket_num_chars options. bucket_base is the radix to use for bucket names and bucket_num_chars is the number of digits in each name. For example, bucket_base: 26 and bucket_num_chars: 4 results in buckets 0000 through pppp. Defaults to bucket_base: 16 and bucket_num_chars: 2 (i.e. buckets 00 through ff). The number of buckets is available as the num_buckets property of the QlobberFSQ object.
  - {Integer} bucket_stamp_size The number of bytes to write to the UPDATE file when a message is published. The UPDATE file (in the update directory) is used to determine whether any messages have been published without having to scan all the bucket directories. Each bucket has a section in the UPDATE file, bucket_stamp_size bytes long. When a message is written to a bucket, its section is filled with random bytes. Defaults to 32. If you set this to 0, the UPDATE file won't be written to and all the bucket directories will be scanned even if no messages have been published.
  - {Integer} flags Extra flags to use when reading and writing files. You shouldn't need to use this option but if you do then it should be a bitwise-or of values in the (undocumented) Node constants module (e.g. constants.O_DIRECT | constants.O_SYNC). Defaults to 0.
  - {Integer} unique_bytes Number of random bytes to append to each message's filename (encoded in hex), in order to avoid name clashes. Defaults to 16. If you increase it (or change the algorithm to add some extra information like the hostname), be sure to reduce split_topic_at accordingly.
  - {Integer} single_ttl Default time-to-live (in milliseconds) for messages which should be read by at most one subscriber. This value is added to the current time and the resulting expiry time is put into the message's filename. After the expiry time, the message is ignored and deleted when convenient. Defaults to 1 hour.
  - {Integer} multi_ttl Default time-to-live (in milliseconds) for messages which can be read by many subscribers. This value is added to the current time and the resulting expiry time is put into the message's filename. After the expiry time, the message is ignored and deleted when convenient. Defaults to 1 minute.
  - {Integer} poll_interval @crabas0npm/enim-fugiat-labore reads the UPDATE file at regular intervals to check whether any messages have been written. poll_interval is the time (in milliseconds) between each check. Defaults to 1 second.
  - {Boolean} notify Whether to use fs.watch to watch for changes to the UPDATE file. Note that this will be done in addition to reading it every poll_interval milliseconds because fs.watch (inotify underneath) can be unreliable, especially under high load. Defaults to true.
  - {Integer} retry_interval Some I/O operations can fail with an error indicating they should be retried. retry_interval is the time (in milliseconds) to wait before retrying. Defaults to 1 second.
  - {Integer} message_concurrency The number of messages in each bucket to process at once. Defaults to 1.
  - {Integer} bucket_concurrency The number of buckets to process at once. Defaults to 1.
  - {Integer} handler_concurrency By default, a message is considered handled by a subscriber only when all its data has been read. If you set handler_concurrency to non-zero, a message is considered handled as soon as a subscriber receives it. The next message will then be processed straight away. The value of handler_concurrency limits the number of messages being handled by subscribers at any one time. Defaults to 0 (waits for all message data to be read).
  - {Boolean} order_by_expiry Pass messages to subscribers in order of their expiry time. If true then bucket_base and bucket_num_chars are forced to 1 so messages are written to a single bucket. Defaults to false.
  - {Boolean} dedup Whether to ensure each handler function is called at most once when a message is received. Defaults to true.
  - {Boolean} single Whether to process messages meant for at most one subscriber (across all QlobberFSQ objects), i.e. work queues. This relies on the optional dependency fs-ext. Defaults to true if fs-ext is installed, otherwise false (in which case a single_disabled event will be emitted).
  - {String} separator The character to use for separating words in message topics. Defaults to . (dot).
  - {String} wildcard_one The character to use for matching exactly one word in a message topic to a subscriber. Defaults to *.
  - {String} wildcard_some The character to use for matching zero or more words in a message topic to a subscriber. Defaults to #.
  - {Integer} getdents_size If positive, use getdents to enumerate messages in bucket directories. getdents_size is the buffer size to use with getdents. Otherwise, use fs.readdir (which is the default). If getdents is requested but unavailable, a getdents_disabled event will be emitted.
  - {Function (info, handlers, cb(err, ready, filtered_handlers)) | Array} filter Function called before each message is processed. You can use this to filter the subscribed handler functions to be called for the message (by passing the filtered list as the third argument to cb). If you want to ignore the message at this time then pass false as the second argument to cb; filter will be called again later with the same message. Defaults to a function which calls cb(null, true, handlers). handlers is an ES6 Set, or an array if options.dedup is falsey. filtered_handlers should be an ES6 Set, or an array if options.dedup is falsey. If not, new Set(filtered_handlers) or Array.from(filtered_handlers) will be used to convert it. You can supply an array of filter functions - each will be called in turn with the filtered_handlers from the previous one. An array containing the filter functions is also available as the filters property of the QlobberFSQ object and can be modified at any time.
  - {Function (bucket)} [get_disruptor] You can speed up message processing on a single multi-core server by using shared memory LMAX Disruptors. Message metadata and (if it fits) payload will be sent through the Disruptor. get_disruptor will be called for each bucket number and should return the Disruptor to use for that bucket or null. The same Disruptor can be used for more than one bucket if you wish.
  - {Integer} refresh_ttl If you use a shared memory LMAX Disruptor for a bucket (see get_disruptor above), notification of new messages in the bucket is received through the Disruptor. However, checking for expired messages still needs to read the file system. refresh_ttl is the time (in milliseconds) between checks for expired messages when a Disruptor is in use. Defaults to 10 seconds.
  - {Integer} disruptor_spin_interval If a Disruptor is shared across multiple buckets or multiple QlobberFSQ instances, contention can occur when publishing a message. In this case publish will try again until it succeeds. disruptor_spin_interval is the time (in milliseconds) to wait before retrying. Defaults to 0.
  - {Object} [direct_handler] Object with the following methods, used for transferring messages directly from publisher to subscribers without writing them to disk:
    - {Function (filename, direct)} get_stream_for_publish Called by publish when a truthy options.direct is passed to it instead of writing data to disk. This method receives the name of the file to which data would have been written, plus the value of options.direct that was passed to publish. Whatever it returns will be returned by the call to publish.
    - {Function (filename)} get_stream_for_subscribers Called when a stream published by calling publish with truthy options.direct needs to be given to subscribers. It receives the name of the file to which data would have been written. It must return a Readable stream.
    - {Function (filename, stream)} publish_stream_destroyed Called when a stream returned by get_stream_for_publish() has been destroyed or the message has expired. It receives the name of the file passed to get_stream_for_publish() and the destroyed stream.
    - {Function (filename)} publish_stream_expired Called when a stream returned by get_stream_for_publish() has expired and should be destroyed. It receives the name of the file passed to get_stream_for_publish().
    - {Function (filename, stream)} subscriber_stream_destroyed Called when a stream returned by get_stream_for_subscribers() has been destroyed or the message has expired. It receives the name of the file passed to get_stream_for_subscribers() and the destroyed stream.
    - {Function (filename)} subscriber_stream_ignored Called when a stream published by calling publish with truthy options.direct doesn't have any subscribers. It receives the name of the file to which data would have been written.
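As an illustration, a minimal sketch of constructing a queue with a few of the options above (all values are examples, not recommendations):

```js
// A minimal sketch; every value here is illustrative only.
var QlobberFSQ = require('@crabas0npm/enim-fugiat-labore').QlobberFSQ;

var fsq = new QlobberFSQ({
    fsq_dir: '/shared/fsq',       // example queue directory
    poll_interval: 1000,          // check the UPDATE file every second
    single_ttl: 60 * 60 * 1000,   // 1 hour for single-subscriber messages
    multi_ttl: 60 * 1000,         // 1 minute for multi-subscriber messages
    dedup: true                   // call each handler at most once per message
});

// The bucket count derived from bucket_base/bucket_num_chars:
console.log('buckets:', fsq.num_buckets);
```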
Go: TOC
QlobberFSQ.prototype.subscribe(topic, handler, options, cb)
Subscribe to messages in the file system queue.
Parameters:
- {String} topic Which messages you're interested in receiving. Message topics are split into words using . as the separator. You can use * to match exactly one word in a topic or # to match zero or more words. For example, foo.* would match foo.bar whereas foo.# would match foo, foo.bar and foo.bar.wup. Note you can change the separator and wildcard characters by specifying the separator, wildcard_one and wildcard_some options when constructing QlobberFSQ objects. See the qlobber documentation for more information.
- {Function} handler Function to call when a new message is received on the file system queue and its topic matches against topic. handler will be passed the following arguments:
  - {Readable|Buffer} data Readable stream or message content as a Buffer. By default you'll receive the message content. If handler has a property accept_stream set to a truthy value then you'll receive a stream. Note that all subscribers will receive the same stream or content for each message. You should take this into account when reading from the stream. The stream can be piped into multiple Writable streams but bear in mind it will go at the rate of the slowest one.
  - {Object} info Metadata for the message, with the following properties:
    - {String} fname Name of the file in which the message is stored.
    - {String} path Full path to the file in which the message is stored.
    - {String} topic Topic the message was published with.
    - {String} [topic_path] Full path to the file in which the topic overspill is stored (only present if the topic is too long to fit in the file name).
    - {Integer} expires When the message expires (number of milliseconds after 1 January 1970 00:00:00 UTC).
    - {Boolean} single Whether this message is being given to at most one subscriber (across all QlobberFSQ objects).
    - {Integer} size Message size in bytes.
  - {Function} done Function to call once you've handled the message. Note that calling this function is only mandatory if info.single === true, in order to delete and unlock the file. done takes two arguments:
    - {Object} err If an error occurred then pass details of the error, otherwise pass null or undefined.
    - {Function} [finish] Optional function to call once the message has been deleted and unlocked, in the case of info.single === true, or straight away otherwise. It will be passed the following argument:
      - {Object} err If an error occurred then details of the error, otherwise null.
- {Object} [options] Optional settings for this subscription:
  - {Boolean} subscribe_to_existing If true then handler will be called with any existing, unexpired messages that match topic, as well as new ones. Note that handler will only receive new streams that were published by calling publish with truthy options.direct, not existing direct streams. Defaults to false (only new messages).
- {Function} [cb] Optional function to call once the subscription has been registered. This will be passed the following argument:
  - {Object} err If an error occurred then details of the error, otherwise null.
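For example, a short sketch of subscribing with a registration callback and the subscribe_to_existing option (fsq is a QlobberFSQ instance as in the earlier examples; the topic is illustrative):

```js
// A minimal sketch: receive existing unexpired messages too.
fsq.subscribe('foo.#', function (data, info)
{
    console.log(info.topic, info.size, 'bytes');
}, { subscribe_to_existing: true }, function (err)
{
    if (err) { console.error('subscribe failed:', err); }
});
```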
Go: TOC | QlobberFSQ.prototype
QlobberFSQ.prototype.unsubscribe(topic, handler, cb)
Unsubscribe from messages in the file system queue.
Parameters:
- {String} [topic] Which messages you're no longer interested in receiving via the handler function. This should be a topic you've previously passed to subscribe. If topic is undefined then all handlers for all topics are unsubscribed.
- {Function} [handler] The function you no longer want to be called with messages published to the topic topic. This should be a function you've previously passed to subscribe. If you subscribed handler to a different topic then it will still be called for messages which match that topic. If handler is undefined, all handlers for the topic topic are unsubscribed.
- {Function} [cb] Optional function to call once handler has been unsubscribed from topic. This will be passed the following argument:
  - {Object} err If an error occurred then details of the error, otherwise null.
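For example, a sketch of removing a single handler (handler is assumed to be a function previously passed to subscribe):

```js
// A minimal sketch; 'foo.#' and handler refer to a previous subscribe.
fsq.unsubscribe('foo.#', handler, function (err)
{
    if (err) { console.error('unsubscribe failed:', err); }
    // Per the docs, passing undefined for topic removes all handlers
    // for all topics.
});
```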
Go: TOC | QlobberFSQ.prototype
QlobberFSQ.prototype.publish(topic, payload, options, cb)
Publish a message to the file system queue.
Parameters:
- {String} topic Message topic. The topic should be a series of words separated by . (or the separator character you provided to the QlobberFSQ constructor). Topic words can contain any character, unless you set encode_topics to false in the QlobberFSQ constructor. In that case they can contain any valid filename character for your file system, although it's probably sensible to limit it to alphanumeric characters, -, _ and ..
- {String | Buffer} [payload] Message payload. If you don't pass a payload then publish will return a Writable stream for you to write the payload into.
- {Object} [options] Optional settings for this publication:
  - {Boolean} single If true then the message will be given to at most one interested subscriber, across all QlobberFSQ objects scanning the file system queue. Otherwise all interested subscribers will receive the message (the default).
  - {Integer} ttl Time-to-live (in milliseconds) for this message. If you don't specify anything then single_ttl or multi_ttl (provided to the QlobberFSQ constructor) will be used, depending on the value of single. After the time-to-live for the message has passed, the message is ignored and deleted when convenient.
  - {String} encoding If payload is a string, the encoding to use when writing it out to the message file. Defaults to utf8.
  - {Integer} mode The file mode (permissions) to set on the message file. Defaults to octal 0666 (readable and writable to everyone).
  - {Function} hasher A hash function to use for deciding into which bucket the message should be placed. The hash function should return a Buffer at least 4 bytes long. It defaults to running md5 on the message file name. If you supply a hasher function it will be passed the following arguments:
    - {String} fname Message file name.
    - {Integer} expires When the message expires (number of milliseconds after 1 January 1970 00:00:00 UTC).
    - {String} topic Message topic.
    - {String|Buffer} payload Message payload.
    - {Object} options The optional settings for this publication.
  - {Integer} bucket Which bucket to write the message into, instead of using hasher to calculate it.
  - {Boolean} ephemeral This applies only if a shared memory LMAX Disruptor is being used for the message's bucket (see the get_disruptor option to the QlobberFSQ constructor). By default, the message is written both to the Disruptor and the file system. If ephemeral is truthy, the message is written only to the Disruptor. If the Disruptor's elements aren't large enough to contain the message's metadata, the message won't be written to the Disruptor and cb (below) will receive an error with a property code equal to the string buffer-too-small. However, if the Disruptor's elements aren't large enough for the message's payload, the message will be written to disk. The amount of space available in the Disruptor for the payload can be found via the ephemeral_size property on the stream returned by this function. If your message won't fit and you don't want to write it to disk, emit an error event on the stream without ending it.
  - {Any} direct Defaults to false. If truthy:
    - Writes an empty message to disk instead of any data.
    - Calls the direct_handler.get_stream_for_publish() method that was passed to the QlobberFSQ constructor. It will be passed the name of the file to which data would otherwise have been written and the value of direct.
    - Returns the value that direct_handler.get_stream_for_publish() returns.
    - Ignores single (i.e. publishes to all interested subscribers).
- {Function} [cb] Optional function to call once the message has been written to the file system queue. This will be called after the message has been moved into its bucket and is therefore available to subscribers in any QlobberFSQ object scanning the queue. It will be passed the following arguments:
  - {Object} err If an error occurred then details of the error, otherwise null.
  - {Object} info Metadata for the message. See subscribe for a description of info's properties.
Return:
{Stream | undefined} A Writable stream if no payload was passed, otherwise undefined.
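For example, a sketch of both calling styles (fsq is a QlobberFSQ instance as in the earlier examples; topics and payloads are illustrative):

```js
// A minimal sketch: publish with explicit options and a callback...
fsq.publish('foo.bar', 'hello', { single: true, ttl: 30000 },
function (err, info)
{
    if (err) { return console.error(err); }
    console.log('published as', info.fname);
});

// ...and publish via the Writable stream returned when no payload
// argument is given.
fsq.publish('foo.baz').end('streamed payload');
```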
Go: TOC | QlobberFSQ.prototype
QlobberFSQ.prototype.stop_watching(cb)
Stop scanning for new messages.
Parameters:
- {Function} [cb] Optional function to call once scanning has stopped. Alternatively, you can listen for the stop event.
Go: TOC | QlobberFSQ.prototype
QlobberFSQ.prototype.refresh_now()
Check the UPDATE file now rather than waiting for the next periodic check to occur.
Go: TOC | QlobberFSQ.prototype
QlobberFSQ.prototype.force_refresh()
Scan for new messages in the messages sub-directory without checking whether the UPDATE file has changed.
Go: TOC | QlobberFSQ.prototype
QlobberFSQ.get_num_buckets(bucket_base, bucket_num_chars)
Given a radix to use for characters in bucket names and the number of digits in each name, return the number of buckets that can be represented.
Parameters:
- {Integer} bucket_base Radix for bucket name characters.
- {Integer} bucket_num_chars Number of characters in bucket names.
Return:
{Integer} The number of buckets that can be represented.
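For example, with the default constructor settings (bucket_base of 16 and bucket_num_chars of 2):

```js
// A quick sketch: radix 16, 2 digits => 16^2 buckets (00 through ff).
var QlobberFSQ = require('@crabas0npm/enim-fugiat-labore').QlobberFSQ;
console.log(QlobberFSQ.get_num_buckets(16, 2)); // 256
```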
Go: TOC | QlobberFSQ
QlobberFSQ.events.start()
start event
QlobberFSQ objects fire a start event when they're ready to publish messages. Don't call publish until the start event is emitted or the message may be dropped. You can subscribe to messages before start is fired, however.
A start event won't be fired after a stop event.
Go: TOC | QlobberFSQ.events
QlobberFSQ.events.stop()
stop event
QlobberFSQ objects fire a stop event after you call stop_watching and they've stopped scanning for new messages. Messages already read may still be being processed, however.
Go: TOC | QlobberFSQ.events
QlobberFSQ.events.error(err)
error event
QlobberFSQ objects fire an error event if an error occurs before start is emitted. The QlobberFSQ object is unable to continue at this point and is not scanning for new messages.
Parameters:
- {Object} err The error that occurred.
Go: TOC | QlobberFSQ.events
QlobberFSQ.events.warning(err)
warning event
QlobberFSQ objects fire a warning event if an error occurs after start is emitted. The QlobberFSQ object will still be scanning for new messages after emitting a warning event.
Parameters:
- {Object} err The error that occurred.
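For example, a sketch of wiring up the error and warning events (fsq is a QlobberFSQ instance as in the earlier examples; the logging policy is illustrative):

```js
// A minimal sketch: QlobberFSQ objects are EventEmitters.
fsq.on('error', function (err)
{
    // Fatal: the object is no longer scanning for messages.
    console.error('queue error:', err);
});
fsq.on('warning', function (err)
{
    // Non-fatal: scanning continues.
    console.warn('queue warning:', err);
});
```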
Go: TOC | QlobberFSQ.events
QlobberFSQ.events.single_disabled(err)
single_disabled event
QlobberFSQ objects fire a single_disabled event if they can't support work queue semantics.
Parameters:
- {Object} err The error that caused single-subscriber messages not to be supported.
Go: TOC | QlobberFSQ.events
QlobberFSQ.events.getdents_disabled(err)
getdents_disabled event
QlobberFSQ objects fire a getdents_disabled event if they can't support enumerating bucket directories using getdents.
Parameters:
- {Object} err The error that caused getdents to be unavailable.
Go: TOC | QlobberFSQ.events
—generated by apidox—