0.0.1 • Published 6 years ago

rx-rabbitmq v0.0.1

Weekly downloads
1
License
ISC
Repository
-
Last release
6 years ago

rx-rabbitmq

rxjs components to interact with rabbitmq message broker

Based on the RabbitMQ tutorials, I created these rxjs observables to interact with the different queue and exchange types in RabbitMQ.

I did not implement yet the RPC functionality, hopefully I can add that in a later version.

Usage

basic queue sender:

const { rabbitmq$, basicSubmitter$ } = require ('rx-rabbitmq')
- or -
const { rabbitmq$ } = require ('rx-rabbitmq/connection')
const { basicSubmitter$ } = require ('rx-rabbitmq/basic')

const { switchMap } = require ('rxjs/operators')

rabbitmq$ () // creating connection on amqp://localhost:5672
.pipe (
    switchMap (conn => basicSubmitter$ (conn, 'my_basic_queue'))
)
.subscribe ({
    next: q => {
        q.next ({ message: 'message 1' }) // send some messages 
        q.next ({ message: 'another message' })
        q.next ({ message: { name: 'mymsg', ... } }) // you can also send an object as the message, will be JSON.stringify ()'d
        q.complete () // closing the connection
    },
    complete: () => console.log ('Connection closed')
})

basic queue receiver:

const { rabbitmq$, basicReceiver$ } = require ('rx-rabbitmq')
- or -
const { rabbitmq$ } = require ('rx-rabbitmq/connection')
const { basicReceiver$ } = require ('rx-rabbitmq/basic')

const { switchMap } = require ('rxjs/operators')

rabbitmq$ () // creating connection on amqp://localhost:5672
.pipe (
    switchMap (conn => basicReceiver$ (conn, 'my_basic_queue'))
)
.subscribe ({
    next: msg => console.log (msg.message)
})

topic exchange sender:

const { rabbitmq$, topicSubmitter$ } = require ('rx-rabbitmq')
- or -
const { rabbitmq$ } = require ('rx-rabbitmq/connection')
const { topicSubmitter$ } = require ('rx-rabbitmq/topic')

const { switchMap } = require ('rxjs/operators')

rabbitmq$ () // creating connection on amqp://localhost:5672
.pipe (
    switchMap (conn => topicSubmitter$ (conn, 'my_topic_exchange'))
)
.subscribe ({
    next: ex => {
        ex.next ({ key: 'level1.level2.level3', message: 'message 1' }) // send some messages 
        ex.next ({ key: 'level1.anotherlevel.level3', message: { name: 'mymsg', ... } }) // you can also send an object as the message, will be JSON.stringify ()'d
        ex.complete () // closing the connection
    },
    complete: () => console.log ('Connection closed')
})

topic exchange receiver:

const { rabbitmq$, topicReceiver$ } = require ('rx-rabbitmq')
- or -
const { rabbitmq$ } = require ('rx-rabbitmq/connection')
const { topicReceiver$ } = require ('rx-rabbitmq/topic')

const { switchMap } = require ('rxjs/operators')

rabbitmq$ () // creating connection on amqp://localhost:5672
.pipe (
    switchMap (conn => topicReceiver$ (conn, 'my_topic_exchange', 'level1.*.level3'))
)
.subscribe ({
    next: msg => console.log (`Received ${msg.message} with key ${msg.routingKey}`)
})

Reactive Objects

// to import all objects, you can use:
const { 
    rabbitmq$, 
    basicReceiver$, basicSubmitter$, 
    workqueueReceiver$, workqueueSubmitter$, 
    pubsubReceiver$, pubsubSubmitter$, 
    routingReceiver$, routingSubmitter$,
    topicReceiver$, topicSubmitter$
} = require ('rx-rabbitmq')

// or when you only want to import the necessary objects
const {rabbitmq$} = require ('rx-rabbitmq/connection')

const { basicSubmitter$, basicReceiver$ } = require ('rx-rabbitmq/basic')

const { workqueueSubmitter$, workqueueReceiver$ } = require ('rx-rabbitmq/workqueue')

const { pubsubSubmitter$, pubsubReceiver$ } = require ('rx-rabbitmq/pubsub')

const { routingSubmitter$, routingReceiver$ } = require ('rx-rabbitmq/routing')

const { topicSubmitter$, topicReceiver$ } = require ('rx-rabbitmq/topic')

Install

With npm installed, run

$ npm install rx-rabbitmq

Acknowledgments

rx-rabbitmq was inspired by the creators of amqp and the excellent getting started guide of RabbitMQ

License

ISC