real-value v0.1.0
Real Value Library
The real-value library is an expression language for composing flows of time-series data.
A Stream
A stream is a set of values available through time. For the purpose of this document a stream is a comma-separated set of values.
//logical stream of values
,1,,,,2,,3
It would be convenient to have a from operator that can reify a stream.
from(',1,,,,2,,3')
Streams can implicitly represent time.
from(',1,0,,',{columns: ['status']}) //logically expands to {time:0,status:null},{time:1,status:1},{time:2,status:0},{time:3,status:null},{time:4,status:null}
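A minimal sketch of how this implicit-time parsing could work over a plain string (the parseStream helper is an illustration, not part of any published API):

```javascript
// Hypothetical helper: parse a comma-separated stream string into
// {time, value} records, using position in the string as the time.
function parseStream(csv) {
  return csv.split(',').map((cell, time) => ({
    time,
    value: cell === '' ? null : Number(cell),
  }));
}

const stream = parseStream(',1,,,,2,,3');
// stream[0] → { time: 0, value: null }; stream[1] → { time: 1, value: 1 }
```

Empty cells become null values, so the stream keeps one slot per tick.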
Streams can explicitly represent time.
from('1100,2017\n1200,2018\n1000,2019',{columns: ['production','year']}) //logically expands to {year:2017,production:1100},{year:2018,production:1200},{year:2019,production:1000}
Streams can deal with multiple values concurrently.
from('EX132,6050,51.6\nEX709,6060,65.2',{columns: ['EquipmentId','Model','SMU']})
Streams should be constructable from CSV strings, arrays, SQL tables, iterators, and generators. In the Coates Hire solution we could express ignition status values arriving into the system as:
from(coateshireGenerator)
Stream Propagation
The values may flow through the stream with no time lag.
,1,,,,2,,3 -=> ,1,,,,2,,3
A log operator can be used to view a stream.
from(',1,,,,2,,3').log()
Stream Propagation Delay
The values may be delayed through a stream
,1,,,,2,,3,, -=> ,,,1,,,,2,,3
A delay operator would achieve the above translation.
from(',1,,,,2,,3').delay(2).log()
We could use this to represent the activity of a truck moving a payload from the mine face to a processing conveyor belt.
from(',1,,,,2,,3').delay(transportTime)
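Over an array-backed stream, delay could be as simple as the following sketch (delay here is a standalone function for illustration, not the chained operator):

```javascript
// Hypothetical sketch: delay shifts every value n ticks later by
// prepending n empty (null) slots to the stream.
function delay(values, n) {
  return Array(n).fill(null).concat(values);
}

const delayed = delay([null, 1, null, null, null, 2, null, 3], 2);
// delayed[3] → 1; delayed[9] → 3 (each value arrives two ticks later)
```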
Stream Propagation Accumulation
The stream values may accumulate before propagating
,1,,,,2,,3,, -=> ,,,,,3,,3,,
An accumulateTo operator could achieve this accumulation.
from(',1,,,,2,,3').accumulateTo(3).log()
In EOS Premier a truck can accumulate some load from a digger before moving it to a processing stockpile.
from(',1,,,,2,,3').accumulateTo(truckCapacity)
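One possible array-based interpretation of accumulateTo, matching the translation above (an illustrative sketch, not the library's implementation):

```javascript
// Hypothetical sketch: accumulateTo buffers incoming values and only
// propagates once the running total reaches the threshold, then resets.
function accumulateTo(values, threshold) {
  let sum = 0;
  return values.map(v => {
    if (v !== null) sum += v;
    if (sum >= threshold) {
      const out = sum;
      sum = 0;
      return out;
    }
    return null;
  });
}

const out = accumulateTo([null, 1, null, null, null, 2, null, 3], 3);
// out → [null, null, null, null, null, 3, null, 3]
```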
Stream Propagation Accumulation and Delay
In EOS Premier it would take a truck some time to return from the stockpile before it can be reloaded.
from(',1,,,,2,,3').compose(accumulateTo(truckCapacity),delay(truckReturnTrip))
Stream Propagation Limiting
The values may need to be limited before propagating
,1,,,,2,,3,, -=> ,1,,,,1,1,1,1,1
A limitTo operator could facilitate splitting the flow into defined chunks.
from(',1,,,,2,,3').limitTo(1).log
An Idemitsu (coal mine) conveyor system has a defined throughput.
from(stockpile).limitTo(1)
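A sketch of how limitTo could behave over arrays, carrying excess forward, consistent with the translation shown above (names are illustrative):

```javascript
// Hypothetical sketch: limitTo caps the amount propagated per tick,
// carrying any excess in a backlog that drains on later ticks.
function limitTo(values, limit) {
  let backlog = 0;
  return values.map(v => {
    if (v !== null) backlog += v;
    if (backlog === 0) return null;
    const out = Math.min(backlog, limit);
    backlog -= out;
    return out;
  });
}

const out = limitTo([null, 1, null, null, null, 2, null, 3, null, null], 1);
// out → [null, 1, null, null, null, 1, 1, 1, 1, 1]
```

Note that the total propagated (6) equals the total received (1 + 2 + 3); limiting reshapes the flow without losing volume.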
Streams May Merge
Streams may need to flow together.
,a:1,a:2,,,,,a:3, ; b:1,,b:2,,b:3, -=> b:1,a:1,a:2:b:2,,b:3,,,a:3
Note that the a:1 syntax indicates a key with a value.
A merge operator could merge two streams.
from(',a:1,a:2,,,,,a:3,').merge(from('b:1,,b:2,,b:3,'))
In Coates Hire there are streams of asset data from Telematics Guru and from Manitou but this source is irrelevant to the UI.
from(',Asset111:on,Asset222:off,,').merge(from('Asset333:on,,Asset333:off,'))
As an alternative, a from operation applied to a stream should create a merged stream.
from('1,3',{columns: ['odd']}).from('2,4',{columns: ['even']}) //{odd:1,even:2},{odd:3,even:4}
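A tick-by-tick picture of merging two array-backed streams (an illustrative sketch; the merge function here is an assumption, not the library's API):

```javascript
// Hypothetical sketch: merge zips two streams tick by tick; a tick in the
// merged stream may carry values from either input, or from both.
function merge(a, b) {
  const len = Math.max(a.length, b.length);
  return Array.from({ length: len }, (_, t) =>
    [a[t], b[t]].filter(v => v != null)
  );
}

const merged = merge(
  [null, { a: 1 }, { a: 2 }],
  [{ b: 1 }, null, { b: 2 }]
);
// merged → [ [{b:1}], [{a:1}], [{a:2},{b:2}] ]
```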
Streams May Diverge
Streams may need to diverge
b:1,a:1,a:2:b:2,,b:3,,,a:3 -=> ,a:1,a:2,,,,,a:3, ; b:1,,b:2,,b:3,
A split operator would split the stream and return an array of streams.
from('b:1,a:1,a:2:b:2,,,b:3,a:3').split([filterAs,filterBs])
In EOS Premier a digger may split material into waste and coal stockpiles.
from('waste:1,coal:1,coal:2,,,waste:3,coal:3').split([filterCoal,filterWaste])
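An array-based sketch of split, preserving tick positions in each output stream (the split function and filter predicates here are illustrative assumptions):

```javascript
// Hypothetical sketch: split routes each tick through a list of filter
// predicates, returning one stream per filter; non-matching ticks become
// null slots so the timing of each output stream is preserved.
function split(values, filters) {
  return filters.map(f => values.map(v => (v !== null && f(v) ? v : null)));
}

const [coal, waste] = split(
  [{ coal: 1 }, { waste: 3 }, { coal: 2 }],
  [v => 'coal' in v, v => 'waste' in v]
);
// coal → [{coal:1}, null, {coal:2}]; waste → [null, {waste:3}, null]
```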
An alternative to split would be simply to duplicate a stream into two other streams.
from('1,2,3').duplicate((stream1,stream2)=>{stream1.log();stream2.log()})
Streams May Be Accumulated into Categories
Streams may need to accumulate into categories
,a:1,a:2,,c:2,,b:3 -=> { a: 3, b: 3, c: 2 }
A table operator would achieve this.
from(',a:1,a:2,,c:2,,,b:2,').table(accumulate)
For EOS Premier utilization was accumulated into assets
from(',E16:10,D10:20,,E15:10,,,E16:10,').table(accumulate)
The aggregation into a table should be capable of updating statistics.
,a:1,a:3,,b:2,,b:4 -=> average:{ a: 2, b: 3 }
For Coates Hire we were interested in capturing statistics across the type of asset.
from(',Asset1:10,Asset11:20,,Asset12:10,,,Asset10:20,').map(mapToAssetType).table(movingAverage,'averageUtilizationByType')
Note that table should accumulate but also emit changes to the table.
from('A:1,A:10,,B:2,B:0,C:3,').table(accumulate).log() //should generate A:1,A:11,B:2,/*no change*/,C:3
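A sketch of table(accumulate) with change emission, folding keyed events into a running table (illustrative only; event shape and function names are assumptions):

```javascript
// Hypothetical sketch: fold keyed values into a running table, emitting a
// change record only when a key's total actually changes.
function tableAccumulate(events) {
  const table = {};
  const changes = [];
  for (const e of events) {
    if (e === null) continue;
    const next = (table[e.key] || 0) + e.value;
    if (next !== (table[e.key] || 0)) changes.push({ key: e.key, total: next });
    table[e.key] = next;
  }
  return { table, changes };
}

const { table, changes } = tableAccumulate([
  null, { key: 'a', value: 1 }, { key: 'a', value: 2 },
  null, { key: 'c', value: 2 }, null, { key: 'b', value: 3 },
]);
// table → { a: 3, c: 2, b: 3 }; a zero-valued event would emit no change
```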
Named Streams
Streams should be named so they can be queried
from(',a:1,a:2,,,,,b:3,c:4').table(accumulate).name('acache')
from('a,a,a,a,').map((key)=>stream('acache').get(key)) //1,1,1,2
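One way to picture the naming mechanism is a global registry mapping names to tables (a sketch under assumptions; the name and stream functions here are hypothetical):

```javascript
// Hypothetical sketch: a global registry lets a table be published under a
// name and queried later from other streams.
const registry = new Map();

function name(table, streamName) {
  registry.set(streamName, table);
  return table;
}

function stream(streamName) {
  const table = registry.get(streamName);
  return { get: (key) => table[key] };
}

name({ a: 3, b: 3, c: 4 }, 'acache');
const lookedUp = stream('acache').get('a');
// lookedUp → 3
```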
Change Propagation
When there is a change to a stream it may be desirable to reprocess the content of a table stream. For instance getting new information about depreciation rates could require reprocessing a table stream representing the value of some assets.
from(somestream).table({propagate:'change'}) //only table changes are propagated.
from(somestream).table({propagate:'all'}) //any table change generates a stream of the entire table.
Streams May Need to Accumulate Between Values
Streams may need to accumulate between values
,a:1,b:1,,b:0,,a:0 -=> { a: 5, b: 2 }
A delta operation might allow this to be expressed.
from(',a:1,b:1,,b:0,,a:0').table(compose(delta,accumulate))
In Coates Hire utilization related to time periods between on/off events
from(',Asset111:on,Asset222:on,,Asset222:off,,Asset111:off').table(compose(delta,accumulate))
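A sketch of composing delta with accumulate over on/off events, where each key's elapsed-on time is summed (illustrative; treats the interval as off tick minus on tick, so in the example above a spans 5 ticks and b spans 2):

```javascript
// Hypothetical sketch: delta turns on/off transitions into elapsed time,
// and accumulate then sums that elapsed time per key.
function deltaAccumulate(events) {
  const onSince = {};
  const totals = {};
  events.forEach((e, time) => {
    if (e === null) return;
    if (e.value === 1) {
      onSince[e.key] = time; // key switched on at this tick
    } else if (onSince[e.key] !== undefined) {
      totals[e.key] = (totals[e.key] || 0) + (time - onSince[e.key]);
      delete onSince[e.key]; // key switched off; close the interval
    }
  });
  return totals;
}

const totals = deltaAccumulate([
  null, { key: 'a', value: 1 }, { key: 'b', value: 1 },
  null, { key: 'b', value: 0 }, null, { key: 'a', value: 0 },
]);
// totals → { a: 5, b: 2 }
```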
Streams May Be Compared Against Targets
Streams may need to be compared against targets
,1,2,,,3,2,,1 ; 0,2,2,2,2,2,2,2,0 -=> 0%,50%,100%,0%,0%,150%,100%,0%,0%
The from operator handles an arbitrary number of named streams.
from([
from(',1,2,,,3,2,,1','actual'),
from('0,2,2,2,2,2,2,2,0','target')]
).map( ({actual,target})=>actual*100/target)
In EOS Premier there was a need to show up-to-date production versus target production during the course of an interval.
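The actual-versus-target combination above can be sketched over plain arrays (names are assumptions; missing or zero values map to 0%):

```javascript
// Hypothetical sketch: combine actual and target streams tick by tick into
// a percent-of-target stream; 0 where either value is missing or target is 0.
function percentOfTarget(actual, target) {
  return actual.map((a, t) => {
    const tgt = target[t];
    if (a === null || !tgt) return 0;
    return (a * 100) / tgt;
  });
}

const pct = percentOfTarget(
  [null, 1, 2, null, null, 3, 2, null, 1],
  [0, 2, 2, 2, 2, 2, 2, 2, 0]
);
// pct → [0, 50, 100, 0, 0, 150, 100, 0, 0]
```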
Streams as components
It should be possible to define a stream with a source to be re-used multiple times.
let identityStream = map(x=>x) //The identity processing stream
from(testStream).thru(identityStream) // Used to test the stream definition
from(productionStream).thru(identityStream) //Used to process production
Streams that update functions
Streams should be usable to update functions used in other streams. A stream of market data is used to update a function which can be used to value an individual item.
const valuer = (() => {
  let valuationModel //the model
  return {
    train: (historicalSaleData) => {/*update the model*/},
    value: (equipment) => valuationModel.value(equipment)
  }
})()
from(historicalSaleData).map(x=>valuer.train(x)) //We need to update a valuation model
from(onSaleData).map(x=>valuer.value(x)).table() //then we want to use the model
Examples
Young Guns
Young Guns is about scheduling resources to empty and load containers.
There is a stream of containers to be unloaded: from(port). Containers accumulate stuff: limitTo(). Containers are transported: delay(). Containers are unloaded: splitTo().
GFM
There are streams of mine stopes: from(mineplan). Stopes need to be cleared: splitTo(diggerLoad).accumulateTo(truckCapacity). Trucks transport waste or mineral: delay(travelTime).
MineQ Complex Event Processing
If the flow language existed we could have exposed it in the mine portal to allow expression of complex event logic.
Whole of Life Model
Purchase = from('100,,,,,')
PlannedMaintenance = from(',-10,-10,-10,-10')
UnplannedMaintenance = from(',,-5,-7,')
PlannedMaintenance.depreciate(depreciationRate).table(accumulate)
UnplannedMaintenance.depreciate(depreciationRate).table(accumulate)
merge(Purchase,PlannedMaintenance,UnplannedMaintenance).depreciate(depreciationRate).table(accumulate)
Equipment Valuation
Equipment needs to be valued by composing asset information with depreciation information related to age, utilization, location, and market supply.
function ageValuer({assetType, age}) {} // build linear interpolation from asset type and age to a value
from('...',{columns:['assetType','assetAge','value']}).map(ageValuer)
function smuValuer({assetType, smu}) {} // calculate variances of value based on utilization at age and then use deviation from mean utilization to adjust value
from('...',{columns:['assetType','assetSMU','value']}).map(smuValuer)
function locationValuer({assetType, location}) {} //
from('...',{columns:['assetType','location','value']}).map(locationValuer)
from('assets.csv').
Install
npm install value-flow
yarn add value-flow
How to use
References
Kafka Streams
Most.js reactive programming library
View of flow using a sankey diagram