createDualOwnershipReactiveStateEngine
Syntax
createDualOwnershipReactiveStateEngine(name, metrics1, metrics2, dummyTable,
outputTable, keyColumn1, keyColumn2, [snapshotDir],
[snapshotIntervalInMsgCount], [keyPurgeFilter1], [keyPurgeFilter2],
[keyPurgeFreqInSecond=0], [raftGroup], [outputHandler=NULL],
[msgAsTable=false])
Arguments
Only the parameters that are used differently from those of createReactiveStateEngine are explained here.
Data grouped by keyColumn1 is calculated with metrics1 and purged according to keyPurgeFilter1; data grouped by keyColumn2 is calculated with metrics2 and purged according to keyPurgeFilter2.
outputTable specifies the output table. It can be an in-memory table or a DFS table. Create an empty table and specify the names and data types of the columns before calling the function.
The output columns are in the following order:
(1) The common columns of keyColumn1 and keyColumn2;
(2) Other columns of keyColumn1 and keyColumn2;
(3) The result columns of metrics1 and metrics2.
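As a minimal sketch of the ordering rule above, assume keyColumn1=`date`sym and keyColumn2=`date`market with one metric in each of metrics1 and metrics2 (the same configuration used in the Examples section). The result column names factor1 and factor2 are illustrative choices, not names required by the engine.

```dolphindb
// (1) common key column of keyColumn1 and keyColumn2: date
// (2) remaining key columns: sym (from keyColumn1), market (from keyColumn2)
// (3) one result column per metric: factor1 (metrics1), factor2 (metrics2)
outputTable = table(100:0, `date`sym`market`factor1`factor2, [DATE, SYMBOL, CHAR, DOUBLE, DOUBLE])
```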
outputHandler (optional) is a unary function or a partial function with a single unfixed parameter. If set, the engine will not write the calculation results to the output table directly. Instead, the results will be passed as a parameter to the outputHandler function. The default value is null, which means the result will be written to the output table.
msgAsTable (optional) is a Boolean scalar indicating whether the output data is passed to the function specified by outputHandler as a table or as a tuple. If msgAsTable=true, the output data is passed to the function as a table. The default value is false, which means the output data is passed to the function as a tuple of columns.
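The following is a hedged sketch of how outputHandler and msgAsTable might be combined. The names tmp2, unusedOutput, captured, and "dorsHandlerDemo" are hypothetical, and the sketch assumes that a partial application such as tableInsert{captured} qualifies as "a partial function with a single unfixed parameter" and that the engine accepts data through tableInsert, as the subscription handler in the Examples section does.

```dolphindb
// Hypothetical tables for illustration; tmp2 doubles as the dummyTable (only its schema matters here)
tmp2 = table(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT])
// outputTable is not optional in the syntax, so it is supplied even though the handler receives the results
unusedOutput = table(100:0, `date`sym`market`factor1`factor2, [DATE, SYMBOL, CHAR, DOUBLE, DOUBLE])
// Table that the handler writes into
captured = table(100:0, `date`sym`market`factor1`factor2, [DATE, SYMBOL, CHAR, DOUBLE, DOUBLE])

// With msgAsTable=true, each batch of results is assumed to reach the handler as a table
engine = createDualOwnershipReactiveStateEngine(name="dorsHandlerDemo", metrics1=<mfirst(price, 3)>, metrics2=<mmax(price, 3)>, dummyTable=tmp2, outputTable=unusedOutput, keyColumn1=`date`sym, keyColumn2=`date`market, outputHandler=tableInsert{captured}, msgAsTable=true)

insert into tmp2 values(2012.01.01, 09:00:00.030, `a, 'B', 10.65, 1500)
insert into tmp2 values(2012.01.01, 09:00:00.030, `a, 'B', 10.59, 2500)
insert into tmp2 values(2012.01.01, 09:00:00.031, `a, 'B', 10.65, 1500)
// Push the rows into the engine directly, without a subscription
tableInsert(engine, tmp2)

// Results are expected in captured rather than in unusedOutput
select * from captured

// Clean up the hypothetical engine
dropStreamEngine("dorsHandlerDemo")
```

Under the same assumptions, leaving msgAsTable at its default (false) would deliver each batch as a tuple of columns instead of a table.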
Details
The dual-ownership reactive state streaming engine extends the reactive state streaming engine: it computes two sets of metrics in parallel over the same stream table, each with its own grouping columns (metrics1 grouped by keyColumn1, metrics2 grouped by keyColumn2). It delivers considerably better computing performance than cascading multiple reactive state streaming engines to obtain the same result.
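To make the "two groupings, two metrics" idea concrete, the sketch below runs each metric as a separate batch query on a small in-memory table. It assumes that moving functions under context by follow the same per-group semantics as the engine's metrics; the table t and its rows are illustrative only, and this is a way to sanity-check results rather than a description of how the engine is implemented.

```dolphindb
// Illustrative in-memory data; not a stream table
t = table(1:0, `date`sym`market`price, [DATE, SYMBOL, CHAR, DOUBLE])
insert into t values(2012.01.01, `a, 'B', 10.65)
insert into t values(2012.01.01, `a, 'B', 10.59)
insert into t values(2012.01.01, `b, 'A', 10.59)
insert into t values(2012.01.01, `a, 'B', 10.65)

// Counterpart of metrics1 with keyColumn1=`date`sym:
// moving first over the last 3 rows within each (date, sym) group
select date, sym, market, mfirst(price, 3) as factor1 from t context by date, sym

// Counterpart of metrics2 with keyColumn2=`date`market:
// moving max over the last 3 rows within each (date, market) group
select date, sym, market, mmax(price, 3) as factor2 from t context by date, market
```

The dual-ownership engine produces both result columns for every input row in a single pass, which is what removes the need for two cascaded engines.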
Examples
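// Stream table that receives the raw trades
share streamTable(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT]) as trades
// Output schema: common key (date), other keys (sym, market), then one result column per metric
outputTable = table(100:0, `date`sym`market`factor1`factor2, [DATE, SYMBOL, CHAR, DOUBLE, DOUBLE])
// factor1 = mfirst(price, 3) per (date, sym); factor2 = mmax(price, 3) per (date, market)
dors = createDualOwnershipReactiveStateEngine(name="test", metrics1=<mfirst(price, 3)>, metrics2=<mmax(price, 3)>, dummyTable=trades, outputTable=outputTable, keyColumn1=`date`sym, keyColumn2=`date`market)
// Staging table for the sample rows
tmp = table(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT])
// Feed the engine from the stream table
subscribeTable(tableName=`trades, actionName="test", msgAsTable=true, handler=tableInsert{dors})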
insert into tmp values(2012.01.01, 09:00:00.030, `a, 'B', 10.65, 1500)
insert into tmp values(2012.01.01, 09:00:00.030, `a, 'B', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.031, `b, 'A', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.031, `a, 'B', 10.65, 1500)
insert into tmp values(2012.01.01, 09:00:00.031, `a, 'A', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.033, `b, 'B', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.033, `a, 'A', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.034, `b, 'A', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.034, `b, 'A', 10.22, 1200)
insert into tmp values(2012.01.01, 09:00:00.035, `a, 'A', 11.0, 2500)
insert into tmp values(2012.01.02, 09:00:00.031, `b, 'A', 10.22, 1200)
insert into tmp values(2012.01.02, 09:00:00.032, `a, 'B', 11.0, 2500)
insert into tmp values(2012.01.02, 09:00:00.032, `b, 'B', 15.6, 1300)
insert into tmp values(2012.01.02, 09:00:00.040, `c, 'B', 13.2, 2000)
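trades.append!(tmp)
// The subscription is processed asynchronously; wait briefly before querying the results
sleep(1000)
select * from outputTable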
date | sym | market | factor1 | factor2 |
---|---|---|---|---|
2012.01.01 | a | 'B' | | |
2012.01.01 | a | 'B' | | |
2012.01.01 | b | 'A' | | |
2012.01.01 | a | 'B' | 10.65 | 10.65 |
2012.01.01 | a | 'A' | 10.59 | |
2012.01.01 | b | 'B' | | 10.65 |
2012.01.01 | a | 'A' | 10.65 | 10.59 |
2012.01.01 | b | 'A' | 10.59 | 10.59 |
2012.01.01 | b | 'A' | 10.59 | 10.59 |
2012.01.01 | a | 'A' | 10.59 | 11 |
2012.01.02 | b | 'A' | | |
2012.01.02 | a | 'B' | | |
2012.01.02 | b | 'B' | | |
2012.01.02 | c | 'B' | | 15.6 |
unsubscribeTable(tableName=`trades, actionName="test")
undef(`trades,SHARED)
dropStreamEngine("test")