createReactiveStateEngine

Syntax

createReactiveStateEngine(name, metrics, dummyTable, outputTable, keyColumn, [filter], [snapshotDir], [snapshotIntervalInMsgCount], [keepOrder], [keyPurgeFilter], [keyPurgeFreqInSecond=0], [raftGroup], [outputElapsedMicroseconds=false], [keyCapacity=1024], [parallelism=1], [outputHandler=NULL], [msgAsTable=false])

Details

This function creates a reactive state engine and returns a table object. Writing to the table means that data is ingested into the reactive state engine for calculation.

The DolphinDB reactive state engine provides optimized implementations of a set of built-in state functions. Built-in state functions that have not been optimized are not supported by this engine, and aggregate functions should be avoided.

Note: If function talib is used as a state function, the first parameter func must be a state function.

For more application scenarios, see Streaming Engines.

Calculation Rules

The reactive state engine outputs a result for each input. If multiple records are ingested into the reactive state engine at the same time, the data is calculated in batches. The number of records in each batch is determined by the system.

  • To output only the results that meet the specified conditions, set the parameter filter;

  • To perform calculations by group, set the parameter keyColumn;

  • To preserve the insertion order of the records in the output table, set the parameter keepOrder.

Features

  • State cleanup: States in the engine are maintained by group. A large number of groups may lead to high memory overhead, so you can set a cleanup rule to clear data that is no longer needed. (See parameters keyPurgeFilter and keyPurgeFreqInSecond)

  • Snapshot: The snapshot mechanism restores the streaming engine to its latest snapshot after a system interruption. (See parameters snapshotDir and snapshotIntervalInMsgCount)

  • High availability: To enable high availability for streaming engines, specify the parameter raftGroup on the leader of the raft group on the subscriber. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table.

Arguments

name is a string indicating the engine name. It is the unique identifier of a reactive state engine on a data/compute node. It can contain letters, digits, and underscores (_), and must start with a letter.

metrics is metacode specifying the formulas for calculation. The metacode can include one or more expressions, built-in or user-defined functions, or a constant scalar/vector. Note that the output column for a constant vector must be in array vector form. For more information about metacode, refer to Metaprogramming. The following rules apply when a user-defined function is used in the reactive state engine:

(1) Add @state to declare the function before the definition. For state functions, the following statements are supported:
  • Assignment and return statements

  • if...else statements with scalar expressions (since 1.30.21/2.00.9)

  • for loops, including break and continue (since 1.30.23/2.00.11). A loop must run fewer than 100 iterations. Nested for loops are currently unsupported.

(2) Stateless or state functions can be used in a reactive state engine, but the metrics parameter cannot be specified as the stateless function nesting with the state function.

(3) If the rvalue of an assignment statement is a built-in or user-defined function that returns multiple values, the values must be assigned to variables at the same time. In the following example, the user-defined state function references linearTimeTrend, which returns two values.

@state
def forcast2(S, N){
	linearregIntercept, linearregSlope = linearTimeTrend(S, N)
	return (N - 1) * linearregSlope + linearregIntercept
}

Note: The column names specified in metrics are not case-sensitive, so their case need not match that of the column names in the input table.

dummyTable is a table object whose schema must be the same as that of the subscribed stream table. Whether dummyTable contains data does not matter.
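
As a minimal sketch of how a user-defined state function can be passed to metrics, the script below plugs the forcast2 function from above into an engine. The table names, engine name, window length of 10, and sample data are all hypothetical and chosen only for illustration.

```dolphindb
// State function from the text above; linearTimeTrend returns two values,
// which are assigned to two variables in a single statement.
@state
def forcast2(S, N){
	linearregIntercept, linearregSlope = linearTimeTrend(S, N)
	return (N - 1) * linearregSlope + linearregIntercept
}

// Hypothetical input schema and output table (key column first, then results).
forecastInput = table(1:0, `sym`time`price, [STRING, DATETIME, DOUBLE])
forecastOutput = table(1000:0, `sym`time`forecast, [STRING, DATETIME, DOUBLE])

forecastEngine = createReactiveStateEngine(name="forecastDemo", metrics=[<time>, <forcast2(price, 10)>], dummyTable=forecastInput, outputTable=forecastOutput, keyColumn="sym")

// Feed 20 made-up rows for one group; the engine emits one result per input row.
sample = table(take("A", 20) as sym, 2021.02.08T09:30:00 + 1..20 as time, 10.0 + 0.1 * (1..20) as price)
forecastEngine.append!(sample)
select * from forecastOutput

// Drop the engine before re-running the sketch.
dropStreamEngine("forecastDemo")
```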

outputTable is the output table for the results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling the function.

The output columns are in the following order:

(1) If keyColumn is specified, the leading columns must be the key columns, in the order given in keyColumn.

(2) If outputElapsedMicroseconds is set to true, specify two more columns: a LONG column for the elapsed time of each batch and an INT column for the total number of records in each batch.

(3) These are followed by one or more result columns.
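
The column order described above can be sketched as follows for an engine with one key column, outputElapsedMicroseconds enabled, and a single metric. The table names, column names (elapsedMicros, batchSize, avgPrice), and engine name are hypothetical.

```dolphindb
// Hypothetical input schema.
elapsedInput = table(1:0, `sym`time`price, [STRING, DATETIME, DOUBLE])

// Output layout: key column (sym), then the LONG elapsed-time column and the
// INT batch-size column, then one result column for the single metric.
elapsedOutput = table(1000:0, `sym`elapsedMicros`batchSize`avgPrice, [STRING, LONG, INT, DOUBLE])

elapsedEngine = createReactiveStateEngine(name="elapsedDemo", metrics=<mavg(price, 3)>, dummyTable=elapsedInput, outputTable=elapsedOutput, keyColumn="sym", outputElapsedMicroseconds=true)

// Drop the engine before re-running the sketch.
dropStreamEngine("elapsedDemo")
```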

keyColumn (optional) is a STRING scalar/vector indicating the grouping column(s). The calculation is conducted within each group.

filter (optional) is metacode that indicates the filtering conditions. A filtering condition must be an expression and can only include columns of dummyTable. You can specify multiple conditions with logical operators (and, or). Only the results that satisfy the filter conditions are written to the output table.

To enable snapshot in the streaming engines, specify parameters snapshotDir and snapshotIntervalInMsgCount.

snapshotDir (optional) is a string indicating the directory where the streaming engine snapshot is saved. The directory must already exist, otherwise an exception is thrown. If snapshotDir is specified, the system checks whether a snapshot already exists in the directory when creating a streaming engine. If it exists, the snapshot will be loaded to restore the engine state. Multiple streaming engines can share a directory where the snapshot files are named as the engine names.

The file extension of a snapshot can be:

  • <engineName>.tmp: temporary snapshot

  • <engineName>.snapshot: a snapshot that is generated and flushed to disk

  • <engineName>.old: if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.

snapshotIntervalInMsgCount (optional) is a positive integer indicating the number of messages to receive before the next snapshot is saved.
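
A minimal sketch of enabling snapshots is shown below. The directory, table names, and engine name are hypothetical. The subscription part assumes the usual snapshot workflow, in which the handler is appendMsg with handlerNeedMsgId=true and the offset is derived from getSnapshotMsgId (assumed here to return -1 when no snapshot exists yet); verify these details against the subscribeTable and getSnapshotMsgId references for your server version.

```dolphindb
// Hypothetical snapshot directory; it must exist before the engine is created.
snapshotPath = getHomeDir() + "/rseSnapshotDemo"
if(!exists(snapshotPath)){
	mkdir(snapshotPath)
}

share streamTable(1:0, `sym`time`price, [STRING, DATETIME, DOUBLE]) as snapshotTicks
snapshotResult = table(1000:0, `sym`time`avgPrice, [STRING, DATETIME, DOUBLE])

// Request a snapshot every 1000 messages. If a snapshot for "snapshotDemo"
// already exists in snapshotPath, it is loaded when the engine is created.
snapshotEngine = createReactiveStateEngine(name="snapshotDemo", metrics=[<time>, <mavg(price, 3)>], dummyTable=snapshotTicks, outputTable=snapshotResult, keyColumn="sym", snapshotDir=snapshotPath, snapshotIntervalInMsgCount=1000)

// Assumption: resume the subscription right after the last message covered
// by the loaded snapshot.
lastMsgId = getSnapshotMsgId(snapshotEngine)
subscribeTable(tableName=`snapshotTicks, actionName="snapshotDemo", offset=lastMsgId + 1, handler=appendMsg{snapshotEngine}, handlerNeedMsgId=true)

// Run the following before re-executing the sketch.
unsubscribeTable(tableName=`snapshotTicks, actionName="snapshotDemo")
dropStreamEngine("snapshotDemo")
undef(`snapshotTicks, SHARED)
```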

keepOrder (optional) specifies whether to preserve the insertion order of the records in the output table. If keyColumn contains a time column, the default value is true, and otherwise false.

To clean up the data that is no longer needed after calculation, specify parameters keyPurgeFilter and keyPurgeFreqInSecond.

keyPurgeFilter (optional) indicates the filtering conditions that identify the data to be purged from the cache. It is metacode composed of conditional expressions, and these expressions must refer to the columns in the outputTable. keyPurgeFilter is effective only when keyColumn is specified.

keyPurgeFreqInSecond (optional) is a positive integer indicating the time interval (in seconds) to trigger a purge. keyPurgeFreqInSecond is effective only when keyColumn is specified.

For each data ingestion, the engine starts a purge if all of the following conditions are satisfied:

(1) The time elapsed since the last data ingestion is equal to or greater than keyPurgeFreqInSecond (for the first check, the time elapsed between the creation of the engine and the ingestion of data is used).

(2) Once the first condition is satisfied, the engine applies keyPurgeFilter to the cached data to identify the data to be purged.

(3) The number of groups that contain data to be purged is equal to or greater than 10% of the total number of groups in the engine.

To check the engine status before and after the purge, call getStreamEngineStat().ReactiveStreamEngine (see getStreamEngineStat) where the numGroups field indicates the number of groups in the reactive state streaming engine.
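
The purge settings can be sketched as below, assuming groups are keyed by date and sym and that groups dated before 2012.01.05 are no longer needed. The table names, engine name, cutoff date, and the 10-second frequency are hypothetical choices for illustration.

```dolphindb
// Hypothetical input schema and output table.
purgeInput = table(1:0, `date`time`sym`price, [DATE, TIME, SYMBOL, DOUBLE])
purgeOutput = table(1000:0, `date`sym`avgPrice, [DATE, SYMBOL, DOUBLE])

// keyPurgeFilter refers to a column of the output table (date).
// Both purge parameters take effect only because keyColumn is specified.
purgeEngine = createReactiveStateEngine(name="purgeDemo", metrics=<mavg(price, 3)>, dummyTable=purgeInput, outputTable=purgeOutput, keyColumn=["date", "sym"], keyPurgeFilter=<date < 2012.01.05>, keyPurgeFreqInSecond=10)

// Inspect the number of groups held by the engine; run this again after
// ingesting data to compare the count before and after a purge.
select name, numGroups from getStreamEngineStat().ReactiveStreamEngine where name = "purgeDemo"

// Drop the engine before re-running the sketch.
dropStreamEngine("purgeDemo")
```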

raftGroup (optional) is an integer greater than 1, indicating the ID of the raft group on the high-availability streaming subscriber specified by the configuration parameter streamingRaftGroups. Specify raftGroup to enable high availability for the streaming engine. When an engine is created on the leader, it is also created on each follower and the engine snapshot is synchronized to the followers. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table. Note that snapshotDir must also be specified when specifying a raft group.

outputElapsedMicroseconds (optional) is a Boolean value. The default value is false. It determines whether to output:

  • the elapsed time (in microseconds) from the ingestion of data to the output of result in each batch.

  • the total number of records in each batch.

If specified, two additional columns must be added to the output table (see outputTable).

keyCapacity (optional) is a positive integer indicating the amount of memory allocated for buffering state of each group (defined by keyColumn) on a row basis. The default value is 1024. For data with a large number of groups, setting this parameter appropriately can reduce latency.

parallelism (optional) is a positive integer no greater than 63, indicating the maximum number of workers that can run in parallel. The default value is 1. For large computation workloads, reasonable adjustment of this parameter can effectively utilize computing resources and reduce computation time.

Note: parallelism cannot exceed the lesser of the numbers of licensed cores and logical cores minus one.

outputHandler (optional) is a unary function or a partial function with a single unfixed parameter. If set, the engine will not write the calculation results to the output table directly. Instead, the results will be passed as a parameter to the outputHandler function. The default value is null, which means the result will be written to the output table.

msgAsTable (optional) is a Boolean scalar indicating whether the output data is passed into the function specified by outputHandler as a table or as a tuple. If msgAsTable=true, the output data is passed into the function as a table. The default value is false, which means the output data is passed into the function as a tuple of columns.
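
As a hedged sketch of outputHandler and msgAsTable, the script below routes results to a separate sink table through a partial application of append!. All table and engine names are hypothetical, and the parameters are assumed to be available in your server version.

```dolphindb
// Hypothetical input schema. outputTable is still a required argument,
// but it is bypassed once outputHandler is set.
handlerInput = table(1:0, `sym`time`price, [STRING, DATETIME, DOUBLE])
unusedOutput = table(1:0, `sym`time`avgPrice, [STRING, DATETIME, DOUBLE])

// Hypothetical sink that the handler writes to.
share table(1000:0, `sym`time`avgPrice, [STRING, DATETIME, DOUBLE]) as handlerSink

// append!{handlerSink} is a partial function with one unfixed parameter.
// With msgAsTable=true, each batch of results reaches it as a table.
handlerEngine = createReactiveStateEngine(name="handlerDemo", metrics=[<time>, <mavg(price, 3)>], dummyTable=handlerInput, outputTable=unusedOutput, keyColumn="sym", outputHandler=append!{handlerSink}, msgAsTable=true)

// Feed a few made-up rows; the results should land in handlerSink.
handlerEngine.append!(table(take("A", 5) as sym, 2021.02.08T09:30:00 + 1..5 as time, 10.0 + 0.1 * (1..5) as price))
select * from handlerSink

// Run the following before re-executing the sketch.
dropStreamEngine("handlerDemo")
undef(`handlerSink, SHARED)
```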

Examples

Example 1.

def sum_diff(x, y){
     return (x-y)/(x+y)
}

factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>
share streamTable(1:0, `sym`time`price, [STRING,DATETIME,DOUBLE]) as tickStream
share table(1000:0, `sym`time`factor1, [STRING,DATETIME,DOUBLE]) as result
rse = createReactiveStateEngine(name="reactiveDemo", metrics =[<time>, factor1], dummyTable=tickStream, outputTable=result, keyColumn="sym", filter=<sym in ["000001.SH", "000002.SH"]>)
subscribeTable(tableName=`tickStream, actionName="factors", handler=tableInsert{rse})

data1 = table(take("000001.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 10+cumsum(rand(0.1, 100)-0.05) as price)
data2 = table(take("000002.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 20+cumsum(rand(0.2, 100)-0.1) as price)
data3 = table(take("000003.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 30+cumsum(rand(0.3, 100)-0.15) as price)
data = data1.unionAll(data2).unionAll(data3).sortBy!(`time)

replay(inputTables=data, outputTables=tickStream, timeColumn=`time)

// Execute the following code before re-running the code above.
unsubscribeTable(tableName=`tickStream, actionName="factors")
dropStreamEngine(`reactiveDemo)
undef(`tickStream, SHARED)
undef(`result, SHARED)

The result only contains the stocks "000001.SH" and "000002.SH" that are specified in the filtering condition.

Example 2. Calculate in groups by the date and sym columns. Then output only the results whose date is between 2012.01.01 and 2012.01.03.

share streamTable(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT]) as trades
share table(100:0, `date`sym`factor1, [DATE, STRING, DOUBLE]) as outputTable
engine = createReactiveStateEngine(name="test", metrics=<mavg(price, 3)>, dummyTable=trades, outputTable=outputTable, keyColumn=["date","sym"], filter=<date between 2012.01.01 : 2012.01.03>, keepOrder=true)
subscribeTable(tableName=`trades, actionName="test", msgAsTable=true, handler=tableInsert{engine})

n=100
tmp = table(rand(2012.01.01..2012.01.10, n) as date, rand(09:00:00.000..15:59:59.999, n) as time, rand("A"+string(1..10), n) as sym, rand(['B', 'S'], n) as market, rand(100.0, n) as price, rand(1000..2000, n) as qty)
trades.append!(tmp)
select * from outputTable

Example 3. Since version 2.00.9, the higher-order function moving can be called in the reactive state engine to perform calculations on array vectors.

defg myFactor(x){
    return avg(var(x));
}
share streamTable(1:0, `DateTime`SecurityID`Trade, [TIMESTAMP, SYMBOL, DOUBLE[]]) as tickStream
share table(1000:0, `SecurityID`DateTime`result, [SYMBOL, DATETIME, DOUBLE]) as result
rse = createReactiveStateEngine(name="reactiveDemo", metrics =<[DateTime, moving(myFactor, Trade, 3, 1)]>, dummyTable=tickStream, outputTable=result, keyColumn="SecurityID")
DateTime = 2022.09.15T09:00:00.000+1..12
SecurityID = take(`600021, 12)
Trade = [[10.06, 10.06], [10.04], [10.05, 10.06, 10.05, 10.08],[10.02,10.01], [10.06, 10.06, 10.05, 10.05], [10.04], [10.05,10.08, 10.09],[10.02,10.01],[10.06, 10.06, 10.05], [10.04, 10.03], [10.05, 10.06, 10.05, 10.08, 10.09],[10.02]]
t = table(1:0, `DateTime`SecurityID`Trade, [TIMESTAMP, SYMBOL, DOUBLE[]])
tableInsert(t, DateTime, SecurityID, Trade)
rse.append!(t)
select * from result
dropStreamEngine("reactiveDemo")

Example 4. Define a constant in metrics indicating the factor name based on the above example.

defg myFactor(x){
   return avg(var(x));
}
share streamTable(1:0, `DateTime`SecurityID`Trade, [TIMESTAMP, SYMBOL, DOUBLE[]]) as tickStream
share table(1000:0, `SecurityID`DateTime`factorName`result, [SYMBOL, DATETIME, STRING, DOUBLE]) as result
rse = createReactiveStateEngine(name="reactiveDemo", metrics =<[DateTime,"factor1", moving(myFactor, Trade, 3, 1)]>, dummyTable=tickStream, outputTable=result, keyColumn="SecurityID")
DateTime = 2022.09.15T09:00:00.000+1..12
SecurityID = take(`600021, 12)
Trade = [[10.06, 10.06], [10.04], [10.05, 10.06, 10.05, 10.08],[10.02,10.01], [10.06, 10.06, 10.05, 10.05], [10.04], [10.05,10.08, 10.09],[10.02,10.01],[10.06, 10.06, 10.05], [10.04, 10.03], [10.05, 10.06, 10.05, 10.08, 10.09],[10.02]]
t = table(1:0, `DateTime`SecurityID`Trade, [TIMESTAMP, SYMBOL, DOUBLE[]])
tableInsert(t, DateTime, SecurityID, Trade)
rse.append!(t)
select * from result
SecurityID  DateTime             factorName  result
600021      2022.09.15 09:00:00  factor1     0
600021      2022.09.15 09:00:00  factor1     0.0001
600021      2022.09.15 09:00:00  factor1     0.0002
600021      2022.09.15 09:00:00  factor1     0.0006
600021      2022.09.15 09:00:00  factor1     0.0004
600021      2022.09.15 09:00:00  factor1     0.0004
600021      2022.09.15 09:00:00  factor1     0.0003
600021      2022.09.15 09:00:00  factor1     0.001
600021      2022.09.15 09:00:00  factor1     0.0007
600021      2022.09.15 09:00:00  factor1     0.0004