@appolo/bus v8.1.5
Appolo Bus Module
bus module for appolo built with rabbot
Installation
npm i @appolo/busOptions
| key | Description | Type | Default | 
|---|---|---|---|
id | injection id | string | busProvider | 
connection | AMQP connection string | string | null | 
auto | true to auto initialize busProvider and start listen to events | boolean | true | 
listener | true to register queue event handlers | boolean | true | 
exchangeName | name of the exchange | string | |
queueName | name of the queue | string | |
appendEnv | append env name to queueName and exchangeName | boolean | true | 
exchange | exchange options | object | {} | 
queue | queue options | object | {} | 
requestQueue | request queue options | object | {} | 
replayQueue | request queue options or false to disable | object | {} | 
Exchange Options
| key | Description | Type | Default | 
|---|---|---|---|
type | request queue options or false to disable | string | topic | 
autoDelete | delete when consumer count goes to 0 | boolean | false | 
durable | survive broker restarts | boolean | true | 
persistent | persistent delivery, messages saved to disk | boolean | true | 
alternate | define an alternate exchange | string | |
publishTimeout | timeout in milliseconds for publish calls to this exchange | 2^32 | |
replyTimeout | timeout in milliseconds to wait for a reply | 2^32 | |
limit | the number of unpublished messages to cache while waiting on connection | 2^16 | |
noConfirm | prevents rabbot from creating the exchange in confirm mode | boolean | false | 
Queue Options
| key | Description | Type | Default | 
|---|---|---|---|
autoDelete | delete when consumer count goes to 0 | boolean | false | 
durable | survive broker restarts | boolean | true | 
subscribe | auto-start the subscription | boolean | false | 
limit | max number of unacked messages allowed for consumer | 2^16 | 1 | 
noAck | the server will remove messages from the queue as soon as they are delivered | boolean | false | 
noBatch | causes ack, nack & reject to take place immediately | boolean | false | 
noCacheKeys | disable cache of matched routing keys to prevent unbounded memory growth | boolean | false | 
queueLimit | max number of ready messages a queue can hold | 2^32 | |
messageTt | time in ms before a message expires on the queue | 2^32 | |
expires | time in ms before a queue with 0 consumers expires | 2^32 | 
in config/modules/all.ts
import {PubSubModule} from '@appolo/pubsub';
export = async function (app: App) {
   await app.module(new BusModule({redis:"amqp://connection-string"}));
}Usage
Publisher
import {define, singleton} from 'appolo'
import {publisher} from "@appolo/bus";
@define()
@singleton()
export class SomePublisher {
    @publisher("test")
    async publish(data: any): Promise<any> {
        return data
    }
}Or with BusProvider
@define()
@singleton()
export class SomePublisher {
    inject() busProvider:BusProvider
    publish(data:any): Promise<any> {
        return this.busProvider.publish("test",data)
    }
}Handler
if you don not call msg ack or nack
it will be called on handler return msg.ack() or msg.nack() on error
import {define, singleton} from 'appolo'
import {handler} from "@appolo/bus";
@define()
@singleton()
export class SomeHandler {
    @handler("test")
    handle(msg: IMessage<data>) {
       //do something
    }
    @handler("someName")
    handle(msg: IMessage<data>) {
        try{
           //do some thing
           msg.ack();
        }
        catch(){
            msg.nack();
        }
    }
}Request
import {define, singleton} from 'appolo'
import {request} from "@appolo/bus";
@define()
@singleton()
export class SomePublisher {
    @request("test")
    async getData(data: any): Promise<any> {
        return data
    }
    public async handleData(){
        let data = await this.getData({userId:1})
    }
}Or with BusProvider
@define()
@singleton()
export class SomePublisher {
    inject() busProvider:busProvider
    publish(data:any): Promise<any> {
        let data = await  this.busProvider.request("test",data)
        return data;
    }
}Reply
import {define, singleton} from 'appolo'
import {handler} from "@appolo/bus";
@define()
@singleton()
export class SomeHandler {
    inject() busProvider:busProvider
    @reply("test")
    handle(msg: IMessage<data>) {
        return {userId:1}
    }
    // or reply methods
    @reply("someName")
    handle(msg: IMessage<data>) {
        try{
            //get some data
         msg.replySuccess(msg,{userId:1})
        }
        catch(){
            msg.replyError(msg,e)
        }
    }
}IMessage
each handler and reply handler called with message object
{
  // metadata specific to routing & delivery
  fields: {
    consumerTag: "", // identifies the consumer to rabbit
    deliveryTag: #, // identifies the message delivered for rabbit
    redelivered: true|false, // indicates if the message was previously nacked or returned to the queue
    exchange: "" // name of exchange the message was published to,
    routingKey: "" // the routing key (if any) used when published
  },
  properties:{
    contentType: "application/json", // see serialization for how defaults are determined
    contentEncoding: "utf8", // rabbot's default
    headers: {}, // any user provided headers
    correlationId: "", // the correlation id if provided
    replyTo: "", // the reply queue would go here
    messageId: "", // message id if provided
    type: "", // the type of the message published
    appId: "" // not used by rabbot
  },
  content: { "type": "Buffer", "data": [ ... ] }, // raw buffer of message body
  body: , // this could be an object, string, etc - whatever was published
  type: "" // this also contains the type of the message published
}message.ack()
Enqueues the message for acknowledgement.
message.nack()
Enqueues the message for rejection. This will re-enqueue the message.
message.reject()
Rejects the message without re-queueing it. Please use with caution and consider having a dead-letter-exchange assigned to the queue before using this feature.
message.reply( data:any )
Acknowledges the messages and sends the message back to the requestor.
message.replySuccess( data:T )
reply the message with json object {success: true,data}
message.replyError( e: RequestError<T> )
reply the message with json object {success: false,message: e.message, data:e.data}
BusProvider
initialize()
initialize busProvider and start listen to events if not in in auto mode
publish(type: string, data: any, expire?: number): Promise<void>
publish event
- type - event name
 - data - any data
 - expire - timeout until the message is expired in the queue
 
request<T>(type: string, data: any, expire?: number): Promise<T>
request data by event return promise with event response
- type - event name
 - data - any data
 - expire - timeout until the request is rejected
 
close<T>(): Promise<void>
close the connection and clean all handlers
getQueueMessagesCount(): Promise<number>
return number of pending events in the queue
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
4 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
7 years ago