0.1.8 • Published 9 months ago

@moleculer/channels v0.1.8

Weekly downloads
-
License
MIT
Repository
github
Last release
9 months ago

Moleculer logo

Integration Test Coverage Status Known Vulnerabilities NPM version

@moleculer/channels

Reliable messages for Moleculer services via external queue/channel/topic. Unlike moleculer built-in events, this is not a fire-and-forget solution. It's a persistent, durable and reliable message sending solution. The module uses an external message queue/streaming server that stores messages until they are successfully processed. It supports consumer groups, which means that you can run multiple instances of consumer services, incoming messages will be balanced between them.

Features

  • reliable messages with acknowledgement.
  • multiple adapters (Redis, RabbitMQ, NATS JetStream, Kafka).
  • plugable adapters.
  • configurable max-in-flight.
  • retry messages.
  • dead-letter topic function.
  • can receive messages from 3rd party services.
  • graceful stopping with active message tracking.

Install

npm i @moleculer/channels

Communication diagram

Native Communication Communication Overview diagram

Integration With A Third-Party System Third-Party

Usage

Register middleware in broker options

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
    logger: true,

    middlewares: [
        ChannelsMiddleware({
            adapter: "redis://localhost:6379"
        })
    ]
};

By default, the middleware will add a sendToChannel(<topic-name>, { payload }) method and channelAdapter property to the broker instance. Moreover, it will register handlers located in channels of a service schema.

Consuming messages in Moleculer services

module.exports = {
    name: "payments",

    actions: {
        /*...*/
    },

    channels: {
        // Shorthand format
        // In this case the consumer group is the service full name
        async "order.created"(payload) {
            // Do something with the payload
            // You should throw error if you want to NACK the message processing.
        },

        "payment.processed": {
            // Using custom consumer-group
            group: "other",
            async handler(payload) {
                // Do something with the payload
                // You should throw error if you want to NACK the message processing.
            }
        }
    },

    methods: {
        /*...*/
    }
};

The received payload doesn't contain any Moleculer-specific data. It means you can use it to get messages from 3rd party topics/channels, as well.

Producing messages

broker.sendToChannel("order.created", {
    id: 1234,
    items: [
        /*...*/
    ]
});

The sent message doesn't contain any Moleculer-specific data. It means you can use it to produce messages to 3rd party topics/channels, as well.

Multiple adapters

Registering multiple adapters

const ChannelsMiddleware = require("@moleculer/channels").Middleware;

// moleculer.config.js
module.exports = {
    logger: true,
    logLevel: "error",
    middlewares: [
        // Default options
        ChannelsMiddleware({
            adapter: {
                type: "Kafka",
                options: {}
            }
        }),
        ChannelsMiddleware({
            adapter: "Redis",
            schemaProperty: "redisChannels",
            sendMethodName: "sendToRedisChannel",
            adapterPropertyName: "redisAdapter"
        }),
        ChannelsMiddleware({
            adapter: "AMQP",
            schemaProperty: "amqpChannels",
            sendMethodName: "sendToAMQPChannel",
            adapterPropertyName: "amqpAdapter"
        })
    ]
};

Using multiple adapters in a service

module.exports = {
    name: "payments",

    actions: {
        /*...*/
    },

    channels: {
        "default.options.topic": {
            group: "mygroup",
            async handler(payload) {
                /*...*/
            }
        }
    },
    redisChannels: {
        "redis.topic": {
            group: "mygroup",
            async handler(payload) {
                /*...*/
            }
        }
    },
    amqpChannels: {
        "amqp.topic": {
            group: "mygroup",
            async handler(payload) {
                /*...*/
            }
        }
    }
};

Middleware options

NameTypeDefault valueDescription
adapterString, ObjectnullAdapter definition. It can be a String as name of the adapter or a connection string or an adapter definition Object. More info
schemaPropertyString"channels"Name of the property in service schema.
sendMethodNameString"sendToChannel"Name of the method in ServiceBroker to send message to the channels.
adapterPropertyNameString"channelAdapter"Name of the property in ServiceBroker to access the Adapter instance directly.
contextbooleanfalseUsing Moleculer context in channel handlers by default.

Examples

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
    logger: true,

    middlewares: [
        ChannelsMiddleware({
            adapter: "redis://localhost:6379",
            sendMethodName: "sendToChannel",
            adapterPropertyName: "channelAdapter",
            schemaProperty: "channels"
        })
    ]
};

Channel options

NameTypeSupported adaptersDescription
groupString*Group name. It's used as a consumer group in adapter. By default, it's the full name of service (with version)
maxInFlightNumberRedisMax number of messages under processing at the same time.
maxRetriesNumber*Maximum number of retries before sending the message to dead-letter-queue or drop.
deadLettering.enabledBoolean*Enable "Dead-lettering" feature.
deadLettering.queueNameString*Name of dead-letter queue.
contextboolean*Using Moleculer context in channel handlers.
tracingObject*Tracing options same as action tracing options. It works only with context: true.
handlerFunction(payload: any, rawMessage: any)*Channel handler function. It receives the payload at first parameter. The second parameter is a raw message which depends on the adapter.
redis.startIDStringRedisStarting point when consumers fetch data from the consumer group. By default equals to $, i.e., consumers will only see new elements arriving in the stream. More info here
redis.minIdleTimeNumberRedisTime (in milliseconds) after which pending messages are considered NACKed and should be claimed. Defaults to 1 hour.
redis.claimIntervalNumberRedisInterval (in milliseconds) between message claims
redis.readTimeoutIntervalNumberRedisMaximum time (in milliseconds) while waiting for new messages. By default equals to 0, i.e., never timeout. More info here
redis.processingAttemptsIntervalNumberRedisInterval (in milliseconds) between message transfer into FAILED_MESSAGES channel
amqp.queueOptionsObjectAMQPAMQP lib queue configuration. More info here.
amqp.exchangeOptionsObjectAMQPAMQP lib exchange configuration. More info here.
amqp.consumerOptionsObjectAMQPAMQP lib consume configuration. More info here.
nats.consumerOptionsObjectNATSNATS JetStream consumer configuration. More info here.
nats.streamConfigObjectNATSNATS JetStream storage configuration. More info here.
kafka.fromBeginningBooleanKafkaKafka consumer fromBeginning option. More info here.
kafka.partitionsConsumedConcurrentlyNumberKafkaKafka consumer partitionsConsumedConcurrently option. More info here.

Failed message

If the service is not able to process a message, it should throw an Error inside the handler function. In case of error and if maxRetries option is a positive number, the adapter will redeliver the message to one of all consumers. When the number of redelivering reaches the maxRetries, it will drop the message to avoid the 'retry-loop' effect. If the dead-lettering feature is enabled with deadLettering.enabled: true option then the adapter will move the message into the deadLettering.queueName queue/topic.

Dead-Letter Logic

Dead-Letter

Graceful stopping

The adapters track the messages that are being processed. This means that when a service or the broker is stopping the adapter will block the process and wait until all active messages are processed.

Publishing

Use the broker.sendToChannel(channelName, payload, opts) method to send a message to a channel. The payload should be a serializable data.

Method options

NameTypeSupported adaptersDescription
rawBoolean*If truthy, the payload won't be serialized.
persistentBooleanAMQPIf truthy, the message will survive broker restarts provided it’s in a queue that also survives restarts.
ttlNumberAMQPIf supplied, the message will be discarded from a queue once it’s been there longer than the given number of milliseconds.
priorityNumberAMQPPriority of the message.
correlationIdStringAMQPRequest identifier.
headersObjectAMQP, JetStream, Kafka, RedisApplication specific headers to be carried along with the message content.
routingKeyObjectAMQPThe AMQP publish method's second argument. If you want to send the message into an external queue instead of exchange, set the channelName to "" and set the queue name to routingKey
publishAssertExchange.enabledBooleanAMQPEnable/disable calling once channel.assertExchange() before first publishing in new exchange by sendToChannel()
publishAssertExchange.exchangeOptionsObjectAMQPAMQP lib exchange configuration when publishAssertExchange enabled
keyStringKafkaKey of Kafka message.
partitionStringKafkaPartition of Kafka message.
acksNumberKafkaControl the number of required acks.
timeoutNumberKafkaThe time to await a response in ms. Default: 30000
compressionanyKafkaCompression codec. Default: CompressionTypes.None
xaddMaxLenNumber or StringRedisDefine MAXLEN for XADD command

Middleware hooks

It is possible to wrap the handlers and the send method in Moleculer middleware. The module defines two hooks to cover it. The localChannel hook is similar to localAction but it wraps the channel handlers in service schema. The sendToChannel hook is similar to emit or broadcast but it wraps the broker.sendToChannel publisher method.

Example

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

const MyMiddleware = {
    name: "MyMiddleware",

    // Wrap the channel handlers
    localChannel(next, chan) {
        return async (msg, raw) => {
            this.logger.info(kleur.magenta(`  Before localChannel for '${chan.name}'`), msg);
            await next(msg, raw);
            this.logger.info(kleur.magenta(`  After localChannel for '${chan.name}'`), msg);
        };
    },

    // Wrap the `broker.sendToChannel` method
    sendToChannel(next) {
        return async (channelName, payload, opts) => {
            this.logger.info(kleur.yellow(`Before sendToChannel for '${channelName}'`), payload);
            await next(channelName, payload, opts);
            this.logger.info(kleur.yellow(`After sendToChannel for '${channelName}'`), payload);
        };
    }
};

module.exports = {
    logger: true,

    middlewares: [
        MyMiddleware,
        ChannelsMiddleware({
            adapter: "redis://localhost:6379"
        })
    ]
};

Context-based messages

In order to use Moleculer Context in handlers (transferring ctx.meta and tracing information) you should set the context: true option in channel definition object or in middleware options to enable it for all channel handlers.

Example to enable context for all handlers

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
    logger: true,

    middlewares: [
        ChannelsMiddleware({
            adapter: "redis://localhost:6379",
            // Enable context in all channel handlers 
            context: true
        })
    ]
};

Using Context in handlers

module.exports = {
    name: "payments",

    actions: {
        /*...*/
    },

    channels: {
        "default.options.topic": {
            context: true, // Unless not enabled it globally
            async handler(ctx/*, raw*/) {
                // The `ctx` is a regular Moleculer Context
                if (ctx.meta.loggedInUser) {
                    // The `ctx.params` contains the original payload of the message
                    await ctx.call("some.action", ctx.params);
                }
            }
        }
    }
};

Send message with parent Context

In this case the ctx.meta and other tracing information is transferred to the channel handler.

module.exports = {
    name: "payments",

    actions: {
        submitOrder: {
            async handler(ctx) {
                await broker.sendToChannel("order.created", {
                    id: 1234,
                    items: [/*...*/]
                }, {
                    // Pass the `ctx` in options of `sendToChannel`
                    ctx
                });

            }
        }
    },
}

Tracing

To enable tracing for context-based handlers, you should register Tracing middleware in broker options.

The middleware works only with context: true.

Register channel tracing middleware

//moleculer.config.js
const TracingMiddleware = require("@moleculer/channels").Tracing;

module.exports = {
    logger: true,

    middlewares: [
        ChannelsMiddleware({
            adapter: "redis://localhost:6379",
            // Enable context in all channel handlers 
            context: true
        }),
        TracingMiddleware()
    ]
};

You can fine-tuning tracing tags and span name in tracing channel property similar to actions.

Customize tags and span name

broker.createService({
    name: "sub1",
    channels: {
        "my.topic": {
            context: true,
            tracing: {
                spanName: ctx => `My custom span: ${ctx.params.id}`
                tags: {
                    params: true,
                    meta: true
                }
            },
            async handler(ctx, raw) {
                // ...
            }
        }
    }
});

To disable tracing, set `tracing: false in channel definition.

Adapters

Adapter options

NameTypeDefault valueSupported adaptersDescription
consumerNameStringServiceBroker nodeID*Consumer name used by adapters. By default it's the nodeID of ServiceBroker.
prefixStringServiceBroker namespace*Prefix is used to separate topics between environments. By default, the prefix value is the namespace of the ServiceBroker.
serializerString, Object, SerializerJSON*Message serializer. You can use any built-in serializer of Moleculer or create a custom one.
maxRetriesNumber3*Maximum number of retries before sending the message to dead-letter-queue or drop.
maxInFlightNumber1*Max number of messages under processing at the same time.
deadLettering.enabledBooleanfalse*Enable "Dead-lettering" feature.
deadLettering.queueNameStringFAILED_MESSAGES*Name of dead-letter queue.
redisObject, String, NumbernullRedisRedis connection options. More info here
redis.consumerOptions .readTimeoutIntervalNumber0RedisMaximum time (in milliseconds) while waiting for new messages. By default equals to 0, i.e., never timeout. More info here
redis.consumerOptions .minIdleTimeNumber60 * 60 * 1000RedisTime (in milliseconds) after which pending messages are considered NACKed and should be claimed. Defaults to 1 hour.
redis.consumerOptions .claimIntervalNumber100RedisInterval (in milliseconds) between message claims.
redis.consumerOptions .startIDString$RedisStarting point when consumers fetch data from the consumer group. By default equals to $, i.e., consumers will only see new elements arriving in the stream. More info here.
redis.consumerOptions .processingAttemptsIntervalNumber0RedisInterval (in milliseconds) between message transfer into FAILED_MESSAGES channel.
redis.clusterObjectnullRedisRedis cluster connection options. More info here
redis.cluster.nodesArraynullRedisRedis Cluster nodes list.
redis.cluster.clusterOptionsObjectnullRedisRedis Cluster options.
amqp.urlStringnullAMQPConnection URI.
amqp.socketOptionsObjectnullAMQPAMQP lib socket configuration. More info here.
amqp.queueOptionsObjectnullAMQPAMQP lib queue configuration. More info here.
amqp.exchangeOptionsObjectnullAMQPAMQP lib exchange configuration. More info here.
amqp.messageOptionsObjectnullAMQPAMQP lib message configuration. More info here.
amqp.consumerOptionsObjectnullAMQPAMQP lib consume configuration. More info here.
amqp.publishAssertExchange.enabledBooleanfalseAMQPEnable/disable calling once channel.assertExchange() before first publishing in new exchange by sendToChannel(). More info here.
amqp.publishAssertExchange.exchangeOptionsObjectnullAMQPAMQP lib exchange configuration. More info here.
nats.streamConfigObjectnullNATSNATS JetStream storage configuration. More info here.
nats.consumerOptionsObjectnullNATSNATS JetStream consumer configuration. More info here.
kafka.brokersString[]nullKafkaKafka bootstrap brokers.
kafka.logCreatorFunctionnullKafkaKafka logCreator. More info here.
kafka.producerOptionsObjectnullKafkaKafka producer constructor configuration. More info here.
kafka.consumerOptionsObjectnullKafkaKafka consumer constructor configuration. More info here.

Redis Streams

Redis Streams was introduced in Redis 5.0. Hoverer, since this module relies on the XAUTOCLAIM command, Redis >= 6.2.0 is required.

To use this adapter, install the ioredis module with npm install ioredis command.

Redis Adapter Overview

Dead-Letter

Example

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
    middlewares: [
        ChannelsMiddleware({
            adapter: "redis://localhost:6379"
        })
    ]
};

Example with options

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
    middlewares: [
        ChannelsMiddleware({
            adapter: {
                type: "Redis",
                options: {
                    redis: {
                        // ioredis constructor options: https://github.com/luin/ioredis#connect-to-redis
                        host: "127.0.0.1",
                        port: 6379,
                        db: 3,
                        password: "pass1234",
                        consumerOptions: {
                            // Timeout interval (in milliseconds) while waiting for new messages. By default never timeout
                            readTimeoutInterval: 0,
                            // Time (in milliseconds) after which pending messages are considered NACKed and should be claimed. Defaults to 1 hour.
                            minIdleTime: 5000,
                            // Interval (in milliseconds) between two claims
                            claimInterval: 100,
                            // "$" is a special ID. Consumers fetching data from the consumer group will only see new elements arriving in the stream.
                            // More info: https://redis.io/commands/XGROUP
                            startID: "$",
                            // Interval (in milliseconds) between message transfer into FAILED_MESSAGES channel
                            processingAttemptsInterval: 1000
                        }
                    }
                }
            }
        })
    ]
};

You can overwrite the default values in the handler definition.

Overwrite default options in service

module.exports = {
    name: "payments",

    actions: {
        /*...*/
    },

    channels: {
        "order.created": {
            maxInFlight: 6,
            async handler(payload) {
                /*...*/
            }
        },
        "payment.processed": {
            redis: {
                minIdleTime: 10,
                claimInterval: 10
            }
            deadLettering: {
                enabled: true,
                queueName: "DEAD_LETTER"
            },
            async handler(payload) {
                /*...*/
            }
        }
    }
};

Redis Cluster

module.exports = {
    middlewares: [
        ChannelsMiddleware({
            adapter: {
                type: "Redis",
                options: {
                    redis: {
                        cluster: {
                            nodes: [
                                { port: 6380, host: "127.0.0.1" },
                                { port: 6381, host: "127.0.0.1" },
                                { port: 6382, host: "127.0.0.1" }
                            ],
                            options: {
                                /* More information: https://github.com/luin/ioredis#cluster */
                                redisOptions: {
                                    password: "fallback-password"
                                }
                            }
                        },
                        consumerOptions: {
                            // Timeout interval (in milliseconds) while waiting for new messages. By default never timeout
                            readTimeoutInterval: 0,
                            // Time (in milliseconds) after which pending messages are considered NACKed and should be claimed. Defaults to 1 hour.
                            minIdleTime: 5000,
                            // Interval (in milliseconds) between two claims
                            claimInterval: 100,
                            // "$" is a special ID. Consumers fetching data from the consumer group will only see new elements arriving in the stream.
                            // More info: https://redis.io/commands/XGROUP
                            startID: "$",
                            // Interval (in milliseconds) between message transfer into FAILED_MESSAGES channel
                            processingAttemptsInterval: 1000
                        }
                    }
                }
            }
        })
    ]
};

Capped Streams

To support Redis "capped streams", you can define the MAXLEN value in sendToChannel options as xaddMaxLen. It can be a number or a string with ~ prefix like ~1000. It will be transformed to ...MAXLEN ~ 1000 ...

Example

broker.sendToChannel("order.created", {
    id: 1234,
    items: [
        /*...*/
    ]
},{
    xaddMaxLen: "~1000"
});

AMQP (RabbitMQ)

The AMQP adapter uses the exchange-queue logic of RabbitMQ for creating consumer groups. It means the sendToChannel method sends the message to the exchange and not for a queue.

To use this adapter, install the amqplib module with npm install amqplib command.

Example

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
    middlewares: [
        ChannelsMiddleware({
            adapter: "amqp://localhost:5672"
        })
    ]
};

Example with options

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
    middlewares: [
        ChannelsMiddleware({
            adapter: {
                type: "AMQP",
                options: {
                    amqp: {
                        url: "amqp://localhost:5672",
                        // Options for `Amqplib.connect`
                        socketOptions: {},
                        // Options for `assertQueue()`
                        queueOptions: {},
                        // Options for `assertExchange()`
                        exchangeOptions: {},
                        // Options for `channel.publish()`
                        messageOptions: {},
                        // Options for `channel.consume()`
                        consumerOptions: {},
						// Note: options for `channel.assertExchange()` before first publishing in new exchange
						publishAssertExchange: {
							// Enable/disable calling once `channel.assertExchange()` before first publishing in new exchange by `sendToChannel`
							enabled: false,
							// Options for `channel.assertExchange()` before publishing by `sendToChannel`
							exchangeOptions: {}
						},
                    },
                    maxInFlight: 10,
                    maxRetries: 3,
                    deadLettering: {
                        enabled: false
                        //queueName: "DEAD_LETTER",
                        //exchangeName: "DEAD_LETTER"
                    }
                }
            }
        })
    ]
};

Example Producing messages with options

broker.sendToChannel("order.created", {
    id: 1234,
    items: [
        /*...*/
    ]
},{
    // Using specific `assertExchange()` options only for the current sending case
    publishAssertExchange:{
        // Enable/disable calling once `channel.assertExchange()` before first publishing in new exchange by `sendToChannel`
		enabled: true,
        // Options for `channel.assertExchange()` before publishing 
		exchangeOptions: {
			/*...*/
		}
	},
});

Note: If you know that the exchange will be created before sendToChannel is called by someone else, then it is better to skip publishAssertExchange option

Kafka

The Kafka adapter uses Apache Kafka topics.

In Kafka adapter, the maxInFlight function works differently than other adapters. Reading messages from a partition is processed sequentially in order. So if you want to process multiple messages, you should read messages from multiple partition. To enable it, use the kafka.partitionsConsumedConcurrently option in channel options. More info.

To use this adapter, install the kafkajs module with npm install kafkajs command.

Example

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
    middlewares: [
        ChannelsMiddleware({
            adapter: "kafka://localhost:9092"
        })
    ]
};

Example with options

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
    middlewares: [
        ChannelsMiddleware({
            adapter: {
                type: "Kafka",
                options: {
                    kafka: {
                        brokers: ["kafka-1:9092", "kafka-1:9092"],
                        // Options for `producer()`
                        producerOptions: {},
                        // Options for `consumer()`
                        consumerOptions: {}
                    },
                    maxRetries: 3,
                    deadLettering: {
                        enabled: false,
                        queueName: "DEAD_LETTER"
                    }
                }
            }
        })
    ]
};

NATS JetStream

To use this adapter, install the nats module with npm install nats command.

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
    middlewares: [
        ChannelsMiddleware({
            adapter: "nats://localhost:4222"
        })
    ]
};

Example with options

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
    middlewares: [
        ChannelsMiddleware({
            adapter: {
                type: "NATS",
                options: {
                    nats: {
                        url: "nats://localhost:4222",
                        /** @type {ConnectionOptions} */
                        connectionOptions: {},
                        /** @type {StreamConfig} More info: https://docs.nats.io/jetstream/concepts/streams */
                        streamConfig: {},
                        /** @type {ConsumerOpts} More info: https://docs.nats.io/jetstream/concepts/consumers */
                        consumerOptions: {
                            config: {
                                // More info: https://docs.nats.io/jetstream/concepts/consumers#deliverpolicy-optstartseq-optstarttime
                                deliver_policy: "new",
                                // More info: https://docs.nats.io/jetstream/concepts/consumers#ackpolicy
                                ack_policy: "explicit",
                                // More info: https://docs.nats.io/jetstream/concepts/consumers#maxackpending
                                max_ack_pending: 1
                            }
                        }
                    },
                    maxInFlight: 10,
                    maxRetries: 3,
                    deadLettering: {
                        enabled: false,
                        queueName: "DEAD_LETTER"
                    }
                }
            }
        })
    ]
};

Jetstream - Single stream with multiple topics

It is possible to configure single stream to handle multiple topics (e.g., streamOneTopic.abc and streamOneTopic.xyz). Moreover it possible for a single handler to receive any message that matches the filter streamOneTopic.*. Please check the example for all the details.

Fake adapter

This adapter is made for unit/integration tests. The adapter uses the built-in Moleculer event bus to send messages instead of an external module. It means that the message sending is not reliable but can be a good option to test the channel handlers in a test environment. The fake adapter is doesn't support retries and dead-letter topic features. For multiple brokers, you should define a transporter (at least the FakeTransporter)

Do NOT use this adapter in production!

Example

// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
    middlewares: [
        ChannelsMiddleware({
            adapter: "Fake"
        })
    ]
};

Benchmark

Tests are running on Intel i7 4770K, 32GB RAM on Windows 10 with WSL.

Tested adapters

NameAdapterDescription
RedisRedisSimple Redis Stream adapter.
RedisClusterRedisClustered Redis Stream adapter with 3 nodes.
NATS JetStreamNATSNATS JetStream adapter.
KafkaKafkaKafka adapter.

Latency test

In this test, we send one message at a time. After processing the current message, another one is sent. This test measures the latency of processing a message. The maxInFlight is 1.

AdapterTimemsg/sec
Redis2ms452
RedisCluster2ms433
AMQP51ms20
NATS JetStream1ms584
Kafka1ms637

chart

Throughput test (maxInFlight: 10)

In this test, we send 10k messages and wait for all be processed. This test measures the throughput. The maxInFlight is 10.

Adaptermsg/sec
Redis1294
RedisCluster4118
AMQP11143
NATS JetStream589
Kafka1831

chart

Throughput test (maxInFlight: 100)

In this test, we send 10k messages and wait for all be processed. This test measures the throughput. The maxInFlight is 100.

Adaptermsg/sec
Redis4081
RedisCluster4198
AMQP21438
NATS JetStream646
Kafka1916

chart

License

The project is available under the MIT license.

Contact

Copyright (c) 2023 MoleculerJS

@MoleculerJS @MoleculerJS