1.0.12 • Published 2 years ago

@syrf/transport-library v1.0.12

License
ISC

transport-library

Node.js message queue client library. It currently supports the STOMP protocol.



Stomp

STOMP is the Simple (or Streaming) Text Orientated Messaging Protocol.

STOMP provides an interoperable wire format so that STOMP clients can communicate with any STOMP message broker to provide easy and widespread messaging interoperability among many languages, platforms and brokers

More about STOMP protocol: https://stomp.github.io/

STOMP supported servers: https://stomp.github.io/implementations.html#STOMP_Servers

Basic Usage

const transport = require('@syrf/transport-library')

const stompClient = transport.Stomp.create(61613,'localhost','guest','guest')

stompClient.on('connect', () => {
    // do something when connection established
})

stompClient.connect()

stompClient.subscribe('/topic/sample.topic',(message,header)=>{
    console.log(message.text)
})

stompClient.publish('/topic/sample.topic',{ text:"Hello STOMP!" })
//will output :
//Hello STOMP!

connect accepts an optional object whose properties are added to the headers of the CONNECT frame.

const transport = require('@syrf/transport-library')

const stompClient = transport.Stomp.create(61613,'localhost','guest','guest')

stompClient.on('connect', () => {
    // do something when connection established
})

stompClient.connect({ "client-id": "123", otherHeader : "other header value" })

stompClient.subscribe('/topic/sample.topic',(message,header)=>{
    console.log(message.text)
})

stompClient.publish('/topic/sample.topic',{ text:"Hello STOMP!" })
//will output :
//Hello STOMP!

TLS

const fs = require('fs')

const tls = {
    ca: [fs.readFileSync('server/tls/localhost.key')]
}
const stompClient = transport.Stomp.create(61613,'localhost','guest','guest').useTLS(tls)

To use a TLS connection, pass a TLS configuration object to useTLS when creating a new client instance. The TLS options are passed through to Node.js tls.connect.

Publish and Subscribe

Parsing

Messages passed to publish are serialized to JSON and messages received in subscribe are parsed from JSON by default. To use a different message format, pass your own formatter and parser functions.

const convertToXML = obj => {
    let result = ''
    //convert object to XML string
    return result
}

const convertFromXML = xml => {
    let result = null
    //convert XML to object
    return result
}
const stompClient = transport.Stomp.create(61613,'localhost','guest','guest')
stompClient.useCustomMessageFormat(convertToXML,convertFromXML)

messageFormatter (the first argument) converts the object passed as the second argument of publish into a string, using the function you provide.

messageParser (the second argument) converts an incoming message from the message broker from a string into an object, using the function you provide.
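As a rough illustration of a formatter and parser pair, the sketch below uses a simple key=value line format instead of XML. The names toKeyValue and fromKeyValue are hypothetical, and only flat objects with string values and keys without "=" are handled. Nothing here is taken from the library itself.

```javascript
// Hypothetical formatter: flat object -> "key=value" lines.
const toKeyValue = obj =>
    Object.entries(obj)
        .map(([key, value]) => `${key}=${encodeURIComponent(String(value))}`)
        .join('\n')

// Hypothetical parser: "key=value" lines -> flat object with string values.
const fromKeyValue = text => {
    const result = {}
    for (const line of text.split('\n')) {
        const separator = line.indexOf('=')
        if (separator === -1) continue // skip empty or malformed lines
        result[line.slice(0, separator)] = decodeURIComponent(line.slice(separator + 1))
    }
    return result
}

// Round trip: the parser should undo what the formatter did.
const roundTrip = fromKeyValue(toKeyValue({ text: 'Hello STOMP!', lang: 'en' }))
console.log(roundTrip)
```

Such a pair would then be registered with stompClient.useCustomMessageFormat(toKeyValue, fromKeyValue).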

Batch Publish

Batch publish reduces the number of messages queued by pooling outgoing messages from the publish() method into a batch. The batch is sent to the MQ once per time span, or earlier when it reaches a configured count or size limit.

const publisher = transport.Stomp.create(61613, 'localhost', 'guest', 'guest')
publisher.useBatchPublish({
    count: 10,
    size: 150000,
    span: 1000
})

The sample code above shows how to enable batch publishing. The batch is published every 1000 ms, when it contains 10 messages, or when the total message size exceeds 150 KB, whichever comes first.

const processMessage = message => console.log(message)

stompClient.subscribe('/topic/sample.topic', (data, headers) => {
    // a batch message is flagged by the isbatch header
    if (headers.isbatch === "true") {
        data.messages.forEach(processMessage)
    } else {
        processMessage(data)
    }
})

The example above shows how to handle a batch message on subscribe and detect if a message is a batch message or not.

Batch publish currently supports only the JSON message format.
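The three triggers described in this section (count, size and span) can be sketched as a small buffer. This is a hypothetical illustration, not the library's implementation: the BatchBuffer class, its send callback and the way message size is measured are all assumptions.

```javascript
// Hypothetical batching buffer; not the library's implementation.
class BatchBuffer {
    constructor({ count, size, span }, send) {
        this.limits = { count, size, span }
        this.send = send // called with (messages, trigger)
        this.messages = []
        this.bytes = 0
        this.timer = null
    }

    add(message) {
        this.messages.push(message)
        // Assumption: size is measured on the JSON-serialized message.
        this.bytes += Buffer.byteLength(JSON.stringify(message))
        if (this.messages.length >= this.limits.count) return this.flush('count')
        if (this.bytes > this.limits.size) return this.flush('size')
        // Start the span timer when the first message enters an empty batch.
        if (this.timer === null) {
            this.timer = setTimeout(() => this.flush('span'), this.limits.span)
        }
    }

    flush(trigger) {
        clearTimeout(this.timer)
        this.timer = null
        if (this.messages.length === 0) return
        const batch = this.messages
        this.messages = []
        this.bytes = 0
        this.send(batch, trigger)
    }
}

const sent = []
const buffer = new BatchBuffer({ count: 3, size: 150000, span: 1000 }, (messages, trigger) => {
    sent.push({ trigger, length: messages.length })
})
buffer.add({ n: 1 })
buffer.add({ n: 2 })
buffer.add({ n: 3 })   // third message reaches the count limit
buffer.add({ n: 4 })   // starts a new batch
buffer.flush('manual') // flush the remainder so no timer is left pending
console.log(sent)
```

Recording which trigger caused each flush mirrors the triggeredBy counters that appear in the stats sample further down.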

Ack and Nack

To acknowledge a message, the library provides the ack and nack methods.

stompClient.subscribe('/topic/sample.topic',(message,header,subscriptionId)=>{
    //process the message

    stompClient.ack(header["message-id"],subscriptionId)
})

More about Ack and Nack

Connection Retry

When the connection is broken by a network issue, an MQ server outage or another cause, the library tries to reconnect at a configurable interval.

While waiting for the connection to be re-established, outgoing messages sent via the publish method are stored in the backoff buffer and are sent immediately after the connection is established.

All subscriptions are re-subscribed after the connection is established.
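The backoff buffer behaviour described above can be pictured with the sketch below. It is a hypothetical illustration: the BackoffBuffer class is not part of the library, and dropping the oldest message when the buffer is full is an assumption, since the overflow policy is not documented here.

```javascript
// Hypothetical bounded buffer for messages published while disconnected.
class BackoffBuffer {
    constructor(capacity) {
        this.capacity = capacity
        this.pending = []
    }

    // Store a message. Assumption: when full, the oldest entry is dropped.
    store(destination, body) {
        if (this.pending.length >= this.capacity) this.pending.shift()
        this.pending.push({ destination, body })
    }

    // Hand every stored message to `publish` in arrival order, then empty the buffer.
    drain(publish) {
        const queued = this.pending
        this.pending = []
        queued.forEach(({ destination, body }) => publish(destination, body))
    }
}

const backoff = new BackoffBuffer(2)
backoff.store('/topic/sample.topic', { text: 'first' })
backoff.store('/topic/sample.topic', { text: 'second' })
backoff.store('/topic/sample.topic', { text: 'third' }) // 'first' is dropped

// Simulate the reconnect: everything still buffered is published.
const delivered = []
backoff.drain((destination, body) => delivered.push(body.text))
console.log(delivered)
```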

Fixed Interval

const stompClient = transport.Stomp.create(61613,'localhost','guest','guest')
stompClient.retryInterval(3000)

The configuration above sets the connection retry interval to 3 s, so when the connection is closed, the transport retries the connection after 3 s.

Incremental Interval

const stompClient = transport.Stomp.create(61613,'localhost','guest','guest')
stompClient
    .retryInterval(3000)
    .incrementalRetryInterval(3000)

Exponential Interval

const stompClient = transport.Stomp.create(61613,'localhost','guest','guest')
stompClient
    .retryInterval(3000)
    .exponentialRetryInterval()
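How the incremental and exponential modes grow the delay is not spelled out here. The sketch below shows one plausible reading, under two assumptions: the increment is added once per failed attempt, and the exponential mode doubles the interval on each attempt. The retryDelay function is hypothetical and the library's actual formula may differ.

```javascript
// Hypothetical delay calculation; the library's actual formula may differ.
const retryDelay = (attempt, { interval = 3000, increment = 0, exponential = false } = {}) => {
    if (exponential) return interval * 2 ** attempt // assumed doubling per attempt
    return interval + increment * attempt           // stays fixed when increment is 0
}

const attempts = [0, 1, 2, 3]
const fixed = attempts.map(n => retryDelay(n, { interval: 3000 }))
const incremental = attempts.map(n => retryDelay(n, { interval: 3000, increment: 3000 }))
const exponential = attempts.map(n => retryDelay(n, { interval: 3000, exponential: true }))
console.log({ fixed, incremental, exponential })
```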

Failover

let stompClient = transport.Stomp.createWithFailover([
    {
        host: 'localhost',
        port: 61613,
        user: 'guest1',
        pass: 'guest1'
    },
    {
        host: 'localhost',
        port: 61614,
        user: 'guest2',
        pass: 'guest2'
    },
    {
        host: 'localhost',
        port: 61615,
        user: 'guest3',
        pass: 'guest3',
        tls: {
            ca: [fs.readFileSync('server/tls/localhost.key')]
        }
    },
    {
        host: 'otherhost.com',
        port: 61613,
        user: 'guest4',
        pass: 'guest4'
    }
])

When a failover configuration is provided, the transport lib connects to the first server in the array first. When a connection failure happens, it immediately switches to the next failover server. If all servers have been tried and no connection succeeded, the retry interval is used.

TLS configuration also works with failover servers.
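The switching rule described above can be sketched as a small planning function. This is a hypothetical illustration, not the library's code: nextAttempt and its return shape are assumptions, and it assumes the list wraps around to the first server once the retry interval has elapsed.

```javascript
// Hypothetical failover planner; not the library's implementation.
// Returns the next step after a failed connection attempt at `index`.
const nextAttempt = (servers, index, retryInterval) => {
    const isLast = index === servers.length - 1
    const nextIndex = (index + 1) % servers.length
    return {
        server: servers[nextIndex],
        index: nextIndex,
        // Switch immediately until every server has been tried once,
        // then wait for the retry interval before starting over.
        delay: isLast ? retryInterval : 0
    }
}

const servers = [
    { host: 'localhost', port: 61613 },
    { host: 'localhost', port: 61614 },
    { host: 'otherhost.com', port: 61613 }
]

// Simulate three consecutive failures starting from the first server.
const plan = []
let current = 0
for (let i = 0; i < 3; i++) {
    const step = nextAttempt(servers, current, 3000)
    plan.push(`${step.server.host}:${step.server.port} after ${step.delay}ms`)
    current = step.index
}
console.log(plan)
```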

Configurations

Available configuration options:

Config | Type | Default | Description
---|---|---|---
port|number|N/A|port of the server
host|string|N/A|hostname of the server
user|string|N/A|username to log in to the server
pass|string|N/A|password to log in to the server
tls|object|null|TLS connection configuration
messageFormatter|function|body => JSON.stringify(body)|function to format a message to be sent
messageParser|function|body => JSON.parse(body)|function to parse a received message
timeout|number|60000|socket timeout in ms
batchPublish|object|null|batch publishing configuration
batchSubscribe|object|null|batch subscribing configuration
backoffBufferSize|number|1|number of messages to store while not connected
failover|array|null|failover servers configuration
interval|number|3000|retry interval when the connection is lost
increment|number|0|retry interval increment
exponential|boolean|false|grow the retry interval exponentially
enableRetry|boolean|true|enable retry when the connection is lost
useSeparateConnection|boolean|false|use separate socket connections for subscribe and publish
statsInterval|number|5000|interval between stats events
bucketCount|number|60|bucket count of stats data
bucketSpan|number|1000|bucket time span to store stats data
name|string|default|name of the instance for stats monitoring
group|string|default|group of the instance for stats monitoring

Builder Methods

  • static create : creates a new client object from port, host, user and password. Returns a stomp client object
  • static createWithFailover : creates a new client object with failover connections from an array of failover servers. Returns a stomp client object
  • useBatchPublish : combines multiple published messages with the same destination into a single STOMP message to reduce queue count. Accepts a batch publish options object with these properties: size (batch size limit in bytes), count (batch size limit in number of messages) and span (time limit in ms before the batch is sent)
  • useBatchSubscribe : same as useBatchPublish but for subscribe. Calls the message callback when the batch size or time limit is exceeded
  • useTLS : uses a TLS connection instead of TCP. Accepts a Node.js TLS options object
  • setConnectionTimeout : sets the connection idleness timeout in ms
  • retryInterval : sets the retry interval in ms
  • incrementalRetryInterval : sets the retry interval increment in ms
  • exponentialRetryInterval : enables an exponential retry interval
  • useBackoffBuffer : sets how many messages the backoff buffer can store
  • useSeparateConnection : uses separate connections for publish and subscribe
  • useCustomMessageFormat : uses a custom message format. The default is JSON
  • useStatsMonitor : configures stats monitoring options (statsInterval, bucketCount and bucketSpan)
  • setName : sets the instance name that identifies the instance in stats monitoring

Methods

Method | Argument(s) | Returns | Description
---|---|---|---
connect|object headers (optional)|void|establish connection
destroy|N/A|void|destroy socket and stomp client
subscribe|string destination, object headers, function callback|subscription Id|subscribe to a queue, topic or exchange
unsubscribe|string destination|void|unsubscribe from a queue, topic or exchange
publish|string destination, any body, object headers|void|publish a message to a destination
ack|object|void|acknowledge a message
nack|object|void|send NACK command
disconnect|function callback|void|send disconnect message to server and close socket

Events

List of events emitted :

  • connect : emitted when the socket is connected and the client is connected to the stomp server
  • disconnect : emitted when the socket is disconnected and the client is disconnected from the stomp server
  • error : emitted when a connection error happens
  • close : emitted when the connection is closed
  • message : emitted when a message arrives
  • timeout : emitted when the idleness timeout has passed
  • receipt : emitted when a receipt is received
  • sendSocketclose : emitted when the send socket is closed while using separate connections
  • sendSocketConnect : emitted when the send socket is connected while using separate connections
  • subsSocketclose : emitted when the subscribe socket is closed while using separate connections
  • subsSocketConnect : emitted when the subscribe socket is connected while using separate connections
  • sendSocketTimeout : emitted when the send socket is idle while using separate connections
  • stats : Emit stats data of the lib

Stats

Stats monitoring uses a circular buffer to store the latest sets of stats data. By default, the stats data is a summary of an array of 60 buckets (the "bucket count"). The last bucket in the array accumulates stats data for 1 s (the "bucket span"). After the bucket span elapses, a new bucket is pushed to the array and the bucket at the first index is removed. Every 5 s (the "stats interval"), a summary of the 60 buckets is created and emitted through the "stats" event, along with instance data such as name, group, host and port. Here is a sample of the stats data:

{
  name: 'default',
  group: 'default',
  port: 61613,
  host: 'localhost',
  user: 'guest',
  stats: {
    publish: { count: 10, avgSize: 360 },
    batchPublish: {
      count: 1,
      avgMessagesContained: 10,
      avgSize: 360,
      triggeredBy: {
        span: 1,
        size: 0,
        count: 0
      }
    },
    subscribe: { count: 10, avgTransportLatency: 0, avgSize: 365 }
  }
}
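The bucket rotation described in this section can be sketched as follows. This is a hypothetical illustration, not the library's implementation: the RollingStats class and its method names are assumptions, only a publish-style count and average size are tracked, and rotate() is called by hand here where a timer firing once per bucket span would normally drive it.

```javascript
// Hypothetical rolling-bucket store; not the library's implementation.
class RollingStats {
    constructor(bucketCount) {
        this.bucketCount = bucketCount
        this.buckets = [{ count: 0, bytes: 0 }]
    }

    // Accumulate one message into the newest bucket.
    record(sizeInBytes) {
        const latest = this.buckets[this.buckets.length - 1]
        latest.count += 1
        latest.bytes += sizeInBytes
    }

    // Called once per bucket span: push a fresh bucket, drop the oldest if full.
    rotate() {
        this.buckets.push({ count: 0, bytes: 0 })
        if (this.buckets.length > this.bucketCount) this.buckets.shift()
    }

    // Summarize all buckets, as would happen once per stats interval.
    summary() {
        const count = this.buckets.reduce((sum, b) => sum + b.count, 0)
        const bytes = this.buckets.reduce((sum, b) => sum + b.bytes, 0)
        return { count, avgSize: count === 0 ? 0 : bytes / count }
    }
}

const stats = new RollingStats(2)
stats.record(300)
stats.rotate()
stats.record(400)
stats.record(380)
const beforeEviction = stats.summary()
stats.rotate() // the bucket holding the 300-byte message is evicted
const afterEviction = stats.summary()
console.log(beforeEviction, afterEviction)
```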

Development

Details about development and testing can be found in testing.md
