@printbear/event-store v0.5.0
Event-store
Usage
First, install with Yarn:
yarn add @printbear/event-store
Quick usage example:
import { createStore } from 'event-store'
import reducers from './reducers'
const store = await createStore('my-topic', reducers)
After those steps you'll be ready to read and write data to the store.
createStore(topic, reducers [, options])
createStore
is a utility function that:
- Creates a new instance of the
EventStore
- Initializes the database adapter
- Initializes a MuleMQ client to publish events
topic
is the MuleMQ topic to which events will be published. For thePostgresStore
adapter, it will also be part of the table name that will be used in the[topic]_events
format (i.e.orders_events
). It should be the same as your service name.reducers
is an object that defines your reducers in the following format:
type Reducers = { [string]: (entity: any, event: Event) => any }
options
adapter
:createStore
will use thePostgresStore
adapter by default. If you wish to use a different adapter, specify it in the options:createStore('orders', reducers, { adapter: 'memory' })
. It currently supports thememory
andpostgres
adapters.publish
:createStore
will publish events to MuleMQ by default. It can be useful to skip message publishing, e.g. for testing purposes. In that case, you can specify thepublish: false
option.filter
:createStore
will apply a filter to events before publishing them to MuleMQ. This allows to mutate the event that will be sent or not to send the event at all (in case the filter returns null). The filter type is:(event: Event, store: EventStore) => Promise<Event>
.errorReporter
:createStore
will call this function when an event publish fails. The reporter type is:(err: Error) => void
. The default reporter isconsole.error
.publishRetryOptions
:createStore
will apply this retry options when an event is published. The retry options type is:{ retries: number factor: number, minTimeout: number, maxTimeout: number, randomize: boolean }
. The retry timeout is calculated as:Math.min(random * minTimeout * Math.pow(factor, attempt), maxTimeout)
. For more info about these parameters refer to node-retry.
Example uses
Initializing the event store for regular usage (dev/production). Uses Postgres for persistence and MuleMQ for event publishing.
import { createStore } from 'event-store'
import reducers from './reducers'
const store = await createStore('orders', reducers)
Initializing the event store for testing. Uses in-memory persistence and no event publishing.
import { createStore } from 'event-store'
import reducers from 'reducers'
const store = await createStore('orders', reducers, { adapter: 'memory', publish: false })
Usage of transaction.
const store = await createStore('orders', reducers)
async function ex () {
const tx = store.transaction('order', '1')
const persistedEvent: ?PersistedEvent = await tx(async aggregate => {
(...)
/*
* Safely check the aggregate to decide if an event should be dispatched or not for this entity.
* If null is returned, it will stop the operation and no event is dispatched.
* NOTE: This is implemented via a optimistic locking solution. If there were changes to this entity this operation will retry.
*/
return null
})
}
Usage of subscribers (creates a wrapper around mulemq-js and redefines the listen
method).
const store = await createStore('cashier', reducers)
const subscriber = store.subscriber({topic: 'orders', service: 'cashier'})
subscriber.listen((event: PersistedEvent) => {
(...)
/*
* events(entityType, entityId) will be received in order.
* e.g.: Event of an entity with version=2 won't be received until the event with version=1 is received.
*
* Note: It isn't guaranteed that the event will only be received once in case of a service failure.
*/
})
How it works
http://microservices.io/patterns/data/event-sourcing.html
The store is a singleton object and related methods that handles persisting and publishing events. When a new event is dispatched, the store will:
- Run it through the reducers
- Persist the event to a database
- Publish the event to the message broker
- Return the new aggregate returned by the reducers.
Also, when retrieving an entity instance, it will:
- Fetch the last event and return the aggregate
If it isn't calculated yet, it will:
- Fetch all the events for that instance from the database
- Run all events through the reducers
- Return the result.
Development workflow
Development workflow is very similar to Storefront
To work on Event-store, check it out on your machine:
- Install dependencies with
yarn install
- Edit files in
src/
. (The files inlib/
are build output.) - Format the code with
yarn format
and lint it withyarn lint
- Build with
yarn build
and run tests withyarn test
. This will run flow type checks and the tests. - Before committing, check that you have run the build with the latest version of files in src.