createDualOwnershipReactiveStateEngine

Syntax

createDualOwnershipReactiveStateEngine(name, metrics1, metrics2, dummyTable, outputTable, keyColumn1, keyColumn2, [snapshotDir], [snapshotIntervalInMsgCount], [keyPurgeFilter1], [keyPurgeFilter2], [keyPurgeFreqInSecond=0], [raftGroup], [outputHandler=NULL], [msgAsTable=false])

Argument

Only the parameters of createDualOwnershipReactiveStateEngine and createReactiveStateEngine with different usages are explained here.

For data grouped by keyColumn1, they are calculated based on metrics1 and purged based on keyPurgeFilter1; For data grouped by keyColumn2, they are calculated based on metrics2 and purged based on keyPurgeFilter2.

outputTable specifies the output table. It can be an in-memory table or a DFS table. Create an empty table and specify the names and data types of the columns before calling the function.

The output columns are in the following order:

(1) The common columns of keyColumn1 and keyColumn2;

(2) Other columns of keyColumn1 and keyColumn2;

(3) The result columns of metrics1 and metrics2.

outputHandler (optional) is a unary function or a partial function with a single unfixed parameter. If set, the engine will not write the calculation results to the output table directly. Instead, the results will be passed as a parameter to the outputHandler function. The default value is null, which means the result will be written to the output table.

msgAsTable (optional) is a Boolean scalar indicating whether the output data is passed into function (specified by outputHandler) as a table or as a tuple. If msgAsTable=true, the subscribed data is passed into function as a table. The default value is false, which means the output data is passed into function as a tuple of columns.

Details

The dual-ownership reactive state streaming engine extends the functionality of the reactive state streaming engine with support for parallel computing on 2 groups with different metrics of a stream table. Compared to the cascade of reactive state streaming engines, this function has greatly improved the computing performance.

Note: The output table is sorted in the same order as the input, i.e., keepOrder is only set to true in the engine.

Examples

share streamTable(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT]) as trades
outputTable = table(100:0, `date`sym`market`factor1`factor2, [DATE, SYMBOL, CHAR, DOUBLE, DOUBLE])
dors = createDualOwnershipReactiveStateEngine(name="test", metrics1=<mfirst(price, 3)>, metrics2=<mmax(price, 3)>, dummyTable=trades, outputTable=outputTable, keyColumn1=`date`sym, keyColumn2=`date`market)
tmp = table(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT])
subscribeTable(tableName=`trades, actionName="test",msgAsTable=true, handler=tableInsert{dors})
insert into tmp values(2012.01.01, 09:00:00.030, `a, 'B', 10.65, 1500)
insert into tmp values(2012.01.01, 09:00:00.030, `a, 'B', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.031, `b, 'A', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.031, `a, 'B', 10.65, 1500)
insert into tmp values(2012.01.01, 09:00:00.031, `a, 'A', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.033, `b, 'B', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.033, `a, 'A', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.034, `b, 'A', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.034, `b, 'A', 10.22, 1200)
insert into tmp values(2012.01.01, 09:00:00.035, `a, 'A', 11.0, 2500)
insert into tmp values(2012.01.02, 09:00:00.031, `b, 'A', 10.22, 1200)
insert into tmp values(2012.01.02, 09:00:00.032, `a, 'B', 11.0, 2500)
insert into tmp values(2012.01.02, 09:00:00.032, `b, 'B', 15.6, 1300)
insert into tmp values(2012.01.02, 09:00:00.040, `c, 'B', 13.2, 2000)
trades.append!(tmp)

select * from outputTable
date sym market factor1 factor2
2012.01.01 a 'B'
2012.01.01 a 'B'
2012.01.01 b 'A'
2012.01.01 a 'B' 10.65 10.65
2012.01.01 a 'A' 10.59
2012.01.01 b 'B' 10.65
2012.01.01 a 'A' 10.65 10.59
2012.01.01 b 'A' 10.59 10.59
2012.01.01 b 'A' 10.59 10.59
2012.01.01 a 'A' 10.59 11
2012.01.02 b 'A'
2012.01.02 a 'B'
2012.01.02 b 'B'
2012.01.02 c 'B' 15.6
unsubscribeTable(tableName=`trades, actionName="dors")
undef(`trades,SHARED)
dropStreamEngine("dors")