createTimeBucketEngine

Syntax

createTimeBucketEngine(name,timeCutPoints,metrics,dummyTable,outputTable,timeColumn,[keyColumn],[useWindowStartTime],[closed='left'],[fill='none'],[keyPurgeFreqInSec=-1],[outputElapsedMicroseconds=false],[parallelism=1],[outputHandler=NULL],[msgAsTable=false])

Details

Creates a time-series aggregation engine that processes data in custom time windows. The engine segments incoming data by timestamp into equal or variable-length windows, and performs incremental calculations within each window. It is typically used downstream of time-series engine (createTimeSeriesEngine) to process second or minute-level aggregation results (like OHLC bars).

Return value:A table object. Ingest data into the table for engine processing.

Windowing Logic

  • Window boundaries: Defined by adjacent elements in the timeCutPoints vector.
  • Boundary types: Controlled by the closed parameter. Can be left-closed, right-open or left-open, right-closed.
  • Window triggering conditions:
    • Left-open windows close upon receiving a record with timestamp ≥ right boundary
    • Right-open windows close upon receiving a record with timestamp ≥ (right boundary - 1 ), where "1" represents one unit at the timestamp's precision level. For example, window [09:00, 09:05) closes at timestamp ≥ 09:04.
  • Output timestamps: Precision matches timeColumn. If useWindowStartTime=true, uses window start time; otherwise uses window end time.

Calculation Rules

  • Time range: Processes records between the first and last elements of timeCutPoints. The time range precision matches timeCutPoints. For example, with timeCutPoints=[09:00m, 09:05m], closed=left (left-closed-right-open):
    • window includes: 09:00m ≤ timestamps ≤ 09:04m
    • window excludes: timestamps > 09:04m (e.g., 09:04:00.100)
  • Grouping:
    • With keyColumn: Groups calculations by keyColumn
    • Without keyColumn: Performs global calculations
  • Result Filling: If fill is unspecified or "None", only windows with calculation results are output. If fill is specified, all windows are output, and the empty windows are filled using the specified filling method.

Arguments

name is a string indicating the name of the engine. It is the only identifier of an engine on a data or compute node. It can have letter, number and "_" and must start with a letter.

timeCutPoints is a vector of MINUTE or SECOND type defining window boundaries. Each adjacent pair of elements forms a window. Note:
  • Must contain no null values.
  • The timestamp precision of timeCutPoints must be equal to or coarser than the precision of timeColumn.
  • Its precision determines the exact window boundary behavior. For example, minute-precision window [09:00, 09:05) excludes data ≥ 09:04:00; second-precision window [09:00:00, 09:05:00) excludes data ≥ 09:04:59.
metrics is metacode or a tuple specifying the calculation formulas. For more information about metacode please refer to Metaprogramming.
  • It can use one or more built-in or user-defined aggregate functions (which must be defined by the defg keyword) such as <[sum(volume), avg(price)]>, or expressions of aggregate functions such as as <[avg(price1)-avg(price2)]>, or aggregate functions involving multiple columns such as <[std(price1-price2)]>.
  • You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2> (it's optional to specify the column names).
  • If metrics is a tuple with multiple formulas, windowSize is specified as a vector of the same length as metrics. Each element of windowSize corresponds to the elements in metrics. For example, if windowSize=[10,20], metrics can be (<[min(volume), max(volume)]>, <sum(volume)>). metrics can also input nested tuple vectors, such as [[<[min(volume), max(volume)]>, <sum(volume)>], [<avg(volume)>]].
Note:
  • The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.
  • Nested aggregate function calls are not supported in metrics.

dummyTable is a table object whose schema must be the same as the subscribed stream table. Whether dummyTable contains data does not matter.

outputTable is a table to which the engine inserts calculation results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling the function.

The output columns are in the following order:

(1) The first column must be a time column.

(2) If keyColumn is specified, the subsequent column(s) must be in the same order as that specified by keyColumn.

(3) If outputElapsedMicroseconds is set to true, you need to specify a column of LONG type. See the outputElapsedMicroseconds parameter for details.

(4) Then followed by one or more result columns.

timeColumn is a STRING scalar or vector specifying the time column(s) of the subscribed stream table.

Note: If timeColumn is a vector, it must have a date element (of DATE type) and a time element (of TIME, SECOND or NANOTIME type). In this case, the first column in outputTable must take the data type of concatDateTime(date, time).

keyColumn (optional) is a STRING scalar/vector indicating the name of the grouping column(s). If it is specified, the engine conducts the calculations within each group. For example, group the data by stock symbol and apply moving aggregation functions to each stock.

useWindowStartTime (optional) is a Boolean value indicating whether the time column in outputTable is the starting time of the windows. The default value is false, which means the timestamps in the output table are the end time of the windows.

closed (optional) is a STRING indicating whether the left or the right boundary is included.
  • closed = 'left': left-closed, right-open
  • closed = 'right': left-open, right-closed
fill (optional) is a vector/scalar indicating the filling method to deal with an empty window (in a group). It can be:
  • 'none': no result
  • 'null': output a null value.
  • 'ffill': output the result in the last window.
  • specific value: output the specified value. Its type should be the same as metrics output's type.
fill could be a vector to specify different filling method for each metric. The size of the vector must be consistent with the number of elements specified in metrics. The element in vector cannot be 'none'.

keyPurgeFreqInSec (optional) is a positive integer indicating the interval (in seconds) to remove groups with no incoming data for a long time. If a group has no incoming data for at least keyPurgeFreqInSec seconds after the last time of data purging, it will be removed.

Note: To specify this parameter, parameter keyColumn must be specified and parameter fill cannot be specified.

outputElapsedMicroseconds (optional) is a BOOLEAN value. The default value is false. It determines whether to output the elapsed time (in microseconds) from the time the calculation is triggered to the output of result for each window.

parallelism (optional) is a positive integer no greater than 63, representing the number of worker threads for parallel computation. The default value is 1. For compute-intensive workloads, adjusting this parameter appropriately can effectively utilize computing resources and reduce computation time. It is recommended to set a value less than the number of CPU cores, normally from 4 to 8.

outputHandler (optional) is a unary function or a partial function with a single unfixed parameter. If set, the engine will not write the calculation results to the output table directly. Instead, the results will be passed as a parameter to the outputHandler function. The default value is null, which means the result will be written to the output table.

msgAsTable (optional) is a Boolean scalar indicating whether the output data is passed into function (specified by outputHandler) as a table or as a tuple. If msgAsTable=true, the subscribed data is passed into function as a table. The default value is false, which means the output data is passed into function as a tuple of columns.

Examples

Use createTimeSeriesEngine to calculate 1-minute OHLC. Then use createTimeBucketEngine to downsample the result to 5-minute intervals. When the windows are set to left-closed, right-open, the time bucket engine is faster than the time-series engine for the downsampling, because it triggers window calculations one minute earlier.

share streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as trades
share streamTable(10000:0, `time`sym`firstPrice`maxPrice`minPrice`lastPrice`sumVolume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as output1
timeSeries1 = createTimeSeriesEngine(name="timeSeries1", windowSize=60000, step=60000, metrics=<[first(price), max(price), min(price), last(price), sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="timeSeries1", offset=0, handler=append!{timeSeries1}, msgAsTable=true);

// define the output table and windows of time bucket engine
share streamTable(10000:0, `time`sym`firstPrice`maxPrice`minPrice`lastPrice`sumVolume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as output2
timeCutPoints=[10:00m, 10:05m, 10:10m, 10:15m]

timeBucket1 = createTimeBucketEngine(name="timeBucket1", timeCutPoints=timeCutPoints, metrics=<[first(firstPrice), max(maxPrice), min(minPrice), last(lastPrice), sum(sumVolume)]>, dummyTable=output1, outputTable=output2, timeColumn=`time,  keyColumn=`sym)
subscribeTable(tableName="output1", actionName="timeBucket1", offset=0, handler=append!{timeBucket1}, msgAsTable=true);


insert into trades values(2024.10.08T10:01:01.785,`A, 10.83, 2110)
insert into trades values(2024.10.08T10:01:02.125,`B,21.73, 1600)
insert into trades values(2024.10.08T10:01:12.457,`A,10.79, 2850)
insert into trades values(2024.10.08T10:03:10.789,`A,11.81, 2250)
insert into trades values(2024.10.08T10:03:12.005,`B, 22.96, 1980)
insert into trades values(2024.10.08T10:08:02.236,`A, 11.25, 2400)
insert into trades values(2024.10.08T10:08:04.412,`B, 23.03, 2130)
insert into trades values(2024.10.08T10:08:05.152,`B, 23.18, 1900)
insert into trades values(2024.10.08T10:08:30.021,`A, 11.04, 2300)
insert into trades values(2024.10.08T10:10:20.123,`A, 11.85, 2200)
insert into trades values(2024.10.08T10:11:02.236,`A, 11.06, 2200)
insert into trades values(2024.10.08T10:13:04.412,`B, 23.15, 1880)
insert into trades values(2024.10.08T10:15:12.005,`B, 22.06, 2100)

sleep(10)

// check 1-min OHLC prices
select * from output1;
time sym firstPrice maxPrice minPrice lastPrice sumVolume
2024.10.08T10:02:00.000 A 10.83 10.83 10.79 10.79 4,960
2024.10.08T10:02:00.000 B 21.73 21.73 21.73 21.73 1,600
2024.10.08T10:04:00.000 A 11.81 11.81 11.81 11.81 2,250
2024.10.08T10:04:00.000 B 22.96 22.96 22.96 22.96 1,980
2024.10.08T10:09:00.000 A 11.25 11.25 11.04 11.04 4,700
2024.10.08T10:09:00.000 B 23.03 23.18 23.03 23.18 4,030
2024.10.08T10:11:00.000 A 11.85 11.85 11.85 11.85 2,200
2024.10.08T10:14:00.000 B 23.15 23.15 23.15 23.15 1,880

Check the 5-min OHLC prices:

select * from output2;
time sym firstPrice maxPrice minPrice lastPrice sumVolume
2024.10.08T10:05:00.000 A 10.83 11.81 10.79 11.81 7,210
2024.10.08T10:05:00.000 B 21.73 22.96 21.73 22.96 3,580
2024.10.08T10:10:00.000 A 11.25 11.25 11.04 11.04 4,700
2024.10.08T10:10:00.000 B 23.03 23.18 23.03 23.18 4,030
2024.10.08T10:15:00.000 B 23.15 23.15 23.15 23.15 1,880