rxmsg v4.1.2
RxMsg
A powerful and simple universal messaging abstraction
This library makes it easy to send messages across a distributed system in a network-transparent way, via various brokers, using RxJS streams.
RxMsg uses a versatile middleware pattern to create messaging endpoints that are extremely flexible.
Sending a message
const { createProducer } = require('rxmsg');
const { createAmqpConnector } = require('rxmsg/amqp');
const { amqpConfig } = require('./amqpConfig');
const middleware = createAmqpConnector(amqpConfig).sender();
const producer = createProducer(middleware);
// RxJS observer
producer.next({ body: 'Hello World!', to: 'hello' });
Receiving a message
const { createConsumer } = require('rxmsg');
const { createAmqpConnector } = require('rxmsg/amqp');
const { amqpConfig } = require('./amqpConfig');
const middleware = createAmqpConnector(amqpConfig).receiver({ noAck: true, queue: 'hello' });
const consumer = createConsumer(middleware);
// RxJS observable
consumer.subscribe(msg => {
  console.log(`Received: "${msg.body}"`);
});
Configure your broker
module.exports.amqpConfig = {
  declarations: {
    // List the queues, exchanges etc. you want to use here.
    queues: [
      {
        durable: false,
        name: 'hello'
      }
    ]
  },
  uri: 'amqp://user:password@somerabbitserver.io/user'
};
Using Middleware
The endpoint creators each accept a list of middleware as arguments. When the producer sends a message it passes top down through the list of middleware.
Producer middleware
Messages come into the system top to bottom, in this case from a producer.next(msg) call.
const producer = createProducer(
  transformMessageSomehow, // Step 1 - Do some transformation
  broadCastsMessagesSomewhere // Step 2 - The last middleware must do the broadcasting
);
Consumer middleware
Again, messages come into the system top to bottom. Here this would be from an external broker via the top middleware.
const consumer = createConsumer(
  receivesMessagesFromSomewhere, // Step 1 - The first middleware must emit the message.
  logOrTransformMessage, // Step 2 - Perhaps send the message to a logger.
  doSomeMoreTransformation // Step 3 - Run another transform on the message before subscription.
);
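The top-to-bottom ordering described above can be pictured as a fold over the middleware list, where each middleware receives the output of the one before it. The sketch below is not rxmsg's implementation. It uses plain arrays as stand-ins for streams so that it runs without RxJS, and applyMiddleware, upperCaseBody and tagStep are hypothetical names.

```javascript
// Fold a list of middleware over an input "stream", first middleware first.
// An array stands in for an Observable purely to keep the sketch dependency-free.
function applyMiddleware(...middleware) {
  return (stream) => middleware.reduce((current, fn) => fn(current), stream);
}

// Two hypothetical middleware: each takes a stream and returns a new one.
const upperCaseBody = (stream) =>
  stream.map((msg) => ({ ...msg, body: msg.body.toUpperCase() }));
const tagStep = (stream) =>
  stream.map((msg) => ({ ...msg, body: `[seen] ${msg.body}` }));

const run = applyMiddleware(upperCaseBody, tagStep);
const result = run([{ body: 'hello', to: 'hello' }]);
console.log(result[0].body); // prints "[seen] HELLO": upperCaseBody ran before tagStep
```

Swapping the two arguments would give "[SEEN] HELLO" instead, which is why the position of each middleware in the list matters.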
Creating your own Middleware
Middleware is simple: each one is just a function that decorates an RxJS stream. Here is the signature:
type Middleware<T> = (stream: Observable<T>) => Observable<T>;
Here is an example:
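const { tap } = require('rxjs/operators');

// Logs the body of every message passing through, leaving the stream unchanged.
function logger(stream) {
  return stream.pipe(
    tap(msg => console.log(`Stream logged: ${msg.body}`))
  );
}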
To use a middleware, pass it as one of the arguments to the createProducer() or createConsumer() functions:
const consumer = createConsumer(amqpReceiver, logger);
Manipulating messages
Note that because consumer is simply an RxJS observable, you can filter it, throttle it, or apply any other operator to it:
const { filter } = require('rxjs/operators');

// Only pass on messages whose body mentions "world".
const sub = consumer
  .pipe(filter(msg => msg.body.toLowerCase().includes('world')))
  .subscribe(msg => {
    console.log(`Received: ${msg.body}`);
  });
Installation
Install the package from the npm registry with either yarn or npm:
yarn add rxmsg
npm install rxmsg --save
Getting Started Examples
You can check out the getting started examples here:
- RabbitMQ
- Kafka (coming soon)
- Node Processes (coming soon)
- Web Workers (coming soon)
- Socket.io (coming soon)
RabbitMQ Examples as tests
For usage and examples, see the basic tests here.
Usage with TypeScript
Messages
Generic message objects look like this:
// Generic message
export interface IMessage {
body: any;
to: any;
correlationId?: string;
replyTo?: string;
}
To send a message, pass it to the next() method of a producer:
producer.next({
body: 'Hi there!',
to: 'some-queue'
});
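The optional correlationId and replyTo fields suggest a request/reply style of messaging. How each rxmsg connector treats those fields is not described here, so the following is only a sketch of building message objects that fit the interface above. createRequest and createReply are hypothetical helpers, the queue names are made up, and randomUUID comes from Node's built-in crypto module.

```javascript
const { randomUUID } = require('crypto');

// Build a request message that matches the IMessage shape above.
// createRequest is a hypothetical helper, not part of rxmsg.
function createRequest(to, body, replyTo) {
  return {
    body,
    to,
    correlationId: randomUUID(), // lets the caller match a reply to this request
    replyTo
  };
}

// Build the reply for a request: address it to replyTo and echo the correlationId.
function createReply(request, body) {
  return {
    body,
    to: request.replyTo,
    correlationId: request.correlationId
  };
}

const request = createRequest('rpc-queue', { op: 'ping' }, 'reply-queue');
const reply = createReply(request, { op: 'pong' });
console.log(reply.to, reply.correlationId === request.correlationId);
```

Either object could then be handed to producer.next() like any other message.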
Project Principles:
- Declarative over imperative.
- Functions over classes.
- Simplicity over complexity.
- Immutable over mutable.
- Flexible and composable over fixed hierarchy.
- Pure over impure.
- Minimalistic sensible defaults over boilerplate.
- Idiomatic APIs over reinventing the wheel.
Environments
- Basic framework should work in all V8 environments. eg.
- Middleware is environment specific, e.g. rxmsg/amqp requires Node, and rxmsg/socketio-browser (coming soon) requires a browser environment (e.g. window, document, etc.).
Broker Support
We currently support, or plan to support, the following brokers:
- AMQP / RabbitMQ
- Kafka
- Node Processes
- Web Workers
- Socket.io
Is there a message broker you would like to see on this list? Want to get a specific integration sooner?
Create an issue or talk to me about sponsoring this project.
Architectural Roadmap
- Refactor to lerna
RxJS References
Docs
Videos
NOTE: Using version 6
rxmsg uses RxJS v6.0, so you need to pipe all your operators:
import { filter } from 'rxjs/operators';
// ...
consumer.pipe(filter(forUserEvents(userId))).subscribe(
  msg => {
    dealWithMessage(msg.body);
  },
  () => {} // error handler: errors are ignored in this example
);
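The snippet above calls forUserEvents without defining it. One plausible shape, assuming message bodies are objects that carry a userId field (an assumption, since the source does not show the body format), is a function that returns a predicate suitable for filter:

```javascript
// Hypothetical predicate factory: returns a function that accepts only
// messages whose body carries the given userId.
function forUserEvents(userId) {
  return (msg) => Boolean(msg.body) && msg.body.userId === userId;
}

const messages = [
  { to: 'events', body: { userId: 'u1', type: 'login' } },
  { to: 'events', body: { userId: 'u2', type: 'logout' } }
];

// Array.prototype.filter accepts the same kind of predicate as the RxJS filter operator.
console.log(messages.filter(forUserEvents('u1')).length); // prints 1
```

Because the predicate is a plain function, it can be checked against sample messages like this before it is wired into a consumer pipeline.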
Other References
- https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html
- https://www.rabbitmq.com/tutorials/tutorial-two-javascript.html
- https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html
- https://www.rabbitmq.com/tutorials/tutorial-four-javascript.html
- https://www.rabbitmq.com/tutorials/tutorial-five-javascript.html
- https://aws.amazon.com/blogs/compute/building-scalable-applications-and-microservices-adding-messaging-to-your-toolbox/