0.1.0 • Published 6 years ago

real-value v0.1.0

Weekly downloads
3
License
MIT
Repository
gitlab
Last release
6 years ago

Real Value Library

The real-value library is an expression language for composing flows of time series based data npm.io

A Stream

A stream is a set of values available through time. For the purpose of this document a stream is a common separated set of values.

//logical stream of values
,1,,,,2,,3

It would be convenient to have a from operator that can reify a stream.

from(',1,,,,2,,3')

Streams can implicitly represent time.

from(',1,0,,',{columns: 'status}') //logically expands to this '{time:0, status:null},{time: 2, status:1},{time:3,status:0},{time: 4,status:null}

Streams can explicitly represent time.

from('1100,2017\n1200,2018,\n1000,2019',{columns: ['production','year']}) //logically expands to this '{year:2017, production: 1100},{year: 2018, production:1200},{year:2019,production:1000}

Streams can process deal with multiple values concurrenly.

from('EX132,6050,51.6\nEX709,6060,65.2',{columns: ['EquipmentId','Model','SMU']}) 

Streams should be constructable from csv string, arrays, sql tables, iterators, generators. In the CoatesHire solution we could express ignition status values arriving into the system as:

from(coateshireGenerator)

A Stream Propagation

The values may flow through the stream with no time lag.

,1,,,,2,,3 -=> ,1,,,,2,,3

An log operator can be used to view a stream.

from(',1,,,,2,,3').log()

Stream Propagation Delay

The values may be delayed through a stream

,1,,,,2,,3,, -=> ,,,1,,,,2,,3

A delay operator would acheive the above translation.

from(',1,,,,2,,3').delay(2).log

We could use to represent the activity of a truck moving a payload from mine face to processing conveyor belt.

from(',1,,,,2,,3').delay(transportTime)

Stream Propagation Accumulation

The stream values may accumulate before propagating

,1,,,,2,,3,, -=> ,,,,,3,,3,, 

An accumulateTo operator could acheive this accumulation.

from(',1,,,,2,,3').accumulateto(3).log

In EOS Premier a truck can accumulate some load from a digger before moving it to a processing stockpile.

from(',1,,,,2,,3').accumulateto(truckCapaciity)

Stream Propagation Accumulation and Delay

In EOS Premier it would take a truck some time to return from the stockpile before it can be reloaded.

from(',1,,,,2,,3').compose(accumulateTo(truckCapaciity),delay(truckReturnTrip))

Stream Propagation Limiting

The values may need to be limited before propagating

,1,,,,2,,3,, -=> ,1,,,,1,1,1,1,1 

A limitTo operator could facilitate splitting defined chunks

from(',1,,,,2,,3').limitTo(1).log

An Idemitsu (coal mine) conveyor system has a defined throughput.

from(stockpile).limitTo(1)

Streams May Merge

Streams may need to flow together.

,a:1,a:2,,,,,a:3, ; b:1,,b:2,,b:3, -=> b:1,a:1,a:2:b:2,,,b:3,a:3 

Note that the a:1 syntax indicate a key with a value

A merge operator could merge 2 streams.

from(',a:3,a:3,,,,,a:3,').merge(from('b:3,,b:2,,b:3,'))

In Coates Hire there are streams of asset data from Telematics Guru and from Manitou but this source is irrelevant to the UI.

from(',Asset111:on,Asset222:off,,').merge(from('Asset333:on,,Asset333:off,'))

As an alternative a from operation applied to a stream should create a merged stream.

from('1,3'{column: 'odd}).from('2,4',{column: 'event'})  //{odd:1,even:2},{odd:3,even:4},

Streams May Diverge

Streams may need to diverge

b:1,a:1,a:2:b:2,,,b:3,a:3 => ,a:1,a:2,,,,,a:3, ; b:1,,b:2,,b:3,

A split operator would split the stream and return an array of streams

from('b:1,a:1,a:2:b:2,,,b:3,a:3').split([filterAs,filterBs])

In EOS premier a digger may split material into waste and coal stockpiles.

from('waste:1,coal:1,coal:2,,,waste:3,coal:3').split([filterCoal,filterWaste])

A relative to split would be just to duplicate a stream into 2 other streams.

from('1,2,3').duplicate((stream1,stream2)=>stream.log(),stream2.log()).

Streams May Be Accumulated into Categories

Streams may need to accumulate into categories

,a:1,a:2,,c:2,,b:3 => { a: 4, b: 3, c:2 } 

A table operator would acheive this

from(',a:1,a:2,,c:2,,,b:2,').table(accumulate)

For EOS Premier utilization was accumulated into assets

from(',E16:10,D10:20,,E15:10,,,E16:10,').table(accumulate)

The aggregation into a table should be capable of updating statistics

,a:1,a:3,,b:2,,b:4 => average:{ a: 2, b: 3 } 

For Coates Hire we were interested to capture statistics across the type of asset

from(',Asset1:10,Asset11:20,,Asset12:10,,,Asset10:20,').map(mapToAssetType).table(movingAverage,'averageUtilizationByType')

Note that table should accumulate but also emits changes to the tables.

from('A:1,A:10,,B:2,B:2,C:3,').table(accumulate).log() //should generate A:1,A:10,B:2,/*no change*/,C:3

Named Streams

Streams should be named so they can be queried

from(',a:1,a:2,,,,,b:3,c:4').table(accumulate).name('acache')
from('a,a,a,a,').map((e)=>stream('cache').get(x)) //1,1,1,2 

Change Propagation

When there is a change to a stream it may be desirable to reprocess the content of a table stream. For instance getting new information about depreciation rates could require reprocessing a table stream representing the value of some assets.

from(somestream).table({propagate:change}) //only table changes are propagated.
from(somestream).table({propagate:all}) //any table change generates a stream of the entire table.

Streams May Need Accumulated Between Values

Streams may need to accumulate between values

,a:1,b:1,,b:0,,a:0 => { a: 5, b: 3 } 

A delta operation might allow this to be expressed

from(',a:1,b:1,,b:0,,a:0').table(compose(delta,accumulate))

In Coates Hire utilization related to time periods between on/off events

from(',Asset111:on,Asset222:on,,Asset222:off,,Asset:on').table(compose(delta,accumulate))

Streams may need to be compared against targets

Streams may need to be compared against targets

,1,2,,,3,2,,1 ; 0,2,2,2,2,2,2,2,0 => 0%,50%,100%,0%,0%,0%,150%,100%,0%,0,50%

The from operator handles an arbitrary number of named streams.

from([
    from(',1,2,,,3,2,,1','actual'),
    from('0,2,2,2,2,2,2,2,0','target')]
).map( ({actual,target})=>actual*100/target)

from(',Asset111:on,Asset222:on,,Asset222:off,,Asset:on').table(compose(delta,accumulate))

In EOS Premier there was a need to show up to date production versus target production during the course of an interval.

Streams as components

It should be possible to define a stream with a source to be re-used multiple times.

let identityStream = map(x=>x) //The identity processing stream
from(testStream).thru(identity) // Used to test the stream definition
from(productionStream2).thru(identity) //Used to process production

Streams that update functions

Streams should be usable to update functions used in other streams. A stream of market data is used to update a function which can be used to value an individual item.

let valuer = () => ({
    let valuationModel = //the model
    trainValue: (historicalSaleData)=>{//update model },
    value: (equipment)=>valuationModel.value(equipment)
})

from(historicalSaleData).map(x=>valuer.train(x)) //We need to update a valuation model

from(onSaleData).map(x=>valuer.value(x)).table() //then we want to use the model

Examples

Young Guns

Young Guns is about scheduling resources to empty and load containers.

There are a stream of container to be unloaded from(port) Containers accumulate stuff .limitTo() Containers are transported delay(). Containers are unloaded splitTo()

GFM

There are streams of mine stopes from(mineplan) Stopes need to be cleared splitTo(diggerLoad).accumulateTo(truckCapacitiy) Trucks transport waste or mineral delay(travelTime)

MineQ Complex Event Processing

If the flow language existed we could have exposed into the mine portal to allow expression of complex event logic.

Whole of Life Model

Purchase = from('100,,,,,') PlannedMaintenance = from(',-10,-10,-10,-10) UnplannedMaintance = from(',,-5,-7,')

PlannedMaintenace.depreciate(depreciationrate).table(accumulate) UnplannedMaintenace.depreciate(depreciationrate).table(accumulate) merge(Purchase,PlannedMaintance,Unplanned).depreciate(depreciationrate).table(accumulate)

Equipment Valuation

Equipment needs to be valued be composing assets information with depreciation information related to age, utilization , location, marketsupply

function ageValuer {assettype,age} // build linear interpol from asset type and age to a value
from('...',{columns:['assetType','assetAge','value']}).map(ageValuer)  


function smuValuer {assetType,smu} //calculate variances of value based on utilization at age and then use deviation from mean utilization to adjust value
from('...',{columns:['assettype','assetSMU','value']}.map(smuvaluer) 

function locationValue{assetType,location} //
from('...',{column:['assettype','location','value']}).map(deriveLocationDepreciation)


from('assets.csv').

Install

npm install value-flow
yarn install value-flow

How to use

References

Kafka Stream Most.js reactive programming library View of flow using sankey diagram