1.8.5 • Published 2 years ago

@avanzu/rhea-composable v1.8.5

License
ISC

rhea-composable

Allows bidirectional and unidirectional messaging using the AMQP 1.0 protocol. The heavy lifting regarding the messaging protocol is handled by rhea.

Usage

Two scenarios are available: unidirectional and bidirectional messaging.

In both cases, you have two independent system components that are unaware of each other. They only share the knowledge of the queue to communicate over and the format of the message(s).

connecting to the broker

Both systems will have to establish a connection to the message broker.

// src/connection.js
const { useConnection } = require('@avanzu/rhea-composable')

const connection = useConnection().connectionOf('my-connection-id', {
    host: process.env.AMQP_HOST,
    port: process.env.AMQP_PORT,
    username: process.env.AMQP_USER,
    password: process.env.AMQP_PASSWD,
    transport: process.env.AMQP_TRANSPORT,
})

module.exports = connection

This will open a connection to your AMQP broker. For a full list of connection options, please refer to the rhea documentation.
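
Environment variables are always strings and may be unset, so it can help to normalise them before handing them to connectionOf. The following is a minimal sketch, not part of this package: connectionOptionsFromEnv is a hypothetical helper, and the fallbacks (localhost, tcp, ports 5672 and 5671) are assumptions based on the conventional AMQP defaults.

```javascript
// Hypothetical helper (not provided by @avanzu/rhea-composable).
// Builds a connection options object from an env-like object.
function connectionOptionsFromEnv(env) {
    // Assumption: plain TCP unless a transport such as 'tls' is requested
    const transport = env.AMQP_TRANSPORT || 'tcp'

    // 5672 is the conventional AMQP port, 5671 the conventional AMQPS port
    const defaultPort = transport === 'tcp' ? 5672 : 5671
    const port = env.AMQP_PORT ? Number(env.AMQP_PORT) : defaultPort

    if (!Number.isInteger(port) || port <= 0 || port > 65535) {
        throw new Error(`Invalid AMQP_PORT: ${env.AMQP_PORT}`)
    }

    return {
        host: env.AMQP_HOST || 'localhost',
        port,
        username: env.AMQP_USER,
        password: env.AMQP_PASSWD,
        transport,
    }
}
```

The result could then be passed as the second argument to connectionOf, for example connectionOf('my-connection-id', connectionOptionsFromEnv(process.env)).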

Attention

Typically, you can only have one broker connection per IP. Trying to open more than that will most likely cause an exception.

unidirectional

Drop a message into a queue to be processed without expecting a reply.

A corresponding example can be executed via npm run examples:uni

In this scenario we will have a worker and a dispatcher.

worker setup

The worker will use the established connection in order to receive and process messages using a map of message handlers.

// src/worker.js
const { useProcessor } = require('@avanzu/rhea-composable')
const connection = require('./connection')


// declare a map of message handlers 
const handlers = {
    mySubject: ({ message }) => {
        console.log('"mySubject" handler received message %o', message)
    },
    default: ({ message }) => {
        console.log('default handler received message %o', message)
    },
}

// attach handlers to the queue
useProcessor(connection).processMessages(
    process.env.WORKER_QUEUE_NAME, 
    handlers
)

dispatcher setup

The dispatcher will use the established connection in order to add messages to the queue that the worker is processing.

// src/dispatcher.js

const { useSender } = require('@avanzu/rhea-composable')
const connection = require('./connection')

const sender = useSender().openSender(
    connection, 
    process.env.WORKER_QUEUE_NAME
)

// triggering the subject based handler 
sender.send({ subject: 'mySubject', body: { foo: 'bar' } })

// triggering the default handler
sender.send({ subject: '', body: { bar: 'baz' } })
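
The two send calls above rely on the worker choosing a handler by subject. As an illustration of that idea only (this is not the package's internal code), the lookup could be sketched as follows. selectHandler is a hypothetical name, and the fallback to default for unknown subjects is an assumption; the text above only confirms it for an empty subject.

```javascript
// Illustration only: one way to pick a handler from a handler map.
function selectHandler(handlers, message) {
    const subject = message.subject

    // Use an own-property check so inherited keys such as "toString"
    // are never mistaken for a registered handler
    if (subject && Object.prototype.hasOwnProperty.call(handlers, subject)) {
        return handlers[subject]
    }

    // Empty, missing, or (by assumption) unknown subjects use the default
    return handlers.default
}
```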

bidirectional

Drop a message into a queue to be processed and expect a reply.

A corresponding example can be executed via npm run examples:bi

In this scenario we will have a responder and a requestor.

responder setup

The responder setup is quite similar to the worker setup. The main difference is that the handlers are supposed to return a reply message, either directly or as a promise.

// src/responder.js
const { useProcessor } = require('@avanzu/rhea-composable')
const connection = require('./connection')

// declare a map of message handlers 
const handlers = {
    // subject based message handling
    mySubject: ({ message }) => {
        console.log('Received request for "mySubject" %o', message)
        return {
            subject: 'mySubjectReply',
            body: { received: message },
        }
    },
    default: ({ message }) =>
        new Promise((Ok) => {
            console.log('Received request on default handler %o', message)
            Ok({ body: { received: message } })
        }),
}

// attach handlers to the queue
useProcessor(connection).processMessages(
    process.env.DIALOG_QUEUE_NAME, 
    handlers
)
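
The handlers above return a reply in two ways: mySubject returns a plain object, while default returns a promise. A caller can treat both uniformly by awaiting the result. The sketch below illustrates that pattern only; callHandler is a hypothetical name and this is not necessarily how the package does it.

```javascript
// Illustration only: invoke a handler and always get a promise back.
// An async function adopts returned promises, wraps plain values,
// and converts a synchronous throw into a rejection.
async function callHandler(handler, context) {
    return handler(context)
}
```

With this shape, a handler that throws and a handler that rejects can both be caught in a single place.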

requestor setup

The requestor works similarly to the dispatcher. Since you expect a reply, the send method returns a promise that you can await.

You can assign each message a ttl in milliseconds. If you omit it, a ttl of 5000 ms is used.

// src/requestor.js

const { useDialog } = require('@avanzu/rhea-composable')
const connection = require('./connection')

const sender = useDialog(connection)
    .openDialog(process.env.DIALOG_QUEUE_NAME)


// triggering the subject based handler 
sender
    .send({ subject: 'mySubject',  body: { foo: 'bar' } })
    .then(
        (reply) => { console.log('Received reply %o', reply) },
        (error) => { console.error('Received error %o', error)}
    )


// triggering the default handler
sender
    .send({ ttl: 10000, body: { bar: 'baz' }  })
    .then(
        (reply) => { console.log('Received reply %o', reply) },
        (error) => { console.error('Received error %o', error)}
    )  
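
Because send returns a promise, the same requests can also be written with async/await. The following is a minimal sketch: request is a hypothetical helper, and it assumes only that sender.send(message) returns a promise, as described above.

```javascript
// Hypothetical helper: `sender` is any object exposing
// send(message) => Promise, such as the dialog opened above.
async function request(sender, message) {
    try {
        const reply = await sender.send(message)
        return { ok: true, reply }
    } catch (error) {
        // Rejections (for example an expired ttl) end up here
        return { ok: false, error }
    }
}
```

Returning a result object instead of rethrowing keeps the call site free of try/catch; whether that fits depends on how your application reports errors.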

messaging strategies

There are two ways of organizing different message types.

Single message type queues

...will guarantee that there will only ever be one specific type of message on a specific queue.

This strategy is advisable when you expect a high volume of time-critical messages that should be processed in parallel. You will end up with far more queues in the broker, but every queue processor has its own receiver, so the queues are processed in parallel.

Subject-based messages over a single queue

...will use a single queue to deliver multiple message types which are distinguished by their subject.

This strategy is advisable when you expect only occasional messages. You will end up with fewer queues in the broker, but processing might take longer under higher load.
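
To make the difference between the two strategies concrete, the sketch below shows where a message of a given type would go under each one. Everything in it is hypothetical: the addressFor helper, the strategy names, and the "orders" queue naming are illustrative assumptions, not part of this package.

```javascript
// Illustration only: map a message type to a queue and a message
// under each strategy. Queue names are made up for the example.
function addressFor(strategy, type, body) {
    if (strategy === 'queue-per-type') {
        // One queue per message type; no subject is needed because
        // the queue itself identifies the type
        return { queue: `orders.${type}`, message: { body } }
    }

    // Single shared queue; the subject distinguishes the type
    return { queue: 'orders', message: { subject: type, body } }
}
```

Under the first strategy each queue would get its own processMessages call with just a default handler; under the second, one call with a handler per subject.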
