pubsub-store v0.8.2
Pub/Sub Store pubsub-store
Pub/sub store and provider that use Mongoose-like schema to separate clients from data stores using a consistent protocol. At the same time maintaining the benefits of underlying pub/sub bus by allowing other listeners to subscribe to CRUD events.
Multiple stores, possibly with different underlying databases, can service requests as long as they expose the same protocol.
Providers can be used to create/update entities while others providers subscribe to notifications.
Providers support duplex streaming of entities.
Integrates nicely with graphql-schema-builder. See Examples for GraphQL client example.
NB: Currently assuming providers and underlying DB backends use the same query language.
Table of contents
Requirements
Library requires > Node 8.x with native async/await support.
Installation
yarn add pubsub-storeor
npm install --save pubsub-storeAPI
Provider
Exposes the underlying store in a convenient format.
Implements Duplex stream to create and receive created entities. See Streaming for more details.
Methods
constructor({ schema, transport, getSubjects, options: { batchSize, highWaterMark, noAckStream, timeout }}
schemaA schema object. See Schema for details.
transportA connected transport instance. Must have
request,subscribeandunsubscribemethods with following signatures:const transport = { request(subject, msg, options, cb) { // ... }, subscribe(subject, cb) { // ... return subscriptionId; }, unsubscribe(subscriptionId) { // ... } }getSubjectsOptional function that returns protocol subjects. Default implementation in subjects.js.
optionsoptionalbatchSizeMaximum result batch size. If there are more query results than
batchSize, results will be loaded in batches of that size.highWaterMarkWhen set, the stream will push messages in chunks of that size.
noAckStreamWhen
true, allows piping to provider without acknowledgement, i.e. fire and forget.timeoutQuery timeout in milliseconds (default: 1000).
count(conditions)
Returns a number of entities matching conditions.
conditionsConditions to count entities based on.
countAll()
Returns a number of all entities in store (excluding those marked as deleted).
create(object, projection)
Creates an entity based on object and returns projected fields of the new
entity.
objectObject with the fields to set.
projectionProjection of the fields from created entity to be returned.
delete(conditions, projection)
Deletes entities based on conditions and returns projected fields of deleted
entities.
conditionsConditions to delete entities based on.
projectionProjection of the fields from deleted entities to be returned.
deleteById(id, projection)
Deletes an entity based on id and returns projected fields of deleted entity.
idID to delete an entity based on.
projectionProjection of the fields from deleted entity to be returned.
find(conditions, projection, options)
Find entities based on conditions and returns projected fields of found
entities.
conditionsConditions to find entities based on.
projectionProjection of the fields from found entities to be returned.
optionsoptionalQuery options (e.g. limit).
findAll(projection, options)
Find all entities and returns projected fields of found entities.
projectionProjection of the fields from found entities to be returned.
optionsoptionalQuery options (e.g. limit).
findById(id, projections)
Find entities based on id and returns projected fields of found entity.
idID to find an entity based on.
projectionProjection of the fields from found entity to be returned.
updateById(id, object, projection)
Updates an entity based on id using object and returns projected fields of
the updated entity.
idID to update an entity based on.
objectObject that is used to update the matching entity.
projectionProjection of the fields from updated entity to be returned.
Events
create
Emitted when an entity create event is received from the underlying message bus.
update
Emitted when an entity update event is received from the underlying message bus.
create and update event listeners have the following signature:
stream-error
Emitted from either Readable or Writable side of the Duplex stream instead
of an error. In case of Writable this prevents any upstreams from unpiping.
function listener(err, query) { /* ... */ }Streaming
Since Provider implements
Duplex stream
class, entities can be piped to and from a provider instance.
const provider = new SomeProvider({ /* */ });
// Entities received from the message bus will be piped to someWritableStream
provider.pipe(someWritableStream);
// Entities from someReadableStream will be piped to the message bus
someReadableStream.pipe(provider);See client-nats-streaming example for more details.
Store
Exposes count, create, find and update methods over the pub/sub bus to be consumed by providers.
Methods
constructor({ buildModel, schema, transport, getSubjects })
buildModelA function that builds a model based on a schema. A model must have
count,create,findandupdatemethods that accept protocol arguments.createmust handleobjectbeing both a single object or an array.See server-nats-mongo example for more details.
function buildModel(schema) { return { count(conditions) { /* */ }, create(object, projection) { /* */ }, find(conditions, projection, options) { /* */ }, update(conditions, object, options) { /* */ } }; }schemaA schema object. See Schema for details.
transportA connected transport instance. Must have
subscribeandunsubscribemethods with following signatures:const transport = { subscribe(subject, cb) { // ... return subscriptionId; }, unsubscribe(subscriptionId) { // ... } }getSubjectsOptional function that returns protocol subjects. Default implementation in subjects.js.
open()
Subscribes to all subjects, effectively starting the store.
close()
Unsubscribes from all subjects, effectively stopping the store.
Events
Events are emitted on corresponding request errors.
create-errorfind-errorupdate-error
getSubjects
getSubjects(name, { prefixes, suffix })
Function that can be passed to both Provider and Store constructors and returns protocol subjects based on schema name.
nameSchema name.
prefixesObject with subject prefixes. Defaults to:
const Prefixes = { count: 'count', create: 'create', find: 'find', update: 'update' };suffixoptionalSubject suffix (default:
'', empty string)
Protocol
Protocol is implemented by Provider and Store and is presented here for reference.
NB: Currently assuming providers and underlying DB backends use the same query language.
NB: Projections cannot have both included and excluded fields.
Result
{
result: resultObject // or an array, or a value
}Error
{
error: {
message: "Error details"
}
}Count Method
Count request is published to count.schema-name subject by default. Returns
the number of entities matching conditions.
{
conditions: {
field1: 'value 2',
// etc.
}
}Create Method
Create request is published to create.schema-name subject by default. Returns
a newly-created entity or a list of entities with projection applied.
{
object: {
field1: 'value 1',
field2: 2
// etc.
},
projection: {
field1: 1
field2: 1
// etc.
}
}Find Method
Find request is published to find.schema-name subject by default. Returns a
list of entities matching conditions with projection applied or an empty list.
{
conditions: {
field1: 'value 2',
// etc.
},
projection: {
field1: 1
field2: 1
// etc.
},
options: {
limit: 1
// etc.
}
}Update Method
Update request is published to update.schema-name subject by default. Returns
an updated entity with projection applied or an empty list.
{
conditions: {
field1: 'value 2',
// etc.
},
object: {
$set: {
field2: 3
}
// etc.
},
projection: {
field1: 1
field2: 1
// etc.
},
options: {
multi: true
// etc.
}
}Schema
fields can be either an object or a function accepting { Mixed, ObjectId }.
See Mongoose Guide for more details
about Schema definition.
Schema format is shared with graphql-schema-builder.
const schemas = {
Asset: {
name: 'Asset',
description: 'An asset.',
fields: ({ Mixed, ObjectId }) => ({
customer: {
description: 'Customer that this asset belongs to.',
type: ObjectId,
ref: 'Customer',
required: true
},
parent: {
type: ObjectId,
ref: 'Asset',
required: false
},
name: {
type: String,
required: true
}
}),
dynamicFields: ({ ObjectId }) => ({
sensors: {
type: [ObjectId],
ref: 'Sensor'
}
})
},
Customer: {
name: 'Customer',
description: 'A customer.',
fields: {
name: {
description: 'The name of the customer.',
type: String,
required: true
},
// Will result in subtype
metadata: {
created: {
type: Date,
required: true
}
}
},
dynamicFields: ({ Mixed, ObjectId }) => ({
assets: {
type: [ObjectId],
ref: 'Asset'
}
})
},
Sensor: {
name: 'Sensor',
description: 'A sensor that must be connected to an asset.',
fields: ({ Mixed, ObjectId }) => ({
externalId: {
type: String,
required: false
},
asset: {
description: 'An asset that this sensor is connected to.',
type: ObjectId,
ref: 'Asset',
required: true
},
name: {
type: String,
required: false
}
})
}
};Examples
See examples for NATS, Mongo/Mongoose, GraphQL and streaming examples.
TODO
- Abstract pub/sub bus interface into transport adapters
- In-code documentation
- Implement bulk update
- Implement deleting as opposed to marking as deleted
- Implement aggregate
License
MIT