tasu v5.0.0
![]()
Tasu
An async/await wrapper over node-nats,
designed to easily integrate with your microservice code. Taşuu (ташуу)
is 'transport' in Kyrgyz.
Installation
npm i tasuUsage
Create an instance of the transport:
const Tasuu = require('tasu');
async function main() {
...
const tasu = new Tasuu({group: 'some-service', ...});
await tasu.connected();
}Publish a request and get a response via tasu.request() on one end:
const {bar} = await tasu.request('foo', {arg: 1});Note: this method uses requestOne inside, no need to worry about max
responses
Subscribe and respond to a request on the other:
tasu.listen('foo', async ({arg}, respond) => {
const bar = await someDatabaseCall(arg);
return {bar};
});Note: A listener is automatically added to queue group foo.listeners;
errors of the handling function are caught and sent back as error response
Publish an event on one end:
tasu.publish('some.package.sent', {...});Subscribe and process as worker queue on the other:
tasu.process('*.package.sent', (pack, subject) => {
console.log(subject, pack);
});listen, subscribe and process methods return an integer
subscription ID (SID) which can be used to unsubscribe from a subject:
const sid = tasu.process('*.package.sent', (pack, subject) => {
console.log(subject, pack);
});
...
tasu.unsubscribe(sid);The above technique is useful for websocket connections.
Close NATS connection (if needed):
tasu.close();Configuration
You can set the following config options while creating a new Tasu instance:
url- NATS connection urls, default:nats://localhost:4222group- group name this transport will be assigned to, default:defaultrequestTimeout- how long to wait for response on NATS requests, default:10000level- set log level for provided logger. Default isdebuglogger- define custom logger. Should basically haveinfo,debug,errormethods.
API
connected() - returns a promise resolved on successful connection to NATS server. You basically always want this resolved before proceeding with Tasu.
publish(subject, message) - optimistically publish a
messageto asubject. TODO: make this return a promise too.listen(subject, async (message)=>{...}) - subscribes to a
subjectand respond via handler function. Handler function must be defined viaasync, it getsmessageobject as the only argument and must return something as successful response. Errors are caught and sent back as error response. Returns subscription id.subscribe(subject, (message, replyTo, subject)=>{...}) - subscribes to a
subjectand process messages with handler function. All memebers of the group will get subject messages. Returns subscription id.subOnce(subject, (message, subject)=>{...}) - same as
subscribe()but unsubscribes immediately after receiving a message.process(subject, (message, subject)=>{...}) - subscribes to a
subjectas a queue worker and processes messages via handler function. Messages will be distributed randomly among free group members. Returns subscription id.request(subject[, message]) - performs a request and returns a response promise.
unsubscribe(sid) - unsubscribes from subscription identified by
sid(returned bysubscribe,subOnce,process,listen)close() - closes connection to NATS. Always try to exit gracefully, otherwise the connection will persist until next heartbeat.
Credits
Icons by icons8