createTimeBucketEngine
Syntax
createTimeBucketEngine(name,timeCutPoints,metrics,dummyTable,outputTable,timeColumn,[keyColumn],[useWindowStartTime],[closed='left'],[fill='none'],[keyPurgeFreqInSec=-1],[outputElapsedMicroseconds=false],[parallelism=1],[outputHandler=NULL],[msgAsTable=false])
Details
Creates a time-series aggregation engine that processes data in custom time windows. The engine segments incoming data by timestamp into equal or variable-length windows and performs incremental calculations within each window. It is typically used downstream of a time-series engine (createTimeSeriesEngine) to process second- or minute-level aggregation results (such as OHLC bars).
Return value: A table object. Ingest data into the table for engine processing.
Windowing Logic
- Window boundaries: Defined by adjacent elements in the timeCutPoints vector.
- Boundary types: Controlled by the closed parameter. Windows are either left-closed, right-open (closed='left') or left-open, right-closed (closed='right').
- Window triggering conditions:
  - Left-open, right-closed windows close upon receiving a record with timestamp ≥ right boundary.
  - Left-closed, right-open windows close upon receiving a record with timestamp ≥ (right boundary - 1), where "1" represents one unit at the timestamp's precision level. For example, window [09:00, 09:05) closes at timestamp ≥ 09:04.
- Output timestamps: Precision matches timeColumn. If useWindowStartTime=true, uses window start time; otherwise uses window end time.
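The boundary and timestamp options above can be tried with a small right-closed engine. This is a minimal sketch, not taken from the original examples: the table names, the engine name "rightClosedDemo", and the sample rows are illustrative assumptions, and the input is assumed to be 1-minute bars stamped with the bar end time.

```dolphindb
// Hypothetical schema and output tables (names are illustrative only)
barsSchema = table(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE])
rightClosedOut = table(1000:0, `time`sym`lastPrice, [TIMESTAMP, SYMBOL, DOUBLE])

// Cut points 09:00, 09:05, 09:10 define the windows (09:00, 09:05] and (09:05, 09:10]
rightClosedEngine = createTimeBucketEngine(name="rightClosedDemo", timeCutPoints=[09:00m, 09:05m, 09:10m], metrics=<[last(price)]>, dummyTable=barsSchema, outputTable=rightClosedOut, timeColumn=`time, keyColumn=`sym, useWindowStartTime=true, closed='right')

// Per the triggering rule above, the record stamped 09:05 (>= right boundary)
// is expected to close the window (09:00, 09:05]
bars = table(2024.10.08T09:01:00.000 2024.10.08T09:05:00.000 as time, symbol(`A`A) as sym, 10.5 10.7 as price)
rightClosedEngine.append!(bars)

// With useWindowStartTime=true, output rows carry the window start time
select * from rightClosedOut
```

Setting closed='left' and useWindowStartTime=false instead reproduces the defaults used in the Examples section.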
Calculation Rules
- Time range: Processes records between the first and last elements of timeCutPoints. The time range precision matches timeCutPoints. For example, with timeCutPoints=[09:00m, 09:05m] and closed='left' (left-closed, right-open):
  - window includes: 09:00m ≤ timestamps ≤ 09:04m
  - window excludes: timestamps > 09:04m (e.g., 09:04:00.100)
- Grouping:
  - With keyColumn: Groups calculations by keyColumn
  - Without keyColumn: Performs global calculations
- Result filling: If fill is unspecified or 'none', only windows with calculation results are output. If any other fill value is specified, all windows are output, and empty windows are filled using the specified method.
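To illustrate the filling rule, the following sketch creates an engine that outputs a row even for windows that receive no data. The table names and the engine name "fillDemo" are hypothetical, and fill='null' is only one of the documented options.

```dolphindb
// Hypothetical schema and output tables (names are illustrative only)
fillSchema = table(1:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT])
fillOut = table(1000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, LONG])

// Three 5-minute windows. With fill='null', a window without data for a group
// is expected to produce a row whose result column is null;
// with the default fill='none', that row would be omitted.
fillEngine = createTimeBucketEngine(name="fillDemo", timeCutPoints=[10:00m, 10:05m, 10:10m, 10:15m], metrics=<[sum(volume)]>, dummyTable=fillSchema, outputTable=fillOut, timeColumn=`time, keyColumn=`sym, fill='null')
```

Replacing fill='null' with fill='ffill' would carry the previous window's result forward instead.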
Arguments
name is a string indicating the name of the engine. It is the unique identifier of an engine on a data or compute node. It can contain letters, digits, and underscores ("_"), and must start with a letter.
timeCutPoints is a vector of time values whose adjacent elements define the window boundaries.
- Must contain no null values.
- The timestamp precision of timeCutPoints must be equal to or coarser than the precision of timeColumn.
- Its precision determines the exact window boundary behavior. For example, with closed='left', the minute-precision window [09:00, 09:05) excludes data later than 09:04:00; the second-precision window [09:00:00, 09:05:00) excludes data later than 09:04:59.
metrics is metacode or a tuple of metacode specifying the calculation formulas.
- It can use one or more built-in or user-defined aggregate functions (which must be defined by the defg keyword) such as <[sum(volume), avg(price)]>, or expressions of aggregate functions such as <[avg(price1)-avg(price2)]>, or aggregate functions involving multiple columns such as <[std(price1-price2)]>.
- You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2> (it's optional to specify the column names).
- If metrics is a tuple with multiple formulas, windowSize is specified as a vector of the same length as metrics. Each element of windowSize corresponds to the elements in metrics. For example, if windowSize=[10,20], metrics can be (<[min(volume), max(volume)]>, <sum(volume)>). metrics can also be a nested tuple, such as [[<[min(volume), max(volume)]>, <sum(volume)>], [<avg(volume)>]].
- The column names specified in metrics are not case-sensitive and need not match the case of the column names in the input table.
- Nested aggregate function calls are not supported in metrics.
dummyTable is a table object whose schema must be the same as the subscribed stream table. Whether dummyTable contains data does not matter.
outputTable is a table to which the engine inserts calculation results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling the function.
The output columns are in the following order:
(1) The first column must be a time column.
(2) If keyColumn is specified, the subsequent column(s) must be in the same order as that specified by keyColumn.
(3) If outputElapsedMicroseconds is set to true, the next column must be of LONG type. See the outputElapsedMicroseconds parameter for details.
(4) The remaining columns are the result columns (one or more).
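The column order above can be sketched as follows for an engine with one key column and outputElapsedMicroseconds=true. The table names, column names, and the engine name "elapsedDemo" are assumptions made for illustration.

```dolphindb
// Hypothetical input schema
elapsedSchema = table(1:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT])

// Output column order: (1) time, (2) key column, (3) elapsed time as LONG,
// (4) one column per metric
elapsedOut = table(1000:0, `time`sym`elapsedMicros`avgPrice`sumVolume, [TIMESTAMP, SYMBOL, LONG, DOUBLE, LONG])

elapsedEngine = createTimeBucketEngine(name="elapsedDemo", timeCutPoints=[10:00m, 10:05m, 10:10m], metrics=<[avg(price), sum(volume)]>, dummyTable=elapsedSchema, outputTable=elapsedOut, timeColumn=`time, keyColumn=`sym, outputElapsedMicroseconds=true)
```

Without outputElapsedMicroseconds, the elapsedMicros column would be dropped and the result columns would follow the key column directly.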
timeColumn is a STRING scalar or vector specifying the time column(s) of the subscribed stream table.
Note: If timeColumn is a vector, it must have a date element (of DATE type) and a time element (of TIME, SECOND or NANOTIME type). In this case, the first column in outputTable must take the data type of concatDateTime(date, time).
keyColumn (optional) is a STRING scalar/vector indicating the name of the grouping column(s). If it is specified, the engine conducts the calculations within each group. For example, group the data by stock symbol and apply moving aggregation functions to each stock.
useWindowStartTime (optional) is a Boolean value indicating whether the time column in outputTable is the starting time of the windows. The default value is false, which means the timestamps in the output table are the end time of the windows.
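closed (optional) is a string indicating which boundary of each window is closed. The default value is 'left'.
- closed = 'left': left-closed, right-open
- closed = 'right': left-open, right-closed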
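fill (optional) specifies how to handle windows that contain no data. The default value is 'none'. It can take the following values:
- 'none': output no result for the window.
- 'null': output a null value.
- 'ffill': output the result of the previous window.
- specific value: output the specified value. Its type must be the same as the output type of metrics.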
keyPurgeFreqInSec (optional) is a positive integer indicating the interval (in seconds) to remove groups with no incoming data for a long time. If a group has no incoming data for at least keyPurgeFreqInSec seconds after the last time of data purging, it will be removed.
Note: To specify this parameter, parameter keyColumn must be specified and parameter fill cannot be specified.
outputElapsedMicroseconds (optional) is a BOOLEAN value. The default value is false. It determines whether to output the elapsed time (in microseconds) from the time the calculation is triggered to the output of result for each window.
parallelism (optional) is a positive integer no greater than 63, representing the number of worker threads for parallel computation. The default value is 1. For compute-intensive workloads, adjusting this parameter appropriately can effectively utilize computing resources and reduce computation time. It is recommended to set a value less than the number of CPU cores, normally from 4 to 8.
outputHandler (optional) is a unary function or a partial function with a single unfixed parameter. If set, the engine will not write the calculation results to the output table directly. Instead, the results will be passed as a parameter to the outputHandler function. The default value is null, which means the result will be written to the output table.
msgAsTable (optional) is a Boolean scalar indicating whether the output data is passed to the function specified by outputHandler as a table or as a tuple. If msgAsTable=true, the output data is passed as a table. The default value is false, which means the output data is passed as a tuple of columns.
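As an illustration of outputHandler and msgAsTable together, the sketch below forwards results to a shared table through a user-defined function instead of writing them to outputTable. The function forwardBuckets, the table names, and the engine name "handlerDemo" are hypothetical.

```dolphindb
// Hypothetical shared table that receives the forwarded results
share table(1000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, LONG]) as bucketSink

// Two-parameter function; fixing the first parameter through partial
// application leaves a single unfixed parameter, as outputHandler requires
def forwardBuckets(mutable sink, msg) {
    // msg arrives as a table because msgAsTable=true
    sink.append!(msg)
}

handlerSchema = table(1:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT])
// outputTable is still a required argument even though results go to the handler
handlerOut = table(1000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, LONG])

handlerEngine = createTimeBucketEngine(name="handlerDemo", timeCutPoints=[10:00m, 10:05m, 10:10m], metrics=<[sum(volume)]>, dummyTable=handlerSchema, outputTable=handlerOut, timeColumn=`time, keyColumn=`sym, outputHandler=forwardBuckets{bucketSink}, msgAsTable=true)
```

With msgAsTable=false, msg would instead be a tuple of columns, and the handler body would need to be adapted accordingly.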
Examples
Use createTimeSeriesEngine to calculate 1-minute OHLC bars, then use createTimeBucketEngine to downsample the result to 5-minute intervals. When the windows are left-closed, right-open, the time bucket engine outputs each downsampled result sooner than a time-series engine would, because it triggers window calculations one minute earlier.
share streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as trades
share streamTable(10000:0, `time`sym`firstPrice`maxPrice`minPrice`lastPrice`sumVolume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as output1
timeSeries1 = createTimeSeriesEngine(name="timeSeries1", windowSize=60000, step=60000, metrics=<[first(price), max(price), min(price), last(price), sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="timeSeries1", offset=0, handler=append!{timeSeries1}, msgAsTable=true);
// define the output table and windows of time bucket engine
share streamTable(10000:0, `time`sym`firstPrice`maxPrice`minPrice`lastPrice`sumVolume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as output2
timeCutPoints=[10:00m, 10:05m, 10:10m, 10:15m]
timeBucket1 = createTimeBucketEngine(name="timeBucket1", timeCutPoints=timeCutPoints, metrics=<[first(firstPrice), max(maxPrice), min(minPrice), last(lastPrice), sum(sumVolume)]>, dummyTable=output1, outputTable=output2, timeColumn=`time, keyColumn=`sym)
subscribeTable(tableName="output1", actionName="timeBucket1", offset=0, handler=append!{timeBucket1}, msgAsTable=true);
insert into trades values(2024.10.08T10:01:01.785,`A, 10.83, 2110)
insert into trades values(2024.10.08T10:01:02.125,`B,21.73, 1600)
insert into trades values(2024.10.08T10:01:12.457,`A,10.79, 2850)
insert into trades values(2024.10.08T10:03:10.789,`A,11.81, 2250)
insert into trades values(2024.10.08T10:03:12.005,`B, 22.96, 1980)
insert into trades values(2024.10.08T10:08:02.236,`A, 11.25, 2400)
insert into trades values(2024.10.08T10:08:04.412,`B, 23.03, 2130)
insert into trades values(2024.10.08T10:08:05.152,`B, 23.18, 1900)
insert into trades values(2024.10.08T10:08:30.021,`A, 11.04, 2300)
insert into trades values(2024.10.08T10:10:20.123,`A, 11.85, 2200)
insert into trades values(2024.10.08T10:11:02.236,`A, 11.06, 2200)
insert into trades values(2024.10.08T10:13:04.412,`B, 23.15, 1880)
insert into trades values(2024.10.08T10:15:12.005,`B, 22.06, 2100)
sleep(10)
// check 1-min OHLC prices
select * from output1;
time | sym | firstPrice | maxPrice | minPrice | lastPrice | sumVolume |
---|---|---|---|---|---|---|
2024.10.08T10:02:00.000 | A | 10.83 | 10.83 | 10.79 | 10.79 | 4,960 |
2024.10.08T10:02:00.000 | B | 21.73 | 21.73 | 21.73 | 21.73 | 1,600 |
2024.10.08T10:04:00.000 | A | 11.81 | 11.81 | 11.81 | 11.81 | 2,250 |
2024.10.08T10:04:00.000 | B | 22.96 | 22.96 | 22.96 | 22.96 | 1,980 |
2024.10.08T10:09:00.000 | A | 11.25 | 11.25 | 11.04 | 11.04 | 4,700 |
2024.10.08T10:09:00.000 | B | 23.03 | 23.18 | 23.03 | 23.18 | 4,030 |
2024.10.08T10:11:00.000 | A | 11.85 | 11.85 | 11.85 | 11.85 | 2,200 |
2024.10.08T10:14:00.000 | B | 23.15 | 23.15 | 23.15 | 23.15 | 1,880 |
Check the 5-min OHLC prices:
select * from output2;
time | sym | firstPrice | maxPrice | minPrice | lastPrice | sumVolume |
---|---|---|---|---|---|---|
2024.10.08T10:05:00.000 | A | 10.83 | 11.81 | 10.79 | 11.81 | 7,210 |
2024.10.08T10:05:00.000 | B | 21.73 | 22.96 | 21.73 | 22.96 | 3,580 |
2024.10.08T10:10:00.000 | A | 11.25 | 11.25 | 11.04 | 11.04 | 4,700 |
2024.10.08T10:10:00.000 | B | 23.03 | 23.18 | 23.03 | 23.18 | 4,030 |
2024.10.08T10:15:00.000 | B | 23.15 | 23.15 | 23.15 | 23.15 | 1,880 |
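To rerun the example in the same session, the subscriptions, engines, and shared tables created above need to be released first. The following cleanup sketch assumes no other subscriptions or references to these objects exist.

```dolphindb
// Cancel the subscriptions before dropping what they feed
unsubscribeTable(tableName="trades", actionName="timeSeries1")
unsubscribeTable(tableName="output1", actionName="timeBucket1")

// Release the engines by name
dropStreamEngine("timeSeries1")
dropStreamEngine("timeBucket1")

// Remove the shared stream tables
undef(`trades`output1`output2, SHARED)
```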