createThresholdEngine
Syntax
createThresholdEngine(name, threshold, metrics, dummyTable, outputTable,
thresholdColumn, [keyColumn], [timeColumn], [sessionBegin], [sessionEnd],
[keyPurgeDaily=false], [forceTriggerSessionEndTime=0], [snapshotDir],
[snapshotIntervalInMsgCount], [outputElapsedMicroseconds=false],
[outputThreshold=false])
Details
This function creates a threshold engine to implement aggregate calculations triggered by cumulative value thresholds. Each time the cumulative value of the threshold column (thresholdColumn) reaches a specific threshold (threshold*n), an aggregate calculation is triggered:
- If keyColumn is specified, the aggregate calculation will be performed independently in each group.
- If sessionBegin and sessionEnd are specified, only data in the range [sessionBegin, sessionEnd] will be included in the calculation.
Return value: A table object. Data written to this table is ingested into the engine for aggregation.
Arguments
name is a string indicating the name of the engine. It is the unique identifier of an engine on a data or compute node. It can contain letters, digits, and underscores (_), and must start with a letter.
threshold is a positive integer indicating the threshold step size.
metrics is metacode or a tuple specifying the calculation formulas. For more information about metacode, please refer to Metaprogramming.
- It can use one or more built-in or user-defined aggregate functions (which must be defined by the defg keyword) such as <[sum(volume), avg(price)]>, or expressions of aggregate functions such as <[avg(price1)-avg(price2)]>, or aggregate functions involving multiple columns such as <[std(price1-price2)]>.
- You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2> (it's optional to specify the column names).
- The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.
- Nested aggregate function calls are not supported in metrics.
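As an illustration of the rules above, the following is a minimal sketch of a metrics definition that mixes built-in aggregates with a user-defined one. The function name vwap and the column names price and volume are hypothetical and chosen only for this sketch; wavg is the built-in weighted average function.

```dolphindb
// Hypothetical user-defined aggregate function; it must be declared with defg
defg vwap(price, volume){
    return wavg(price, volume)
}

// Metacode tuple: two built-in aggregates and the user-defined one.
// No aggregate is nested inside another aggregate.
metrics = <[first(price), last(price), vwap(price, volume)]>
```

The resulting metrics variable could then be passed as the metrics argument, provided outputTable declares one result column per formula.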
dummyTable is a table object whose schema must be the same as the subscribed stream table. Whether dummyTable contains data does not matter.
outputTable is a table to which the engine inserts calculation results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling the function.
The output columns are in the following order:
keyColumn (optional) is a STRING scalar/vector indicating the name of the grouping column(s). If it is specified, the engine conducts the calculations within each group. For example, group the data by stock symbol and apply moving aggregation functions to each stock.
timeColumn (optional) is a STRING scalar or vector specifying the time column(s) of the subscribed stream table. When useSystemTime = false, it must be specified.
Note: If timeColumn is a vector, it must have a date element (of DATE type) and a time element (of TIME, SECOND or NANOTIME type). In this case, the first column in outputTable must take the data type of concatDateTime(date, time).
sessionBegin (optional) can be a scalar or vector of type SECOND, TIME or NANOTIME corresponding to the data type of the time column, indicating the starting time of each session. If it is a vector, it must be increasing.
sessionEnd (optional) can be a scalar or vector of type SECOND, TIME or NANOTIME corresponding to the data type of the time column, indicating the end time of each session. Specify sessionEnd as 00:00:00 to indicate the beginning of the next day (i.e., 24:00:00 of the current day).
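A minimal sketch of restricting the calculation to two trading sessions follows. The table names, the engine name "demoSession", and the session times are hypothetical. It assumes that, when timeColumn is a [date, time] pair, sessionBegin and sessionEnd take the type of the time element (TIME here), and that the output columns follow the same layout as in the Examples section of this page.

```dolphindb
// Hypothetical input and output tables for this sketch
share streamTable(10:0, `date`time`symbol`price`volume, [DATE, TIME, SYMBOL, DOUBLE, DOUBLE]) as sessionTrades
share table(1:0, `timestamp`symbol`open`close, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]) as sessionOutput

// Two sessions per day: 09:30-11:30 and 13:00-15:00.
// sessionBegin is an increasing vector, paired element-wise with sessionEnd.
sessionEngine = createThresholdEngine(name="demoSession", threshold=1000000,
    metrics=<[first(price), last(price)]>,
    dummyTable=sessionTrades, outputTable=sessionOutput,
    thresholdColumn=`volume, keyColumn=`symbol, timeColumn=[`date, `time],
    sessionBegin=[09:30:00.000, 13:00:00.000],
    sessionEnd=[11:30:00.000, 15:00:00.000])
```

With this configuration, records whose time falls outside both sessions, for example during the lunch break, would not be included in the calculation.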
keyPurgeDaily (optional) is a Boolean value determining if existing data groups are automatically removed when newer data of a subsequent calendar day is ingested. The default value is true. If set to false, groups of the previous calendar day are retained.
forceTriggerSessionEndTime (optional) is a non-negative integer. The unit of forceTriggerSessionEndTime is consistent with the precision of timeColumn. It indicates the waiting time to force trigger calculation in the window containing the sessionEnd, if it ends without calculation. The default value is 0, indicating the calculation will not be triggered in this way.
snapshotDir (optional) is a string indicating the directory where the streaming engine snapshot is saved. The directory must already exist, otherwise an exception is thrown. If snapshotDir is specified, the system checks whether a snapshot already exists in the directory when creating a streaming engine. If it exists, the snapshot will be loaded to restore the engine state. Multiple streaming engines can share a directory where the snapshot files are named as the engine names.
The file extension of a snapshot can be:
- <engineName>.tmp: temporary snapshot
- <engineName>.snapshot: a snapshot that is generated and flushed to disk
- <engineName>.old: if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.
snapshotIntervalInMsgCount (optional) is a positive integer indicating the number of messages to receive before the next snapshot is saved.
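The snapshot parameters might be used as in the sketch below. The directory path, table names, engine name, and interval are hypothetical. The sketch only shows how the two parameters are passed; any subscription settings required for snapshots to be written are not covered on this page and are not shown.

```dolphindb
// Hypothetical snapshot directory; it must exist before the engine is created
snapshotDir = "/tmp/thresholdEngineSnapshots"
if(!exists(snapshotDir)){
    mkdir(snapshotDir)
}

// Hypothetical input and output tables for this sketch
share streamTable(10:0, `date`time`symbol`price`volume, [DATE, TIME, SYMBOL, DOUBLE, DOUBLE]) as snapTrades
share table(1:0, `timestamp`symbol`open`close, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]) as snapOutput

// Request a snapshot after every 100000 received messages
snapEngine = createThresholdEngine(name="demoSnapshot", threshold=1000000,
    metrics=<[first(price), last(price)]>,
    dummyTable=snapTrades, outputTable=snapOutput,
    thresholdColumn=`volume, keyColumn=`symbol, timeColumn=[`date, `time],
    snapshotDir=snapshotDir, snapshotIntervalInMsgCount=100000)
```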
outputElapsedMicroseconds (optional) is a BOOLEAN value. The default value is false. It determines whether to output the elapsed time (in microseconds) from the time the calculation is triggered to the output of result for each window.
outputThreshold (optional) is a BOOLEAN value. The default value is false. It determines whether to output the threshold window values and the cumulative sums of thresholdColumn.
Examples
// data preparation
n = 1000000;
sampleDate = 2019.11.07;
symbols = `600519`000001`600000`601766;
trade = table(take(sampleDate, n) as date,
(09:30:00.000 + rand(7200000, n/2)).sort!() join (13:00:00.000 + rand(7200000, n/2)).sort!() as time,
rand(symbols, n) as symbol,
100+cumsum(rand(0.02, n)-0.01) as price,
rand(1000, n) as volume)
// create the dummyTable and outputTable
share streamTable(10:0,`date`time`symbol`price`volume,[DATE, TIME, SYMBOL, DOUBLE, DOUBLE]) as trades;
share table(1:0, `timestamp`symbol`open`high`low`close, [TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE]) as outputTable;
// create the threshold engine
thresholdEngine = createThresholdEngine(name="demo", threshold=1000000, metrics=<[first(price), max(price), min(price), last(price)]>, dummyTable=trades, outputTable=outputTable, thresholdColumn=`volume, keyColumn=`symbol, timeColumn=[`date, `time]);
// insert data
thresholdEngine.append!(trade);
// query the result
select * from outputTable
Some of the results are as follows:
timestamp | symbol | open | high | low | close |
---|---|---|---|---|---|
2019.11.07 09:31:54.986 | 000001 | 99.98404977017083 | 100.52950904161203 | 99.56823161885143 | 100.5279260600172 |
2019.11.07 09:33:51.890 | 000001 | 100.51835866520182 | 100.92578917419537 | 100.49741881850642 | 100.70077057819347 |
2019.11.07 09:35:48.298 | 000001 | 100.71148486480116 | 100.787968895277 | 99.84565304366406 | 100.61527177638374 |
2019.11.07 09:37:49.058 | 000001 | 100.61627630640287 | 101.55438607632183 | 100.48305282644462 | 101.33690219670534 |
2019.11.07 09:39:44.562 | 000001 | 101.32348716112786 | 101.55176323080435 | 100.74830873960164 | 100.77029377058614 |
2019.11.07 09:41:38.118 | 000001 | 100.78262700690422 | 101.24037498458289 | 100.67253582042176 | 100.7765302780131 |
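
The example above appends data to the engine directly. In a streaming setup, the engine is more commonly fed through a subscription to the stream table. The sketch below continues from the example and assumes that trades, thresholdEngine, and the engine name "demo" are still defined in the session. The action name "feedThresholdEngine" is hypothetical.

```dolphindb
// Route rows written to the shared stream table "trades" into the engine
subscribeTable(tableName="trades", actionName="feedThresholdEngine", offset=0,
    handler=append!{thresholdEngine}, msgAsTable=true)

// ... rows appended to trades from here on are aggregated by the engine ...

// Clean up: cancel the subscription, release the engine, then drop the shared tables
unsubscribeTable(tableName="trades", actionName="feedThresholdEngine")
dropStreamEngine("demo")
undef(`trades, SHARED)
undef(`outputTable, SHARED)
```

Releasing the engine with dropStreamEngine frees the name "demo", so the example can be run again on the same node.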