createThresholdEngine

Syntax

createThresholdEngine(name, threshold, metrics, dummyTable, outputTable, thresholdColumn, [keyColumn], [timeColumn], [sessionBegin], [sessionEnd], [keyPurgeDaily=true], [forceTriggerSessionEndTime=0], [snapshotDir], [snapshotIntervalInMsgCount], [outputElapsedMicroseconds=false], [outputThreshold=false])

Details

This function creates a threshold engine that performs aggregate calculations triggered by cumulative value thresholds. Each time the cumulative sum of the threshold column (thresholdColumn) reaches the next multiple of threshold (threshold*n, where n = 1, 2, 3, ...), an aggregate calculation is triggered:

  • If keyColumn is specified, the aggregate calculation will be performed independently in each group.
  • If sessionBegin and sessionEnd are specified, only data in the range [sessionBegin, sessionEnd] will be included in the calculation.
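
The triggering rule above can be modeled outside DolphinDB. The Python sketch below is a simplified simulation, not the engine's implementation; the helper name simulate_threshold_engine is hypothetical, session filtering is not modeled, and it assumes that the row which makes the cumulative sum reach threshold*n is included in the window it closes, and that a single row crossing several multiples triggers only one calculation.

```python
from collections import defaultdict

def simulate_threshold_engine(rows, threshold, agg, column, key=None):
    """Simplified model of threshold-triggered aggregation (hypothetical helper).

    rows:      iterable of dicts, one per record
    threshold: positive step size; calculations fire at threshold*n
    agg:       function applied to the list of rows in a closed window
    column:    name of the threshold column whose values are accumulated
    key:       optional grouping field; each group accumulates independently
    """
    cum = defaultdict(float)                     # cumulative sum per group
    next_level = defaultdict(lambda: threshold)  # next threshold*n per group
    pending = defaultdict(list)                  # rows of the open window per group
    results = []
    for row in rows:
        k = row[key] if key is not None else None
        cum[k] += row[column]
        pending[k].append(row)
        if cum[k] >= next_level[k]:
            # Assumption: the row that reaches threshold*n closes its window.
            results.append((k, agg(pending[k])))
            pending[k] = []
            # Assumption: crossing several multiples at once fires only once.
            while next_level[k] <= cum[k]:
                next_level[k] += threshold
    return results
```

For example, with threshold=1000 and volumes 400, 400, 400, 900, the third row lifts the cumulative sum to 1200 and closes the first window; the fourth row lifts it to 2100 and closes the second.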

Return value: A table object. Data written to this table is ingested into the engine for aggregation.

Arguments

name is a string indicating the name of the engine. It is the unique identifier of an engine on a data or compute node. It can contain letters, digits and underscores ("_") and must start with a letter.
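
The naming rule can be expressed as a regular expression. This Python check is a hypothetical helper, not part of DolphinDB, and it assumes "letter" means an ASCII letter.

```python
import re

# Hypothetical helper mirroring the rule above: letters, digits and
# underscores only, starting with a letter (ASCII letters assumed).
_ENGINE_NAME = re.compile(r"[A-Za-z][A-Za-z0-9_]*")

def is_valid_engine_name(name: str) -> bool:
    return _ENGINE_NAME.fullmatch(name) is not None
```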

threshold is a positive integer indicating the threshold step size.

metrics is metacode or a tuple specifying the calculation formulas. For more information about metacode, refer to Metaprogramming.

  • It can use one or more built-in or user-defined aggregate functions (user-defined ones must be defined with the defg keyword), such as <[sum(volume), avg(price)]>; expressions of aggregate functions, such as <[avg(price1)-avg(price2)]>; or aggregate functions involving multiple columns, such as <[std(price1-price2)]>.
  • It can also include functions that return multiple values, such as <func(price) as `col1`col2> (specifying the column names is optional).
Note:
  • The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.
  • Nested aggregate function calls are not supported in metrics.
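
As a rough analogue of how metrics maps formulas to output columns, the Python sketch below evaluates a list of (column names, function) pairs on one window. The helper apply_metrics and the column names are hypothetical; it only illustrates that a multi-value function fills several output columns, as <func(price) as `col1`col2> does.

```python
from statistics import mean

def apply_metrics(window, metrics):
    """Evaluate (column_names, function) pairs on one window (hypothetical helper).

    Each function receives the window's columns as a dict of lists.
    A function returning a tuple fills one output column per returned value.
    """
    cols = {name: [row[name] for row in window] for name in window[0]}
    out = {}
    for names, fn in metrics:
        value = fn(cols)
        if isinstance(names, str):
            out[names] = value
        else:
            # multi-value function: one output column per returned value
            out.update(zip(names, value))
    return out

window = [{"price": 10.0, "volume": 100}, {"price": 12.0, "volume": 300}]
metrics = [
    ("sum_volume", lambda c: sum(c["volume"])),                   # like sum(volume)
    ("avg_price", lambda c: mean(c["price"])),                    # like avg(price)
    (("lo", "hi"), lambda c: (min(c["price"]), max(c["price"]))),  # multi-value
]
```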

dummyTable is a table object whose schema must be the same as that of the subscribed stream table. It does not matter whether dummyTable contains data.

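outputTable is a table to which the engine inserts calculation results. It can be an in-memory table or a DFS table. Create an empty table and specify its column names and types before calling this function.

The output columns are in the following order: the time column, the grouping column(s) specified by keyColumn (if any), and the columns holding the results of metrics.

thresholdColumn is a STRING scalar indicating the column whose cumulative sum is compared with threshold*n to trigger calculations.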

keyColumn (optional) is a STRING scalar/vector indicating the name of the grouping column(s). If it is specified, the engine conducts the calculations within each group. For example, you can group the data by stock symbol so that the aggregate functions are applied to each stock separately.

timeColumn (optional) is a STRING scalar or vector specifying the time column(s) of the subscribed stream table.

Note: If timeColumn is a vector, it must have a date element (of DATE type) and a time element (of TIME, SECOND or NANOTIME type). In this case, the first column in outputTable must take the data type of concatDateTime(date, time).
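
To illustrate what combining a date column and a time column means for the first output column, here is a Python analogue using datetime.combine. It is not DolphinDB's concatDateTime, only a sketch of the same idea; in the example below, a DATE column and a TIME column produce a TIMESTAMP first column in outputTable.

```python
from datetime import date, time, datetime

# Python analogue (not DolphinDB) of merging a DATE value and a TIME value
# into one timestamp, as concatDateTime(date, time) does for the output column.
def concat_date_time(d: date, t: time) -> datetime:
    return datetime.combine(d, t)
```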

sessionBegin (optional) can be a scalar or vector of type SECOND, TIME or NANOTIME corresponding to the data type of the time column, indicating the starting time of each session. If it is a vector, it must be increasing.

sessionEnd (optional) can be a scalar or vector of type SECOND, TIME or NANOTIME corresponding to the data type of the time column, indicating the end time of each session. Specify sessionEnd as 00:00:00 to indicate the beginning of the next day (i.e., 24:00:00 of the current day).
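
The session rules can be sketched as a membership test. In this hypothetical Python helper, begins and ends are paired session bounds, both ends are inclusive as in [sessionBegin, sessionEnd], and an end of 00:00:00 is treated as 24:00:00 of the same day.

```python
from datetime import time

def in_session(t, begins, ends):
    """Return True if time-of-day t lies in any [begin, end] session.

    Hypothetical helper: begins/ends are equal-length lists of datetime.time
    with begins increasing. An end of 00:00:00 means 24:00:00 of the same day.
    """
    for b, e in zip(begins, ends):
        if e == time(0, 0, 0):
            # session runs to the end of the day: only the lower bound applies
            if t >= b:
                return True
        elif b <= t <= e:
            return True
    return False

# Two trading sessions, similar to the sample data in the example below
BEGINS = [time(9, 30), time(13, 0)]
ENDS = [time(11, 30), time(15, 0)]
```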

keyPurgeDaily (optional) is a Boolean value determining if existing data groups are automatically removed when newer data of a subsequent calendar day is ingested. The default value is true. If set to false, groups of the previous calendar day are retained.
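
A minimal model of keyPurgeDaily, assuming that groups are discarded at the moment the first record of a later calendar day arrives. The class GroupState is hypothetical and only tracks a cumulative value per group.

```python
from datetime import date

class GroupState:
    """Hypothetical model of per-group state under keyPurgeDaily.

    Assumption: when a record from a later calendar day arrives and
    key_purge_daily is True, all existing groups are removed first.
    """

    def __init__(self, key_purge_daily=True):
        self.key_purge_daily = key_purge_daily
        self.current_day = None
        self.groups = {}  # key -> cumulative value of the threshold column

    def ingest(self, day: date, key, value):
        if self.current_day is None or day > self.current_day:
            if self.current_day is not None and self.key_purge_daily:
                self.groups.clear()  # drop groups of the previous day
            self.current_day = day
        self.groups[key] = self.groups.get(key, 0) + value
```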

forceTriggerSessionEndTime (optional) is a non-negative integer whose unit is consistent with the precision of timeColumn. If the window containing sessionEnd has not been calculated when the session ends, the engine waits for forceTriggerSessionEndTime and then forces the calculation of that window. The default value is 0, meaning the calculation is never force-triggered in this way.
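
The forced-trigger condition can be sketched as a simple check. This hypothetical Python function assumes all times are integers in the precision of timeColumn (for TIME, milliseconds since midnight) and that "now" is the clock the engine compares against, which the text does not specify.

```python
def should_force_trigger(now, session_end, wait, window_open):
    """Hypothetical check modeled on forceTriggerSessionEndTime.

    now, session_end, wait: integers in the precision of timeColumn.
    window_open: True if the window containing session_end has not been
    calculated yet. A wait of 0 disables forced triggering.
    """
    if wait == 0 or not window_open:
        return False
    return now >= session_end + wait

SESSION_END = 15 * 3600 * 1000  # 15:00:00.000 as milliseconds since midnight
```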

snapshotDir (optional) is a string indicating the directory where the streaming engine snapshot is saved. The directory must already exist; otherwise an exception is thrown. If snapshotDir is specified, the system checks whether a snapshot already exists in the directory when creating a streaming engine. If it exists, the snapshot is loaded to restore the engine state. Multiple streaming engines can share a directory, in which each snapshot file is named after its engine.

The file extension of a snapshot can be:

  • <engineName>.tmp: a temporary snapshot.
  • <engineName>.snapshot: a snapshot that has been generated and flushed to disk.
  • <engineName>.old: if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.
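
One plausible way these three file names relate is a write-then-rotate scheme. The Python sketch below is an assumption for illustration, not DolphinDB's actual procedure; save_snapshot is a hypothetical helper that writes <engineName>.tmp, renames an existing <engineName>.snapshot to <engineName>.old, and then promotes the .tmp file.

```python
import os

def save_snapshot(snapshot_dir, engine_name, payload: bytes):
    """Hypothetical sketch of the .tmp -> .snapshot -> .old naming scheme."""
    if not os.path.isdir(snapshot_dir):
        # mirrors the documented rule that the directory must already exist
        raise FileNotFoundError(snapshot_dir)
    base = os.path.join(snapshot_dir, engine_name)
    tmp, snap, old = base + ".tmp", base + ".snapshot", base + ".old"
    with open(tmp, "wb") as f:
        f.write(payload)
        f.flush()
        os.fsync(f.fileno())  # make sure the temporary snapshot is on disk
    if os.path.exists(snap):
        os.replace(snap, old)  # keep the previous snapshot as .old
    os.replace(tmp, snap)      # the new snapshot takes the .snapshot name
```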

snapshotIntervalInMsgCount (optional) is a positive integer indicating the number of messages to receive before the next snapshot is saved.
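
The message-count interval amounts to a counter. This hypothetical Python class signals that a snapshot is due after every interval received messages.

```python
class SnapshotScheduler:
    """Hypothetical counter modeled on snapshotIntervalInMsgCount."""

    def __init__(self, interval: int):
        if interval <= 0:
            raise ValueError("interval must be a positive integer")
        self.interval = interval
        self.received = 0

    def on_message(self) -> bool:
        # returns True when a snapshot should be saved after this message
        self.received += 1
        return self.received % self.interval == 0
```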

outputElapsedMicroseconds (optional) is a BOOLEAN value. The default value is false. It determines whether to output, for each window, the elapsed time (in microseconds) from when the calculation is triggered to when the result is output.
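
The measured quantity is a wall-clock interval around the calculation. The Python sketch below is a hypothetical analogue using time.perf_counter_ns; it times only the calculation function, whereas the engine's measurement also covers producing the output.

```python
import time

def timed_calculation(fn, window):
    """Run fn on a window and return (result, elapsed_microseconds).

    Hypothetical analogue of outputElapsedMicroseconds: the clock starts
    when the calculation is triggered and stops when the result is ready.
    """
    start = time.perf_counter_ns()
    result = fn(window)
    elapsed_us = (time.perf_counter_ns() - start) // 1000
    return result, elapsed_us
```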

outputThreshold (optional) is a BOOLEAN value. The default value is false. It determines whether to output the threshold window values and the cumulative sums of thresholdColumn.

Examples

// data preparation
n = 1000000;
sampleDate = 2019.11.07;
symbols = `600519`000001`600000`601766;
trade = table(take(sampleDate, n) as date, 
	(09:30:00.000 + rand(7200000, n/2)).sort!() join (13:00:00.000 + rand(7200000, n/2)).sort!() as time, 
	rand(symbols, n) as symbol, 
	100+cumsum(rand(0.02, n)-0.01) as price, 
	rand(1000, n) as volume)

// create the dummyTable and outputTable
share streamTable(10:0, `date`time`symbol`price`volume, [DATE, TIME, SYMBOL, DOUBLE, DOUBLE]) as trades;
share table(1:0, `timestamp`symbol`open`high`low`close, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as outputTable;

// create the threshold engine: for each symbol, output one OHLC bar each time the cumulative volume reaches a multiple of 1,000,000
thresholdEngine = createThresholdEngine(name="demo", threshold=1000000, metrics=<[first(price), max(price), min(price), last(price)]>, dummyTable=trades, outputTable=outputTable, thresholdColumn=`volume, keyColumn=`symbol, timeColumn=[`date, `time]);

// insert data
thresholdEngine.append!(trade);

// query the result
select * from outputTable

Part of the result is shown below:

timestamp symbol open high low close
2019.11.07 09:31:54.986 000001 99.98404977017083 100.52950904161203 99.56823161885143 100.5279260600172
2019.11.07 09:33:51.890 000001 100.51835866520182 100.92578917419537 100.49741881850642 100.70077057819347
2019.11.07 09:35:48.298 000001 100.71148486480116 100.787968895277 99.84565304366406 100.61527177638374
2019.11.07 09:37:49.058 000001 100.61627630640287 101.55438607632183 100.48305282644462 101.33690219670534
2019.11.07 09:39:44.562 000001 101.32348716112786 101.55176323080435 100.74830873960164 100.77029377058614
2019.11.07 09:41:38.118 000001 100.78262700690422 101.24037498458289 100.67253582042176 100.7765302780131