rbbt-client v0.5.1
RBBTClient
RBBTClient is a JavaScript library for interacting with RabbitMQ over WebSockets. It offers a simple API for connecting to RabbitMQ brokers, managing exchanges, and subscribing to queues.
Installation
To install the rbbt-client package, use npm:
npm install rbbt-client
Usage
Here's a basic example of how to use RBBTClient:
import { RBBTClient } from "rbbt-client";
// Initialize the RBBTClient: STOMP URL, vhost, username, password
const rbbt = new RBBTClient("ws://localhost:15674/ws", "/", "guest", "guest");
// The default Web STOMP port is 15674; please don't make the same mistakes I made that led to this package
// Connect to the RabbitMQ broker
const conn = rbbt.connect();
// Get (or create) an exchange
const ex = conn.exchange("amq.direct");
// Create a new exclusive queue
const q = ex.queue("", { exclusive: true });
// Bind the queue with a routing key
q.bind("test");
// Subscribe to the queue
q.subscribe({ noAck: true }, (msg) => {
  console.log(msg);
});
API
RBBTClient
Constructor: new RBBTClient(url, vhost, username, password, name?)
- url: The WebSocket URL of the RabbitMQ broker (e.g., "ws://localhost:15674/ws").
- vhost: The virtual host to connect to (default: "/").
- username: The username for authentication (default: "guest").
- password: The password for authentication (default: "guest").
- name?: Optional name for the client instance.
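Since the url argument has to point at the broker's Web STOMP endpoint, a small helper can keep the port and path in one place. The sketch below is only an illustration: stompUrl and its option names are hypothetical and not part of rbbt-client; the defaults (port 15674, path /ws) are taken from the example URL above.

```javascript
// Hypothetical helper (not part of rbbt-client): builds a Web STOMP URL.
// Defaults mirror the example URL "ws://localhost:15674/ws".
function stompUrl(host, { port = 15674, secure = false, path = "/ws" } = {}) {
  const scheme = secure ? "wss" : "ws"; // wss:// for TLS-protected endpoints
  return `${scheme}://${host}:${port}${path}`;
}

const url = stompUrl("localhost");
console.log(url); // ws://localhost:15674/ws
```

The resulting string can then be passed as the first constructor argument.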
Methods
connect()
- Establishes a connection to the RabbitMQ broker using the provided credentials.
- Returns the RBBTClient instance for chaining further calls.
- Example:
const conn = rbbt.connect();
close()
- Closes the connection to the RabbitMQ broker.
- Example:
conn.close();
exchange(name, options)
- Creates or retrieves an exchange by its name.
- Parameters:
  - name: The name of the exchange. If omitted, an automatically generated name is used.
  - options: The exchange options (default: {}).
- Returns: An RBBTExchange object.
- Example:
const ex = conn.exchange("amq.direct");
debug(msg)
- A function that can be overridden to log debug messages.
- Example:
conn.debug = (msg) => console.log(msg);
RBBTError
Constructor: new RBBTError(message, connection)
- message: A string describing the error.
- connection: The RBBTClient instance that encountered the error.
RBBTError is a custom error class used throughout the RBBTClient library to handle client-specific errors.
Example:
try {
  throw new RBBTError("Invalid connection", rbbt);
} catch (err) {
  console.error(err.message); // "Invalid connection"
}
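To tell client-specific errors apart from other failures, a catch block can branch on the error's class. The sketch below uses a stand-in class, ClientError, so that it runs on its own; it is an assumption about how an error type carrying a message and a connection might be shaped, not the library's actual source. With the real package you would test against RBBTError instead.

```javascript
// Stand-in for an error class with (message, connection); hypothetical, for illustration only.
class ClientError extends Error {
  constructor(message, connection) {
    super(message);
    this.name = "ClientError";
    this.connection = connection; // the client instance that hit the error
  }
}

// Branch on the error class to separate client errors from everything else.
function describe(err) {
  return err instanceof ClientError
    ? `client error: ${err.message}`
    : `unexpected error: ${err.message}`;
}

console.log(describe(new ClientError("Invalid connection", {})));
console.log(describe(new Error("boom")));
```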
RBBTExchange
Constructor: new RBBTExchange(connection, name, options?)
- connection: The RBBTClient instance used for the connection.
- name: The name of the exchange. If not provided, a unique name will be generated (default: "").
- options?: Optional parameters to configure the exchange:
  - passive: Whether the exchange is passive (default: false).
  - durable: Whether the exchange is durable (default: false).
  - autoDelete: Whether the exchange should auto-delete (default: false).
  - internal: Whether the exchange is internal (default: false).
Properties
- connection: The associated RBBTClient instance.
- name: The name of the exchange.
- watch: The watch subscription for the exchange.
- helper: An instance of RBBTHelpers.
- queues: The list of queues associated with the exchange.
- closed: A boolean indicating whether the exchange is closed (default: false).
- options: The options used to configure the exchange.
Methods
open()
- Opens the exchange by subscribing to it and watching for messages.
- Throws an error if the client is not connected.
- Example:
const exchange = new RBBTExchange(connection, "myExchange");
close()
- Closes the exchange by unsubscribing from the watch and marking it as closed.
- Example:
exchange.close();
send(body, routingKey, properties?)
- Sends a message to the exchange with the specified body, routingKey, and optional properties.
- Parameters:
  - body: The message body (can be a string or Uint8Array).
  - routingKey: The routing key for the message.
  - properties: Additional message properties (default: {}).
- Example:
exchange.send("Hello, World!", "myRoutingKey");
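Because send accepts only a string or a Uint8Array as the body, structured payloads need to be serialized first. A minimal sketch, assuming JSON is an acceptable wire format for your consumers; toBody is a hypothetical helper and not part of rbbt-client:

```javascript
// Hypothetical helper: turn an arbitrary payload into a body that send() accepts.
function toBody(payload) {
  // Strings and byte arrays are already valid bodies; pass them through untouched.
  if (typeof payload === "string" || payload instanceof Uint8Array) return payload;
  // Everything else is serialized as JSON text.
  return JSON.stringify(payload);
}

console.log(toBody({ orderId: 42 })); // {"orderId":42}
```

With the library, the result would be passed as the first argument, for example exchange.send(toBody({ orderId: 42 }), "myRoutingKey").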
subscribe(callback, { noAck, exclusive })
- Subscribes to the exchange and receives messages, triggering the provided callback function.
- Parameters:
  - callback: The function to handle received messages (message: RBBTMessage).
  - noAck: Whether to automatically acknowledge messages (default: false).
  - exclusive: Whether the subscription is exclusive (default: false).
- Example:
exchange.subscribe((msg) => { console.log("Received message:", msg.body); });
unsubscribe()
- Unsubscribes from the exchange and stops receiving messages.
- Example:
exchange.unsubscribe();
queue(queueName, options?)
- Returns a queue associated with the exchange, creating it if necessary.
- Parameters:
  - queueName: The name of the queue (default: "").
  - options: Queue options such as passive, durable, autoDelete, and exclusive.
- Example:
const queue = exchange.queue("myQueue", { durable: true });
RBBTMessage
Constructor: new RBBTMessage(exchange)
- exchange: The RBBTExchange object associated with the message.
The RBBTMessage class represents a message sent to or received from an exchange or queue. It contains the message properties and the message body.
Properties:
- exchange: The exchange where the message was published.
- routingKey: The routing key used for message delivery (default: "").
- properties: A collection of message properties (headers, delivery mode, etc.).
- bodySize: The size of the message body (default: 0).
- body: The message content, which can be a Uint8Array, string, or null.
- redelivered: A flag indicating if the message was redelivered (default: false).
Example:
const message = new RBBTMessage(exchange);
message.body = "Hello, world!";
console.log(message.body); // Output: "Hello, world!"
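Since body may be a Uint8Array, a string, or null, consumers that expect text usually normalize it before use. The following is a sketch under the assumption that binary bodies are UTF-8 encoded text; bodyToText is a hypothetical helper, not part of rbbt-client.

```javascript
// Hypothetical helper: normalize a message body to a string.
// Assumes binary bodies contain UTF-8 text.
function bodyToText(body) {
  if (body === null || body === undefined) return ""; // no body at all
  if (typeof body === "string") return body;          // already text
  return new TextDecoder("utf-8").decode(body);       // Uint8Array -> string
}

const bytes = new TextEncoder().encode("Hello, world!");
console.log(bodyToText(bytes)); // Hello, world!
```

Inside a subscribe callback this would be called as bodyToText(msg.body).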
RBBTQueue
Constructor: new RBBTQueue(exchange, name, options?)
- exchange: The RBBTExchange instance associated with the queue.
- name: The name of the queue. If not provided, a unique name will be generated (default: "").
- options?: Optional parameters to configure the queue:
  - passive: Whether the queue should be passive (default: false).
  - durable: Whether the queue should be durable (default: true for non-empty names, false for empty names).
  - autoDelete: Whether the queue should auto-delete when no consumers are connected (default: true for empty names, false for non-empty names).
  - exclusive: Whether the queue should be exclusive (default: true for empty names, false for non-empty names).
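The name-dependent defaults described above can be summarized in a few lines. This is a sketch of the documented rules only, not the library's implementation; resolveQueueOptions is a hypothetical name.

```javascript
// Hypothetical helper mirroring the documented defaults:
// unnamed queues are temporary (exclusive, auto-delete), named queues are durable.
function resolveQueueOptions(name = "", options = {}) {
  const anonymous = name === "";
  return {
    passive: options.passive ?? false,
    durable: options.durable ?? !anonymous,
    autoDelete: options.autoDelete ?? anonymous,
    exclusive: options.exclusive ?? anonymous,
  };
}

console.log(resolveQueueOptions(""));        // temporary queue defaults
console.log(resolveQueueOptions("myQueue")); // durable queue defaults
```

Explicitly passed options always win over the name-based defaults, so resolveQueueOptions("myQueue", { durable: false }) yields a non-durable named queue.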
Methods
create()
- Creates the queue if the connection client is active and the exchange is not closed.
- Example:
const q = new RBBTQueue(ex, "myQueue");
bind(routingKey?)
- Binds the queue to an exchange with the given routingKey. If the queue is already bound, the binding is updated.
- Parameters:
  - routingKey: The routing key for binding (default: "").
- Example:
q.bind("routing.key");
unbind(routingKey?)
- Unbinds the queue from an exchange with the given routingKey.
- Parameters:
  - routingKey: The routing key for unbinding (default: "").
- Example:
q.unbind("routing.key");
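To make the effect of bind and unbind concrete, here is a toy in-memory model of how a direct exchange such as amq.direct routes messages: a message reaches a queue only if its routing key exactly matches one of the queue's bindings. DirectExchangeModel is purely illustrative and does not talk to RabbitMQ or use rbbt-client.

```javascript
// Toy model of direct-exchange routing (exact routing-key match). Illustration only.
class DirectExchangeModel {
  constructor() {
    this.bindings = new Map(); // routing key -> Set of queue names
  }
  bind(queueName, routingKey = "") {
    if (!this.bindings.has(routingKey)) this.bindings.set(routingKey, new Set());
    this.bindings.get(routingKey).add(queueName);
  }
  unbind(queueName, routingKey = "") {
    const queues = this.bindings.get(routingKey);
    if (queues) queues.delete(queueName);
  }
  // Returns the names of the queues that would receive a message with this key.
  route(routingKey) {
    return [...(this.bindings.get(routingKey) ?? [])];
  }
}

const model = new DirectExchangeModel();
model.bind("myQueue", "routing.key");
console.log(model.route("routing.key")); // one match: myQueue
console.log(model.route("other.key"));   // no matches
model.unbind("myQueue", "routing.key");
console.log(model.route("routing.key")); // no matches after unbinding
```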
subscribe(options, callback)
- Subscribes to the queue for receiving messages. A callback is triggered on each new message.
- Parameters:
  - options: Subscription options:
    - noAck: Whether to acknowledge messages automatically (default: true).
    - exclusive: Whether the subscription is exclusive (default: false).
    - tag: A custom tag for the subscription (default: "").
    - args: Additional arguments for the subscription (default: {}).
  - callback: The function to handle received messages (msg: RBBTMessage).
- Example:
q.subscribe({ noAck: false }, (msg) => { console.log(msg.body); });
unsubscribe()
- Unsubscribes from the queue and stops receiving messages.
- Example:
q.unsubscribe();
send(body, properties?)
- Sends a message to the queue with the specified body and properties.
- Parameters:
  - body: The message body (can be a string or Uint8Array).
  - properties: Additional message properties (default: {}).
- Example:
q.send("Hello, World!", { priority: 1 });
Types
RBBTQueueParams
Defines the parameters for queue configuration.
- passive: (optional) If true, the queue must already exist (default: false).
- durable: (optional) If true, the queue will survive server restarts (default: true if name is provided, otherwise false).
- autoDelete: (optional) If true, the queue will automatically delete itself when no longer in use (default: true if name is not provided).
- exclusive: (optional) If true, the queue is exclusive to the connection (default: true if name is not provided).
type RBBTQueueParams = {
  passive?: boolean;
  durable?: boolean;
  autoDelete?: boolean;
  exclusive?: boolean;
};
RBBTConsumeParams
Defines the parameters for consuming messages from a queue.
- tag: (optional) A consumer tag to identify the consumer.
- noAck: (optional) If true, messages are automatically acknowledged (default: true).
- exclusive: (optional) If true, the consumer is exclusive (default: false).
- args: (optional) Additional arguments for the consumer.
export type RBBTConsumeParams = {
  tag?: string;
  noAck?: boolean;
  exclusive?: boolean;
  args?: Record<string, any>;
};
RBBTProperties
Defines the message properties that can be set on a published message.
- headers: (optional) Custom headers for the message, as key-value pairs.
- messageId: (optional) A unique identifier for the message.
export type RBBTProperties = {
  headers?: Record<string, any>;
  messageId?: string;
};
RBBTExchangeParams
Defines the parameters for exchange configuration.
- passive: (optional) If true, the exchange must already exist (default: false).
- durable: (optional) If true, the exchange will survive server restarts (default: false).
- autoDelete: (optional) If true, the exchange will be deleted when no longer in use (default: false).
- internal: (optional) If true, the exchange is used only for internal message routing (default: false).
- args: (optional) Additional arguments for the exchange, passed as key-value pairs.
export type RBBTExchangeParams = {
  passive?: boolean;
  durable?: boolean;
  autoDelete?: boolean;
  internal?: boolean;
  args?: Record<string, any>;
};
Example
Below is a full example demonstrating how to set up a connection, declare an exchange, declare a queue, bind it to the exchange, publish a message, and start receiving messages:
import { RBBTClient } from "rbbt-client";
// Step 1: Create the client (STOMP URL, vhost, username, password)
const rbbt = new RBBTClient("ws://localhost:15674/ws", "/", "guest", "guest");
// Step 2: Connect to RabbitMQ
const connection = rbbt.connect();
// Step 3: Create an exchange (this is where messages will be sent)
const exchange = connection.exchange("my.direct.exchange", {
  durable: true, // The exchange will survive server restarts
});
// Step 4: Create a queue (this is where messages will be received)
const queue = exchange.queue("my.queue", {
  durable: true, // The queue will survive server restarts
});
// Step 5: Bind the queue to the exchange using a routing key
queue.bind("my.routing.key");
// Step 6: Publish a message to the exchange
exchange.send("Hello RabbitMQ!", "my.routing.key");
// Step 7: Subscribe to the queue to receive messages
queue.subscribe(
  {
    noAck: false,
  },
  (message) => {
    console.log("Received message:", message.body); // Display the message body in the console
  },
);