cds-nats v2.10.1
CDS NATS
support the nats message broker for CAP NodeJS runtime.
Get Started
install dependency
npm i -S cds-natsand ref the Process Environment document to configure the
Nats Connection-Options
CDS_REQUIRES_NATS_SERVERS=127.0.0.1:4222Nats Messaging Service
Use
Natsas a message broker
Queue and Subscription
Different from than the default behavior of CAP Messaging Service, cds-nats will prefer to listen event on Queue Group instead of general Subscription, because in micro-service architecture, in most case we need the Queue/Consumer Group each message only be consumed by single service instance.
graph LR
Producer_1[Application A, Instance 1] -->|Message 1| Nats
Producer_2[Application A, Instance 2] -->|Message 2| Nats
Producer_3[Application B, Instance 1] -->|Message 3| Nats
Nats(Nats: Queue XXXX)
Nats -->|Message 3| Consumer_1[Application C, Instance 1]
Nats -->|Message 1| Consumer_2[Application C, Instance 2]
Nats -->|Message 2| Consumer_3[Application C, Instance 3]Also, if you want to apply the Publisher/Subscriber - Broadcast pattern, just add the @topic annotation to the event (the @queue annotation is also supported).
graph LR
Publisher_1[Application A, Instance 1] -->|Message 1| Nats
Publisher_2[Application A, Instance 2] -->|Message 2| Nats
Publisher_3[Application B, Instance 1] -->|Message 3| Nats
Nats(Nats: Topic YYYY)
Nats -->|Message 1, Message 2, Message 3| Subscriber_1[Application C, Instance 1]
Nats -->|Message 1, Message 2, Message 3| Subscriber_2[Application C, Instance 2]
Nats -->|Message 1, Message 2, Message 3| Subscriber_3[Application D, Instance 1]service PeopleService {
// queue group event, producer/consumer exclusive consume
// @queue: 'queueName' annotation is also supported
event changeAmount {
peopleID : UUID;
amount : Decimal;
}
// subscription event, publisher/subscriber broadcast consume
@topic : 'test.app.srv.people.broadcast'
event updateName {
peopleID : UUID;
Name : String;
Age : Integer;
}
@topic : 'test.app.srv.people.broadcast'
event updateAge {
peopleID : UUID;
Name : String;
Age : Integer;
}
}Options
TBD
{
"cds": {
"requires": {
"messaging": {
"kind": "nats"
},
"nats": {
"impl": "cds-nats"
}
}
}
}Nats KV Service
Use
Natsas a KV storeThis is an experimental feature of Nats, you MUST enable the jetstream feature in nats server
const kv = await cds.connect.to("kv") as NatsKVService;
expect(kv).toBeInstanceOf(NatsKVService)
const id = cds.utils.uuid()
const v = await kv.get(id)
expect(v).toBeNull()
expect(await kv.keys()).toHaveLength(0)
await kv.set(id, "v1")
expect(await kv.keys()).toHaveLength(1)
expect(await kv.get(id)).toBe("v1")
await kv.set(id, "v2")
expect(await kv.get(id)).toBe("v2")
await kv.remove(id)
expect(await kv.get(id)).toBeNull()
expect(await kv.keys()).toHaveLength(0)
expect(await kv.get("k3", () => "v4")).toBe("v4")
expect(await kv.get("k3")).toBe("v4")
await kv.removeAll()Options
configure the
kvservice
{
"cds": {
"requires": {
"kv": {
"kind": "nats-kv",
"ttl": 100
},
"kv5000": {
"kind": "nats-kv",
"ttl": 5000
},
"nats-kv": {
"impl": "cds-nats/lib/NatsKVService"
}
}
}
}ttl, the maximum validity for each key, in milliseconds.
Nats Distributed Lock Service
Use
Natsas a distributed lock serviceThis is an experimental feature of Nats, you MUST enable the jetstream feature in nats server
Options
{
"cds": {
"requires": {
"lock": {
"kind": "nats-lock",
"check": {
"interval": 10
},
"lock": {
"acquire": 10000
}
},
"nats-lock": {
"impl": "cds-nats/lib/NatsLockService"
}
}
}
}check.interval:NatsLockServicecheck lock in polling mode, so this is the check intervallock.acquire: if the lock of target resource could be acquired immediately,NatsLockServicewill pending, if the target lock could not be acquired in specific timeout duration,NatsLockServicewill throw error to avoid to long time pendinglock.timeout: the maximum timeout for single lock, if a resource is locked too long time, client will force acquire it, the defualt value is 1 HOUR
Nats RFC Service
Use
Natsas a RFC communication toolTo use the
NatsRFCService, MUST enable Nats Messaging Service firstly
graph LR
ServiceA[Service A Instance 1]
ServiceA -->|1. Run CQN XXX| Nats
Nats(Nats Queue Group - ServiceC)
Nats -->|2. RUN CQN XXX| ServiceC
ServiceC -->|3. Execution Result| Nats
Nats -->|4. Execution Result| ServiceA
ServiceC[Service C instance 1]Example
const cds = cwdRequireCDS()
const { INSERT } = cds.ql
const messaging = await cds.connect.to("rfc") as NatsRFCService
const remoteApp = messaging.app("demo-app-micro-service");
const remotePeopleService = remoteApp.service("test.app.srv.theosun.PeopleService")
const newPeople = await remotePeopleService.run(
INSERT.into("People").entries({ Name: cds.utils.uuid() })
)
expect(newPeople).not.toBeNull()
expect(newPeople.Name).not.toBeUndefined()
const updatedPeople = await remotePeopleService.updateWeight(newPeople.ID, 12.3)
expect(updatedPeople.Name).toBe(newPeople.Name)
expect(updatedPeople.Weight).toBe(12.3)
await expect(() => remotePeopleService.notExistFunction())
.rejects
.toThrow("method/action/function 'notExistFunction' is not existed on the service 'test.app.srv.theosun.PeopleService'")Options
{
"cds": {
"requires": {
"messaging": {
"kind": "nats"
},
"rfc": {
"kind": "nats-rfc",
"app": {
"name": "demo-app-micro-service"
},
"invoke": {
"timeout": 180000
}
},
"nats": {
"impl": "cds-nats"
},
"nats-rfc": {
"impl": "cds-nats/lib/NatsRFCService"
}
}
}
}app.name- the app name of current application, its an identifier which will be used for RFCapp.timeout- the timeout of each invocation, for example, if remote server do not respond result in 3 minutes,NatsRFCServicewill stop waiting and throw error
Features
- Nats Messaging Service
- Pub/Sub
- complex test case
- Produce/Consume
- basic support and test case
- tenant aware
tenantrecoveruserrecoveruser-attrrecover
messagingsrv.onsrv.emit
- Outbox enable
- Nats options documentation
- Pub/Sub
- Nats KV Store
- tenant aware
- get
- get with provider
- set
- delete
- Nats options documentation
- Nats Distributed Lock Service
- tenant aware
- 100 values test
- acquire timeout
- lock timeout (dead lock)
- synchronized method (high level API)
- Nats options documentation
- Nats RFC Service
- tenant aware
- OData Service query
- OData Unbounded Function/Action
- Rest Adapter operation
- Error handler
- Demo Micro Service
Compatibility Table
| @sap/cds version | cds-mysql version |
|---|---|
| 5.x | 2.9.x |
| 6.x | 2.10.x |