@corva/node-stream-lambda v5.0.0
node-stream-lambda
Stream lambda class for constructing data apps that handle stream events.
Requirements
- Node.js >= 12.0.0
Getting started
Installation
npm i @corva/node-stream-lambda
Features
- Defines stream workflow
Workflow
Stream lambda has 3 phases:
- pre-process (groups events by asset id and filters already processed records)
- process
- post-process (saves last processed records info to redis)
@startuml
(*) -->[{event}] "preProcess"
if "Exception?" then
-->[true, {error}] "postProcess"
else
-->[false, {event}] "process"
-->[{data, error}] "postProcess"
endif
-->[End] (*)
@enduml
Usually you only need to define a process function to handle the data. Pre-process is used to modify incoming data in some way. Post-process is used to handle processed data and/or to handle and log errors thrown during pre-process or process.
The lambda also exposes a .run method that is used to launch processing.
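The workflow that .run drives can be sketched roughly as follows. This is a minimal illustration of the control flow in the diagram above, not the library's actual code; the handler signatures here are assumptions.

```javascript
// Minimal sketch of the pre-process -> process -> post-process workflow.
// Handler signatures are assumptions, not the library's real API.
async function run(handlers, event, context) {
  let data;
  let error;
  try {
    // pre-process prepares and filters the incoming event
    const prepared = await handlers.preProcess({ event, context });
    // process does the actual work on the prepared event
    data = await handlers.process({ event: prepared, context });
  } catch (err) {
    // on failure, post-process still runs and receives the error
    error = err;
  }
  // post-process receives either the result or the error, per the diagram
  return handlers.postProcess({ data, error, context });
}
```

Note how postProcess runs on both branches, matching the `[true, {error}]` and `[{data, error}]` edges in the diagram.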
Configuration
Stream config
/**
* App name, in most cases should match application name
*/
name: string;
/**
* Provider name
*/
provider: string;
/**
 * Maps keys to camelCase if true
* @default false
*/
mapKeys?: boolean;
/**
 * Filtering settings allow filtering out already processed events to avoid duplicates; disabled by default
*/
filter?: {
/**
* Filter incoming records by last processed timestamp (will work only for time-based data)
* @default false
*/
byLastProcessedTimestamp?: boolean;
/**
* Filter incoming records by last processed depth (will work only for depth-based data)
* @default false
*/
byLastProcessedDepth?: boolean;
};
Pre-process
v2
Stream lambda pre-process performs several preparation steps:
- Converts the "old" and "new" stream event formats into a common style. You can find the new event format in the examples;
- Detects the end of "drilling" and adds the flag `Symbol.for('completed')` to records passed to `process` in the `event` parameter;
- Groups event records by `asset_id` and calls `process` for each `asset_id` individually, which may result in multiple `process` and `post-process` calls when an event contains multiple `asset_id`s. However, that is not the usual case;
- Filters events that have already been processed, based on the state from the previous run (see post-process for details).
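The grouping step can be sketched as follows (a simplified illustration, not the library's internal implementation):

```javascript
// Group an event's records by asset_id so process can run once per asset.
function groupByAssetId(records) {
  const groups = new Map();
  for (const record of records) {
    const existing = groups.get(record.asset_id) || [];
    existing.push(record);
    groups.set(record.asset_id, existing);
  }
  return groups;
}
```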
v3+
- Optionally maps keys to camelCase and adds `assetId`, `appConnectionId`, and `isCompleted` fields to the event;
- Filters events that have already been processed, based on the state from the previous run (see post-process for details).
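The optional key mapping can be illustrated with a simple snake_case-to-camelCase converter. This is an illustration only; the library's actual mapping logic may differ.

```javascript
// Convert a snake_case key like "asset_id" to camelCase ("assetId").
function toCamelCase(key) {
  return key.replace(/_([a-z])/g, (_, letter) => letter.toUpperCase());
}
```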
Event transformation
Events that come in new format:
[
{
"metadata": {
"apps": {
"corva.wits-depth-summary": {
"app_connection_id": 123
}
},
"app_stream_id": 456
},
"records": [
{
"asset_id": 1,
"timestamp": 1546300800,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.4,
"weight_on_bit": 1,
"state": "Some unnecessary drilling that's excluded"
}
},
{
"asset_id": 1,
"timestamp": 1546300800,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.4,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
}
]
}
]
will be transformed into:
[
{
"assetId": 1,
"appConnectionId": 123,
"isCompleted": false,
"metadata": {
"apps": {
"corva.wits-depth-summary": {
"app_connection_id": 123
}
},
"app_stream_id": 456
},
"records": [
{
"asset_id": 1,
"timestamp": 1546300800,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.4,
"weight_on_bit": 1,
"state": "Some unnecessary drilling that's excluded"
}
},
{
"asset_id": 1,
"timestamp": 1546300800,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.4,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
}
]
}
]
And that's what you'll receive in the `process` handler's `event` parameter.
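For instance, a process handler could read the grouped records like this. This is a sketch assuming the handler receives one transformed item per `asset_id`:

```javascript
// Compute the maximum hole_depth across the records of one grouped item.
async function process({ event }) {
  const depths = event.records.map((record) => record.data.hole_depth);
  return { assetId: event.assetId, maxHoleDepth: Math.max(...depths) };
}
```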
Post-process
In post-process, the stream lambda saves info about the last processed record into its state, which is stored in Redis.
This is done to prevent already processed records from being processed several times.
If process throws an error, this step is skipped and the state is not changed.
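The de-duplication this state enables can be sketched as follows (illustration only; the library's actual filtering may differ):

```javascript
// Drop records at or before the last processed timestamp from a prior run.
function filterAlreadyProcessed(records, lastProcessedTimestamp) {
  if (lastProcessedTimestamp == null) return records;
  return records.filter((record) => record.timestamp > lastProcessedTimestamp);
}
```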
The cache key for the state looks like:
/**
* provider - company's identifier, i.e. my-oil-company
* assetId - well's identifier, taken from the event
* appStreamId - taken from the event
* appKey - app's unique identifier, taken from app config
* appConnectionId - taken from event
*/
`${provider}/well/${assetId}/stream/${appStreamId}/${appKey}/${appConnectionId}`
The value is a hash map, so use hset, hget, hmget, and hgetall to access the values you need. By default it sets `state.lastProcessedTimestamp` for time-based data and `state.lastProcessedDepth` for depth-based data.
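Building that key can be expressed as follows (parameter names are taken from the comment above; the helper itself is hypothetical):

```javascript
// Assemble the Redis state key from event and app config values.
function buildStateKey({ provider, assetId, appStreamId, appKey, appConnectionId }) {
  return `${provider}/well/${assetId}/stream/${appStreamId}/${appKey}/${appConnectionId}`;
}
```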
Examples
const { StreamLambda } = require('@corva/node-stream-lambda');
// you may need api client and other libs
const { ApiClient } = require('@corva/node-api-client');
const apiClient = new ApiClient();
const lambda = new StreamLambda({
  apiClient,
  streamConfig: { name: 'my-app', provider: 'my-provider' },
  process: async ({ event, context }) => {
    // handle the incoming stream event here
    const result = event.a + event.b;
    return result;
  },
});
Event
Here's an example of stream event that will be passed to lambda:
[
{
"metadata": {
"apps": {
"corva.wits-depth-summary": {
"app_connection_id": 123
}
},
"app_stream_id": 456
},
"records": [
{
"asset_id": 1,
"timestamp": 1546300800,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.4,
"weight_on_bit": 1,
"state": "Some unnecessary drilling that's excluded"
}
},
{
"asset_id": 1,
"timestamp": 1546300800,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.4,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
},
{
"asset_id": 1,
"timestamp": 1546300900,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.5,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
},
{
"asset_id": 1,
"timestamp": 1546301000,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.9,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
},
{
"asset_id": 1,
"timestamp": 1546301100,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 100.3,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
},
{
"asset_id": 1,
"timestamp": 1546301200,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 100.5,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
},
{
"asset_id": 1,
"timestamp": 1546301300,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 100.6,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
}
]
}
]