0.3.17 • Published 3 years ago

@colu-legacy/osseus-mq v0.3.17

License
MIT

Osseus MQ

AmazonMQ / ActiveMQ based osseus queue, topic and bus module

  • WARNING current version 0.1.0 is in alpha stage.

Install

$ npm install osseus-mq

Usage

Configuration

See osseus-config for the configuration details.

  • OSSEUS_MQ_PROTOCOL (string, required) - 'AMQP' for AMQP or 'STOMP' for STOMP
  • OSSEUS_MQ_AMQP_BROKERS (array, required) - array of brokers
  • OSSEUS_MQ_AMQP_CONSUMERS_<broker alias> (array, required) - array of consumers of a certain broker
  • OSSEUS_MQ_AMQP_PRODUCERS_<broker alias> (array, required) - array of producers to a certain broker

  • Example:

        OSSEUS_MQ_AMQP_BROKERS: [
            {
              'alias': 'cluster',
              'connect_options': {
                'username': 'admin',
                'password': 'admin',
                'transport': 'ssl', // ONLY for amqps, remove otherwise
                'connections': [
                  {'host': 'localhost', 'port': 5672},
                  {'host': 'localhost', 'port': 5673}
                ]
              }
            }
        ],
        OSSEUS_MQ_AMQP_CONSUMERS_CLUSTER: [
            {
              name: 'queue://SYSTEM.MESSAGES',
              alias: 'smsg',
              options: ['settle'] // with settle, a queue:// message is redelivered until a client accepts it
            }
        ],
        OSSEUS_MQ_AMQP_PRODUCERS_CLUSTER: [
            {
              name: 'queue://SYSTEM.MESSAGES',
              alias: 'smsg',
              options: ['settle'] // with settle, a queue:// message is redelivered until a client accepts it
            }
        ]

Credit

To limit the number of messages handled at once, use credit. With credit, only a limited number of messages are handled at a time and the rest wait at the broker. Each time a message is accepted or rejected, a new one is dequeued.
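The following is a minimal sketch of the credit idea described above, not the module's implementation. The CreditedQueue class and its method names are hypothetical and exist only for illustration; the credit limit is not modeled here.

```javascript
// Toy model of credit-based flow control. The class and its methods are
// hypothetical; they are not part of osseus-mq.
class CreditedQueue {
  constructor (messages, initialCredit) {
    this.pending = [...messages] // messages still waiting at the "broker"
    this.inFlight = [] // messages handed to the consumer
    this.credit = initialCredit
    this.deliver()
  }

  // Hand over messages for as long as credit remains.
  deliver () {
    while (this.credit > 0 && this.pending.length > 0) {
      this.inFlight.push(this.pending.shift())
      this.credit -= 1
    }
  }

  // Accepting or rejecting a message frees one credit, so one more is dequeued.
  settle (message) {
    this.inFlight = this.inFlight.filter(m => m !== message)
    this.credit += 1
    this.deliver()
  }
}

const queue = new CreditedQueue(['a', 'b', 'c'], 1)
console.log(queue.inFlight) // only one message is in flight
queue.settle('a')
console.log(queue.inFlight) // the next message has been dequeued
```

With an initial credit of 1, the consumer never holds more than one message at a time.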

  • OSSEUS_MQ_HANDLE_CREDIT (boolean) - Whether to handle credit manually or not.
  • OSSEUS_MQ_AMQP_INITIAL_CREDIT (number) - How much credit to start with. If set to 0, no message will be dequeued. Only relevant when OSSEUS_MQ_HANDLE_CREDIT is set to true.
  • OSSEUS_MQ_AMQP_CREDIT_LIMIT (number) - Max credit the queue can reach. Only relevant when OSSEUS_MQ_HANDLE_CREDIT is set to true.

These parameters apply to all queues but can be overridden for each queue individually.

  • Example:
	
        OSSEUS_MQ_AMQP_CONSUMERS_CLUSTER: [
            {
              name: 'queue://SYSTEM.MESSAGES',
              alias: 'smsg',
              options: ['settle'], // with settle, a queue:// message is redelivered until a client accepts it
              credit: {
                handle: true,
                initial: 1,
                limit: 1
              }
            }
        ],
        ...
  • OSSEUS_MQ_DB_USAGE (string, required) - which database type to use when a message is sent with the persist flag set. Options are: MONGODB, POSTGRES, NONE
  • OSSEUS_MQ_DB_CONNECTION_STRING (string, optional) - connection string for the database that will persist messages. Required if OSSEUS_MQ_DB_USAGE != "NONE"
  • APPLICATION_NAME (string, required) - name of the app to use in logging and messaging.

Note: wait for osseus.mq to emit the ready event before using it.
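A minimal sketch of waiting for the ready event. It assumes only what is stated here: that osseus.mq is an EventEmitter that emits ready. The osseus object below is a stand-in built for the example, and the emit call simulates the module finishing its setup.

```javascript
const EventEmitter = require('events')

// Stand-in for the osseus object; in a real app osseus.mq is provided by the module.
const osseus = { mq: new EventEmitter() }

let ready = false
osseus.mq.once('ready', () => {
  ready = true
  // From here on it is safe to call osseus.mq.send / osseus.mq.receive.
  console.log('mq is ready')
})

// Simulate the module finishing its connection setup.
osseus.mq.emit('ready')
```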

Protocols

AMQP

AMQP is a specification (not a product) that was created to enable interoperability between multiple integration broker implementations. AMQP is a protocol and a fairly complete specification for the most commonly used middleware functionality.

Topic / Queue names {#topicnames}

Topics should be used when a message should have multiple receivers, order is not an issue, and the consumers do not need to take a single "atomic" action.

Queues should be used when only a single consumer should consume the message and either report when it has finished acting on that message or return the message to the queue for another consumer.

Queues can be exclusive, and both queues and topics can be durable and save historical messages.

To use a queue, the name should start with queue://, and to use a topic, with topic://. Both support the wildcard '*' and the trailing wildcard '>', which matches everything to the end of the name. So topic://A.*.C matches any name with a single segment between A and C, whereas topic://A.> matches everything that starts with A.
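The matching rules above can be sketched as a small function. This is an illustration, not the broker's matching code: matchesDestination is a hypothetical helper, names are given without the queue:// or topic:// prefix, and treating '>' as requiring at least one further segment is an assumption of this sketch.

```javascript
// Returns true when `name` matches `pattern`, where '*' stands for exactly one
// dot-separated segment and '>' stands for all remaining segments.
function matchesDestination (pattern, name) {
  const patternParts = pattern.split('.')
  const nameParts = name.split('.')
  for (let i = 0; i < patternParts.length; i++) {
    if (patternParts[i] === '>') return nameParts.length > i
    if (i >= nameParts.length) return false
    if (patternParts[i] !== '*' && patternParts[i] !== nameParts[i]) return false
  }
  return patternParts.length === nameParts.length
}

console.log(matchesDestination('A.*.C', 'A.B.C')) // true
console.log(matchesDestination('A.*.C', 'A.B.D')) // false
console.log(matchesDestination('A.>', 'A.B.C.D')) // true
```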

Methods

All methods are accessible via the mq object, which is added to the base osseus object.

The mq object is an EventEmitter.

send (alias, message, options) ⇒ Promise {#send}

Sends data to the named queue or topic. The returned promise resolves once the message is sent (and persisted, if requested). It resolves with the base message object, which contains the system-generated messageId and the requestId if one was provided.

Kind: function

| Param | Type | Description |
| --- | --- | --- |
| alias | string | string alias for the topic / queue |
| message | object | JSON to pass as the message (will be available in <object>.msg) |
| options | object | options object |
| options.persist | 'async' / 'sync' | persist the message synchronously or asynchronously; if not set, the message is not persisted |
| options.requestId | string | request id to add to the base message object |
| options.topic | string | set the topic field on the message for different message types |
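A sketch of the call shape, using a stand-in object rather than the real module. The mq stub below only mimics the documented signature and the fields named above (msg, requestId, messageId); the messageId format is made up for the example.

```javascript
// Stand-in for osseus.mq with the documented send signature. The resolved
// object follows the description above; the ID format is invented.
const mq = {
  send (alias, message, options = {}) {
    return Promise.resolve({
      msg: message,
      requestId: options.requestId,
      messageId: `${alias}-1`
    })
  }
}

const sent = mq.send('smsg', { hello: 'world' }, { persist: 'sync', requestId: 'req-42' })

sent
  .then(base => console.log('sent', base.messageId, base.requestId))
  .catch(err => console.error('send failed', err))
```

Against the real module, the same call would be made on osseus.mq once the ready event has fired.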

receive (alias, options, callback) ⇒ no return value {#receive}

Starts listening for incoming messages on the queue / topic and sets up a callback for it. The callback is optional: you can instead use the .on('message', (alias, message, done, failed) => ...) method of the mq object to get the emitted events, but receive still has to be called to set up a listener for the specific queue. The consumer must call done once it has finished processing the message. Calling failed(error), where error is a string, sets the message to error instead. If neither is called, the broker redelivers the message to this or another consumer once it sees that this instance has disconnected.

Kind: function

| Param | Type | Description |
| --- | --- | --- |
| alias | string | string alias for the topic / queue |
| options | object | options object |
| callback (optional) | function(alias, message, done, failed) | called when a message arrives at the queue / topic |
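A sketch of the done / failed contract using a stand-in object. FakeMq and its deliver helper are hypothetical and play the role of the module and the broker; only the receive signature and the callback arguments come from the description above.

```javascript
const EventEmitter = require('events')

// Stand-in for osseus.mq: receive registers a callback for an alias, and the
// hypothetical deliver helper plays the role of the broker.
class FakeMq extends EventEmitter {
  constructor () {
    super()
    this.callbacks = {}
  }

  receive (alias, options, callback) {
    this.callbacks[alias] = callback
  }

  deliver (alias, message) {
    const outcome = {}
    const done = () => { outcome.status = 'done' }
    const failed = error => {
      outcome.status = 'failed'
      outcome.error = error
    }
    const callback = this.callbacks[alias]
    if (callback) callback(alias, message, done, failed)
    this.emit('message', alias, message, done, failed)
    return outcome
  }
}

const mq = new FakeMq()
mq.receive('smsg', {}, (alias, message, done, failed) => {
  if (message.msg && message.msg.ok) done()
  else failed('payload rejected')
})

const accepted = mq.deliver('smsg', { msg: { ok: true } })
const rejected = mq.deliver('smsg', { msg: { ok: false } })
console.log(accepted.status, rejected.status, rejected.error)
```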

Statistics

osseus-mq allows getting statistics from queues. Statistics are gathered by sending a message to the broker and waiting for it to respond. The broker replies with one message per requested queue. Since there is no way of telling when the broker has finished answering, gathering the statistics is divided into two steps. The first step sends the broker a request for the statistics of a destination, and the second step gets the most recent statistics received from the broker for the same destination.

sendDestinationStatisticsRequest (destination) ⇒ no return value

| Param | Type | Description |
| --- | --- | --- |
| destination | string | queue |

retrieveStatisticsResponse (destination) ⇒ no return value

| Param | Type | Description |
| --- | --- | --- |
| destination | string | queue |

It's best to wait for a few seconds between the steps to give the broker time to respond.
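The two steps with a pause in between can be sketched as follows. The mq object is a stub that only records calls, and sleep and collectStatistics are hypothetical helpers. How the retrieved statistics are delivered is not described here, so the sketch does not assume a return value.

```javascript
// Stub that only records calls; the real methods talk to the broker.
const calls = []
const mq = {
  sendDestinationStatisticsRequest (destination) {
    calls.push(['request', destination])
  },
  retrieveStatisticsResponse (destination) {
    calls.push(['retrieve', destination])
  }
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

// Step 1: ask the broker. Step 2: after a pause, read the latest statistics.
async function collectStatistics (destination, waitMs) {
  mq.sendDestinationStatisticsRequest(destination)
  await sleep(waitMs) // give the broker time to answer
  mq.retrieveStatisticsResponse(destination)
}

// A short wait keeps the example quick; a few seconds is more realistic.
const finished = collectStatistics('TEST.FOO', 10)
finished.then(() => console.log(calls))
```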

You can also use wildcards in the destination. Examples:

TEST.FOO - Get statistics for TEST.FOO

TEST.> - Get statistics for all queues that start with TEST

> - Get statistics for all queues.

Known Issues:

consumer:

  • If a retain mode is set (ActiveMQ retain), messages expire from a topic based on the ttl as well as the retain method.

producer:

  • Topics do not receive expire (advisory) messages most of the time, even when the ttl expires, whether or not there was a consumer. This may need to be checked against the delivery count.
  • Queues automatically retain all messages based on the ttl, while topics are fire and forget.
  • Settle on disposition does not work: the broker always sends the ack frame once the message is stored in the broker (both for queues and topics).

Advisory messages:

NoConsumer:
Topic :
  • Works. The message is sent only if a publisher connects to a topic and there are no consumers at that time. When a consumer goes up or goes down, a Consumer advisory message is sent with applicationProperties: { consumerCount: int }
Queue :
  • Message is never sent.
Consumer:
Topic :
  • Works. The message is sent when the consumer count changes, with applicationProperties: { consumerCount: int }. It is not sent if there are 0 consumers when the producer starts.
Queue :
  • Works. The message is sent when the consumer count changes, with applicationProperties: { consumerCount: int }. It is not sent if there are 0 consumers when the producer starts.

License

Code released under the MIT License.