rbbt-client v0.5.1
RBBTClient
RBBTClient is a JavaScript library designed for seamless interaction with RabbitMQ over WebSockets. It offers a simple and intuitive API for connecting to RabbitMQ brokers, managing exchanges and subscribing to queues.
Installation
To install the rbbt-client package, use npm:
npm install rbbt-clientUsage
Here's a basic example of how to use RBBTClient:
import { RBBTClient } from "rbbt-client";
// Initialize the RBBTClient: Stomp URL, vhost, username, password
const rbbt = new RBBTClient("ws://localhost:15674/ws", "/", "guest", "guest");
// The default stomp port is 15674, please don't make the same mistakes I made that lead to this package
// Connect to the RabbitMQ broker
const conn = rbbt.connect();
// Create a new channel
const ex = conn.exchange("amq.direct");
// Create a new exclusive queue
const q = ex.queue("", { exclusive: true });
// Bind the queue with a routing key
q.bind("test");
// Subscribe to the queue
q.subscribe({ noAck: true }, (msg) => {
console.log(msg);
});API
RBBTClient
Constructor: new RBBTClient(url, vhost, username, password, name?)
url: The WebSocket URL of the RabbitMQ broker (e.g.,"ws://localhost:15674/ws").vhost: The virtual host to connect to (default:"/").username: The username for authentication (default:"guest").password: The password for authentication (default:"guest").name?: Optional name for the client instance.
Methods
connect()- Establishes a connection to the RabbitMQ broker using the provided credentials.
- Returns the
RBBTClientinstance for chaining further calls. - Example:
const conn = rbbt.connect();
close()- Closes the connection to the RabbitMQ broker.
- Example:
conn.close();
exchange(name, options)- Creates or retrieves an exchange by its
name. - Parameters:
name: The name of the exchange. If omitted, an automatically generated name is used.options: The exchange options (default:{}).
- Returns: An
RBBTExchangeobject. - Example:
const ex = conn.exchange("amq.direct");
- Creates or retrieves an exchange by its
debug(msg)- A function that can be overridden to log debug messages.
- Example:
conn.debug = (msg) => console.log(msg);
RBBTError
Constructor: new RBBTError(message, connection)
message: A string describing the error.connection: TheRBBTClientinstance that encountered the error.
RBBTError is a custom error class used throughout the RBBTClient library to handle client-specific errors.
Example:
try {
throw new RBBTError("Invalid connection", rbbt);
} catch (err) {
console.error(err.message); // "Invalid connection"
}RBBTExchange
Constructor: new RBBTExchange(connection, name, options?)
connection: TheRBBTClientinstance used for the connection.name: The name of the exchange. If not provided, a unique name will be generated (default:"").options?: Optional parameters to configure the exchange:passive: Whether the exchange is passive (default:false).durable: Whether the exchange is durable (default:false).autoDelete: Whether the exchange should auto-delete (default:false).internal: Whether the exchange is internal (default:false).
Properties
connection: The associatedRBBTClientinstance.name: The name of the exchange.watch: The watch subscription for the exchange.helper: An instance ofRBBTHelpers.queues: The list of queues associated with the exchange.closed: A boolean indicating whether the exchange is closed (default:false).options: The options used to configure the exchange.
Methods
open()- Opens the exchange by subscribing to it and watching for messages.
- Throws an error if the client is not connected.
- Example:
const exchange = new RBBTExchange(connection, "myExchange");
close()- Closes the exchange by unsubscribing from the watch and marking it as closed.
- Example:
exchange.close();
send(body, routingKey, properties?)- Sends a message to the exchange with the specified
body,routingKey, and optionalproperties. - Parameters:
body: The message body (can be astringorUint8Array).routingKey: The routing key for the message.properties: Additional message properties (default:{}).
- Example:
exchange.send("Hello, World!", "myRoutingKey");
- Sends a message to the exchange with the specified
subscribe(callback, { noAck, exclusive })- Subscribes to the exchange and receives messages, triggering the provided
callbackfunction. - Parameters:
callback: The function to handle the received messages (message: RBBTMessage).noAck: Whether to automatically acknowledge messages (default:false).exclusive: Whether the subscription is exclusive (default:false).
- Example:
exchange.subscribe((msg) => { console.log("Received message:", msg.body); });
- Subscribes to the exchange and receives messages, triggering the provided
unsubscribe()- Unsubscribes from the exchange and stops receiving messages.
- Example:
exchange.unsubscribe();
queue(queueName, options?)- Returns a queue associated with the exchange, creating it if necessary.
- Parameters:
queueName: The name of the queue (default:"").options: Queue options such aspassive,durable,autoDelete, andexclusive.
- Example:
const queue = exchange.queue("myQueue", { durable: true });
RBBTMessage
Constructor: new RBBTMessage(exchange)
exchange: TheRBBTExchangeobject associated with the message.
The RBBTMessage class represents a message sent or received from an exchange or queue. It contains properties and the message body.
Properties:
exchange: The exchange where the message was published.routingKey: The routing key used for message delivery (default:"").properties: A collection of message properties (headers, delivery mode, etc.).bodySize: The size of the message body (default:0).body: The message content, which can be aUint8Array,string, ornull.redelivered: A flag indicating if the message was redelivered (default:false).
Example:
const message = new RBBTMessage(exchange);
message.body = "Hello, world!";
console.log(message.body); // Output: "Hello, world!"RBBTQueue
Constructor: new RBBTQueue(exchange, name, options?)
exchange: TheRBBTExchangeinstance associated with the queue.name: The name of the queue. If not provided, a unique name will be generated (default:"").options?: Optional parameters to configure the queue:passive: Whether the queue should be passive (default:false).durable: Whether the queue should be durable (default:truefor non-empty names,falsefor empty names).autoDelete: Whether the queue should auto-delete when no consumers are connected (default:truefor empty names,falsefor non-empty names).exclusive: Whether the queue should be exclusive (default:truefor empty names,falsefor non-empty names).
Methods
create()- Creates the queue if the connection client is active and the exchange is not closed.
- Example:
const q = new RBBTQueue(ex, "myQueue");
bind(routingKey?)- Binds the queue to an exchange with the given
routingKey. If the queue is already bound, the binding is updated. - Parameters:
routingKey: The routing key for binding (default:"").
- Example:
q.bind("routing.key");
- Binds the queue to an exchange with the given
unbind(routingKey?)- Unbinds the queue from an exchange with the given
routingKey. - Parameters:
routingKey: The routing key for unbinding (default:"").
- Example:
q.unbind("routing.key");
- Unbinds the queue from an exchange with the given
subscribe(options, callback)- Subscribes to the queue for receiving messages. A callback is triggered on each new message.
- Parameters:
options: Subscription options:noAck: Whether to acknowledge messages automatically (default:true).exclusive: Whether the subscription is exclusive (default:false).tag: A custom tag for the subscription (default:"").args: Additional arguments for the subscription (default:{}).
callback: The function to handle the received messages (msg: RBBTMessage).
- Example:
q.subscribe({ noAck: false }, (msg) => { console.log(msg.body); });
unsubscribe()- Unsubscribes from the queue and stops receiving messages.
- Example:
q.unsubscribe();
send(body, properties?)- Sends a message to the queue with the specified
bodyandproperties. - Parameters:
body: The message body (can be astringorUint8Array).properties: Additional message properties (default:{}).
- Example:
q.send("Hello, World!", { priority: 1 });
- Sends a message to the queue with the specified
Types
RBBTQueueParams
Defines the parameters for queue configuration.
passive: (optional) Iftrue, the queue must already exist (default:false).durable: (optional) Iftrue, the queue will survive server restarts (default:trueifnameis provided, otherwisefalse).autoDelete: (optional) Iftrue, the queue will automatically delete itself when no longer in use (default:trueifnameis not provided).exclusive: (optional) Iftrue, the queue is exclusive to the connection (default:trueifnameis not provided).
type RBBTQueueParams = {
passive?: boolean;
durable?: boolean;
autoDelete?: boolean;
exclusive?: boolean;
};RBBTConsumeParams
Defines the parameters for consuming messages from a queue.
tag: (optional) A consumer tag to identify the consumer.noAck: (optional) Iftrue, messages are automatically acknowledged (default:true).exclusive: (optional) Iftrue, the consumer is exclusive (default:false).args: (optional) Additional arguments for the consumer.
export type RBBTConsumeParams = {
tag?: string;
noAck?: boolean;
exclusive?: boolean;
args?: Record<string, any>;
};RBBTProperties
Defines the message properties that can be set on a published message.
headers: (optional) Custom headers for the message, as key-value pairs.messageId: (optional) A unique identifier for the message.
export type RBBTProperties = {
headers?: Record<string, any>;
messageId?: string;
};RBBTExchangeParams
Defines the parameters for exchange configuration.
passive: (optional) Iftrue, the exchange must already exist (default:false).durable: (optional) Iftrue, the exchange will survive server restarts (default:false).autoDelete: (optional) Iftrue, the exchange will be deleted when no longer in use (default:false).internal: (optional) Iftrue, the exchange is used only for internal message routing (default:false).args: (optional) Additional arguments for the exchange, passed as key-value pairs.
export type RBBTExchangeParams = {
passive?: boolean;
durable?: boolean;
autoDelete?: boolean;
internal?: boolean;
args?: Record<string, any>;
};Example
Below is a full example demonstrating how to set up a connection, create a channel, declare a queue, bind it to an exchange, and start receiving messages:
import { RBBTClient } from "rbbt-client";
const rbbt = new RBBTClient("ws://localhost:15674/ws", "/", "guest", "guest");
// Step 2: Connect to RabbitMQ
const connection = rbbt.connect();
// Step 3: Create an exchange (this is where messages will be sent)
const exchange = connection.exchange("my.direct.exchange", {
durable: true, // The exchange will survive server restarts
});
// Step 4: Create a queue (this is where messages will be received)
const queue = exchange.queue("my.queue", {
durable: true, // The queue will survive server restarts
});
// Step 5: Bind the queue to the exchange using a routing key
queue.bind("my.routing.key");
// Step 6: Publish a message to the exchange
exchange.send("Hello RabbitMQ!", "my.routing.key");
// Step 7: Subscribe to the queue to receive messages
queue.subscribe(
{
noAck: false,
},
(message) => {
console.log("Received message:", message.body); // Display the message body in the console
},
);10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago