Time-Series Engine

The time-series engine aggregates data within time windows. DolphinDB provides three types of time-series engines:

  • Time-Series Engine: Automatically divides time windows based on specified parameters, applying aggregation rules to grouped data within each window. It is suitable for scenarios where there are no fixed trading periods, such as forex and crypto trading.
  • Daily Time Series Engine: Extends the basic functionality by supporting trading sessions. It includes all unprocessed data at the start of each session in the first window of that session for aggregation. It is recommended for scenarios with fixed trading periods, such as futures and stock markets.
  • Session Window Engine: Dynamically creates session windows based on event frequency. Unlike the time-series engine, whose windows have a fixed duration, its window length adjusts dynamically.

The time-series engine can be created using the createTimeSeriesEngine function. The syntax is as follows:

createTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [forceTriggerTime], [raftGroup], [keyPurgeFreqInSec=-1], [closed='left'], [outputElapsedMicroseconds=false], [subWindow], [parallelism=1], [acceptedDelay=0], [outputHandler=NULL], [msgAsTable=false])
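
A minimal sketch of typical usage is shown below (the table, column, and engine names are hypothetical). It creates an engine that aggregates sum(qty) over 1-minute tumbling windows, i.e., windowSize = step = 60000 for a TIMESTAMP time column:

share streamTable(1000:0, `time`qty, [TIMESTAMP, INT]) as ticks
demoOutput = table(10000:0, `time`sumQty, [TIMESTAMP, INT])
demoEngine = createTimeSeriesEngine(name="demoEngine", windowSize=60000, step=60000, metrics=<[sum(qty)]>, dummyTable=ticks, outputTable=demoOutput, timeColumn=`time)
// Route the stream table's data into the engine
subscribeTable(tableName="ticks", actionName="demoEngine", offset=0, handler=append!{demoEngine}, msgAsTable=true)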

The daily time-series engine can be created using the createDailyTimeSeriesEngine function. The syntax is as follows:

createDailyTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [sessionBegin], [sessionEnd], [mergeSessionEnd=false], [forceTriggerTime], [raftGroup], [forceTriggerSessionEndTime], [keyPurgeFreqInSec=-1], [closed='left'], [outputElapsedMicroseconds=false], [subWindow], [parallelism=1], [acceptedDelay=0], [outputHandler=NULL], [msgAsTable=false], [keyPurgeDaily=true])
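
As a hedged sketch (the table, column, and engine names are made up, and it is assumed here that sessionBegin and sessionEnd accept vectors of SECOND values to describe multiple trading sessions), a daily engine computing 1-minute volume bars per symbol for a market with morning and afternoon sessions might be created as follows:

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as sessionTrades
sessionOutput = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
dailyEngine = createDailyTimeSeriesEngine(name="dailyDemo", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=sessionTrades, outputTable=sessionOutput, timeColumn=`time, keyColumn=`sym, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00)
subscribeTable(tableName="sessionTrades", actionName="dailyDemo", offset=0, handler=append!{dailyEngine}, msgAsTable=true)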

Windowing Logic

Window boundaries: The engine automatically adjusts the starting point of the first window (see the descriptions of the step and roundTime parameters, as well as the alignment rules below).

Window properties:

  • windowSize - the size of each window;
  • closed - whether the left/right boundary of a window is inclusive or exclusive;
  • step - the interval by which the window moves forward;
  • useSystemTime - whether records are windowed by the time column in the data or by the system time at which they are ingested.

Calculation Rules

  • If timeColumn is specified, its values must be increasing. If keyColumn is specified to group the data, the values in timeColumn must be increasing within each group specified by keyColumn. Otherwise, out-of-order data will be discarded.
  • If useSystemTime = true, the calculation of a window is triggered as soon as the window ends. If useSystemTime = false (with timeColumn specified), the calculation of a window is triggered by the arrival of the next record after the window ends. To trigger the calculation for the uncalculated windows, you can specify the parameter updateTime or forceTriggerTime.
  • If fill is unspecified or 'none', only windows with calculation results are output. If fill is specified, all windows are output, and empty windows are filled using the specified filling method (see the sketch after this list).
  • Since version 2.00.11, if updateTime = 0, incoming records in the current window can be immediately calculated and output.
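
The fill behavior can be illustrated with a minimal sketch (the names ticksFill and fillDemo are hypothetical, and it is assumed that 'null' is one of the accepted filling methods, which causes empty windows to be output with NULL results):

share streamTable(1000:0, `time`qty, [TIMESTAMP, INT]) as ticksFill
fillOutput = table(10000:0, `time`sumQty, [TIMESTAMP, INT])
// Windows with no data are expected to appear in fillOutput with a NULL sumQty
fillEngine = createTimeSeriesEngine(name="fillDemo", windowSize=3, step=3, metrics=<[sum(qty)]>, dummyTable=ticksFill, outputTable=fillOutput, timeColumn=`time, fill='null')
subscribeTable(tableName="ticksFill", actionName="fillDemo", offset=0, handler=append!{fillEngine}, msgAsTable=true)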

Alignment Rules

To facilitate observation and comparison of calculation results, the engine automatically adjusts the starting point of the first window. The alignment size (an integer, denoted alignmentSize below) is determined by the parameter step, the parameter roundTime, and the precision of timeColumn. For grouped calculations, the windows of all groups are aligned uniformly, so each window has the same boundaries across groups.

  • If the data type of timeColumn is MONTH, the window begins in January of the year corresponding to the first arriving record.
  • If the data type of timeColumn is DATE, the boundary of the first window will not be adjusted.
  • If the data type of timeColumn is MINUTE(HH:mm), the value of alignmentSize is as follows:

    if roundTime=false:

    step (minutes)    alignmentSize (minutes)
    0~2               2
    3                 3
    4~5               5
    6~10              10
    11~15             15
    16~20             20
    21~30             30
    >30               60 (1 hour)

    if roundTime=true:

    The value of alignmentSize is the same as in the table above if step<=30. If step>30, the value of alignmentSize is as follows:

    step (minutes)    alignmentSize (minutes)
    31~60             60 (1 hour)
    61~120            120 (2 hours)
    121~180           180 (3 hours)
    181~300           300 (5 hours)
    301~600           600 (10 hours)
    601~900           900 (15 hours)
    901~1200          1200 (20 hours)
    1201~1800         1800 (30 hours)
    >1800             3600 (60 hours)
  • If the data type of timeColumn is DATETIME (yyyy-MM-dd HH:mm:ss) or SECOND (HH:mm:ss), the value of alignmentSize is as follows:

    if roundTime=false:

    step (seconds)    alignmentSize (seconds)
    0~2               2
    3                 3
    4~5               5
    6~10              10
    11~15             15
    16~20             20
    21~30             30
    >30               60 (1 minute)

    if roundTime=true:

    The value of alignmentSize is the same as in the table above if step<=30. If step>30, the value of alignmentSize is as follows:

    step (seconds)    alignmentSize (seconds)
    31~60             60 (1 minute)
    61~120            120 (2 minutes)
    121~180           180 (3 minutes)
    181~300           300 (5 minutes)
    301~600           600 (10 minutes)
    601~900           900 (15 minutes)
    901~1200          1200 (20 minutes)
    1201~1800         1800 (30 minutes)
    >1800             3600 (1 hour)
  • If the data type of timeColumn is TIMESTAMP(yyyy-MM-dd HH:mm:ss.mmm) or TIME(HH:mm:ss.mmm), the value of alignmentSize is as follows:

    if roundTime=false:

    step (milliseconds)    alignmentSize (milliseconds)
    0~2                    2
    3~5                    5
    6~10                   10
    11~20                  20
    21~25                  25
    26~50                  50
    51~100                 100
    101~200                200
    201~250                250
    251~500                500
    501~1000               1000 (1 second)
    1001~2000              2000 (2 seconds)
    2001~3000              3000 (3 seconds)
    3001~5000              5000 (5 seconds)
    5001~10000             10000 (10 seconds)
    10001~15000            15000 (15 seconds)
    15001~20000            20000 (20 seconds)
    20001~30000            30000 (30 seconds)
    >30000                 60000 (1 minute)

    if roundTime=true:

    The value of alignmentSize is the same as in the table above if step<=30000. If step>30000, the value of alignmentSize is as follows:

    step (milliseconds)    alignmentSize (milliseconds)
    30001~60000            60000 (1 minute)
    60001~120000           120000 (2 minutes)
    120001~300000          300000 (5 minutes)
    300001~600000          600000 (10 minutes)
    600001~900000          900000 (15 minutes)
    900001~1200000         1200000 (20 minutes)
    1200001~1800000        1800000 (30 minutes)
    >1800000               3600000 (1 hour)
  • If the data type of timeColumn is NANOTIMESTAMP(yyyy-MM-dd HH:mm:ss.nnnnnnnnn) or NANOTIME(HH:mm:ss.nnnnnnnnn), the value of alignmentSize is as follows:

    if roundTime=false:

    step           alignmentSize
    0~2ns          2ns
    3ns~5ns        5ns
    6ns~10ns       10ns
    11ns~20ns      20ns
    21ns~25ns      25ns
    26ns~50ns      50ns
    51ns~100ns     100ns
    101ns~200ns    200ns
    201ns~250ns    250ns
    251ns~500ns    500ns
    >500ns         1000ns

    if roundTime=true:

    step           alignmentSize
    1000ns~1ms     1ms
    1ms~10ms       10ms
    10ms~100ms     100ms
    100ms~1s       1s
    1s~2s          2s
    2s~3s          3s
    3s~5s          5s
    5s~10s         10s
    10s~15s        15s
    15s~20s        20s
    20s~30s        30s
    >30s           1min

If the time of the first record is x (of TIMESTAMP type), then the starting time of the first window is adjusted to be timeType_cast(x/alignmentSize*alignmentSize+step-windowSize), where "/" denotes integer division (the fractional part is discarded). For example, if the time of the first record is 2018.10.08T01:01:01.365, windowSize = 120000, and step = 60000, then alignmentSize = 60000, and the starting time of the first window is timestamp(2018.10.08T01:01:01.365/60000*60000+60000-120000) = 2018.10.08T01:00:00.000.
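
The adjustment in this example can be reproduced in a short script. This is only a sketch of the arithmetic, assuming div performs integer division and long casts a TIMESTAMP to its underlying millisecond value:

x = 2018.10.08T01:01:01.365
alignmentSize = 60000
step = 60000
windowSize = 120000
// Floor x to a multiple of alignmentSize, then shift by step - windowSize
firstWindowStart = timestamp(div(long(x), alignmentSize) * alignmentSize + step - windowSize)
// firstWindowStart is expected to be 2018.10.08T01:00:00.000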

Usage Examples

The following examples demonstrate the use of the time-series engine. Before running any example, ensure the streaming environment is reset by canceling all subscriptions and deleting existing stream tables and streaming engines.

// Replace tableName, actionName, and engineName with the names used in your environment
unsubscribeTable(tableName="tableName", actionName="actionName")
undef(`tableName, SHARED)
dropStreamEngine("engineName")

Example 1. Window Boundary Alignment

The following script creates a stream table trades with two columns: time and volume. A time-series engine streamAggr1 is then created, which calculates sum(volume) every 3 milliseconds over the data of the past 6 milliseconds.

share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
outputTable = table(10000:0, `time`sumVolume, [TIMESTAMP, INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=outputTable, timeColumn=`time)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)    

Insert 10 records into the stream table trades and check the data:

def writeData(t, n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    volumev = take(1, n)
    insert into t values(timev, volumev)
}
writeData(trades, 10)

select * from trades;
time volume
2018.10.08T01:01:01.002 1
2018.10.08T01:01:01.003 1
2018.10.08T01:01:01.004 1
2018.10.08T01:01:01.005 1
2018.10.08T01:01:01.006 1
2018.10.08T01:01:01.007 1
2018.10.08T01:01:01.008 1
2018.10.08T01:01:01.009 1
2018.10.08T01:01:01.010 1
2018.10.08T01:01:01.011 1

Then check the result table outputTable:

select * from outputTable;
time sumVolume
2018.10.08T01:01:01.003 1
2018.10.08T01:01:01.006 4
2018.10.08T01:01:01.009 6

The time-series engine aligns the start time of the first window based on the timestamp of the first received data, and moves subsequent windows by step. Below is a detailed explanation of the engine's calculation process. For simplicity, the 2018.10.08T01:01:01 prefix is omitted from the timestamps unless otherwise specified.

  • Since the first record arrives at 002, the first window is aligned to [2018.10.08T01:01:00.997, 2018.10.08T01:01:01.003), containing only the record at 002. The calculation is triggered by the record at 003, and the result of sum(volume) is 1.
  • The second window [000, 006) contains four records at 002, 003, 004, and 005. The calculation is triggered by the record at 006, and the result is 4.
  • The third window [003, 009) contains six records at 003, 004, 005, 006, 007, and 008. The calculation is triggered by the record at 009, and the result is 6.
  • The fourth window [006, 012) contains six records at 006, 007, 008, 009, 010, and 011. Since no new record arrives after this window ends, its calculation is not triggered (see the sketch below).
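
As stated in the calculation rules, a pending window is closed by the arrival of a later record. For instance, appending one more (hypothetical) record whose timestamp lies beyond the fourth window's boundary is expected to trigger the calculation for [006, 012), along with any other windows that end at or before 020:

insert into trades values(2018.10.08T01:01:01.020, 1)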

Example 2. Setting Multiple Windows

The time-series engine supports setting multiple windows, allowing different aggregation rules for each window.

The following example demonstrates how to aggregate the same metric over windows of different sizes. The script creates a stream table trades with two columns: time and volume. A time-series engine streamAggr1 is then created, which calculates sum(volume) every 3 milliseconds over both the past 6 milliseconds and the past 12 milliseconds.

share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
outputTable = table(10000:0, `time`sumVolume1`sumVolume2, [TIMESTAMP, INT,INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=[6,12], step=3, metrics=[<sum(volume)>,<sum(volume)>], dummyTable=trades, outputTable=outputTable, timeColumn=`time)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)    

Insert 20 records into the stream table trades and check the data:

def writeData(t, n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    volumev = take(1, n)
    insert into t values(timev, volumev)
}
writeData(trades, 20)

select * from trades;

Then check the result table outputTable:

select * from outputTable;
time sumVolume1 sumVolume2
2018.10.08T01:01:01.003 1 1
2018.10.08T01:01:01.006 4 4
2018.10.08T01:01:01.009 6 7
2018.10.08T01:01:01.012 6 10
2018.10.08T01:01:01.015 6 12
2018.10.08T01:01:01.018 6 12
2018.10.08T01:01:01.021 6 12

Example 3. Metrics Expressions

The time-series engine supports flexible expressions for the parameter metrics.

  • Use one or more aggregate functions
    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<sum(ask)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  • Calculate with aggregate results
    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<max(ask)-min(ask)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  • Aggregate over operation results of columns
    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<max(ask-bid)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  • Output multiple aggregate results
    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<[max((ask-bid)/(ask+bid)*2), min((ask-bid)/(ask+bid)*2)]>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  • Specify aggregate functions with multiple arguments
    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<corr(ask,bid)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
    
    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<percentile(ask-bid,99)/sum(ask)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  • Specify user-defined aggregate function
    defg diff(x,y){
    	return sum(x)-sum(y)
    }
    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<diff(ask, bid)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
  • Use user-defined aggregate functions with multiple return values
    defg sums(x){
    	return [sum(x),sum2(x)]
    }
    tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<sums(ask) as `sumAsk`sum2Ask>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)

Note: Nested aggregate functions such as sum(spread(ask, bid)) are not supported in metrics.

Example 4. Setting Output Tables

Calculation results of streaming engines can be output to either in-memory tables or stream tables. Data in stream tables cannot be updated or deleted, but can be published as source data for other engines.

In the following example, the time-series engine electricityAggregator1 subscribes to the stream table electricity, computes the average voltage and current over 10 ms windows, and outputs the results to the stream table outputTable1. The time-series engine electricityAggregator2 subscribes to outputTable1 and computes the maxima of those averages over 100 ms windows.

share streamTable(1000:0,`time`voltage`current,[TIMESTAMP,DOUBLE,DOUBLE]) as electricity

// Define the output table as a stream table for further subscription
share streamTable(10000:0,`time`avgVoltage`avgCurrent,[TIMESTAMP,DOUBLE,DOUBLE]) as outputTable1 

electricityAggregator1 = createTimeSeriesEngine(name="electricityAggregator1", windowSize=10, step=10, metrics=<[avg(voltage), avg(current)]>, dummyTable=electricity, outputTable=outputTable1, timeColumn=`time, garbageSize=2000)
subscribeTable(tableName="electricity", actionName="avgElectricity", offset=0, handler=append!{electricityAggregator1}, msgAsTable=true)

// Subscribe to calculation results for further aggregation
outputTable2 =table(10000:0, `time`maxVoltage`maxCurrent, [TIMESTAMP,DOUBLE,DOUBLE])
electricityAggregator2 = createTimeSeriesEngine(name="electricityAggregator2", windowSize=100, step=100, metrics=<[max(avgVoltage), max(avgCurrent)]>, dummyTable=outputTable1, outputTable=outputTable2, timeColumn=`time, garbageSize=2000)
subscribeTable(tableName="outputTable1", actionName="maxElectricity", offset=0, handler=append!{electricityAggregator2}, msgAsTable=true);

// Insert 500 records into the electricity table
def writeData(t, n){
        timev = 2018.10.08T01:01:01.000 + timestamp(1..n)
        voltage = 1..n * 0.1
        current = 1..n * 0.05
        insert into t values(timev, voltage, current)
}
writeData(electricity, 500);

Check aggregation results:

select * from outputTable2;
time maxVoltage maxCurrent
2018.10.08T01:01:01.100 8.45 4.225
2018.10.08T01:01:01.200 18.45 9.225
2018.10.08T01:01:01.300 28.45 14.225
2018.10.08T01:01:01.400 38.45 19.225
2018.10.08T01:01:01.500 48.45 24.225

The outputTable parameter can also be specified as a streaming engine:

share streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as trades
share streamTable(1000:0, `time`sym`open`close`high`low`volume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as kline
outputTable=table(1000:0, `sym`factor1, [SYMBOL, DOUBLE])
// Set the output table of the time-series engine as Rengine
Rengine=createReactiveStateEngine(name="reactive", metrics=<[mavg(open, 3)]>, dummyTable=kline, outputTable=outputTable, keyColumn="sym")
Tengine=createTimeSeriesEngine(name="timeseries", windowSize=6000, step=6000, metrics=<[first(price), last(price), max(price), min(price), sum(volume)]>, dummyTable=trades, outputTable=Rengine, timeColumn=`time, useSystemTime=false, keyColumn=`sym)
subscribeTable(server="", tableName="trades", actionName="timeseries", offset=0, handler=append!{Tengine}, msgAsTable=true)   

Example 5. Grouped Calculations

The following example groups data based on the sym column and performs window calculations within each group.

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
outputTable = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=3, step=3, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=outputTable, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)    

def writeData(t, n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    symv =take(`A`B, n)
    volumev = take(1, n)
    insert into t values(timev, symv, volumev)
}

writeData(trades, 6)

Check table trades (sorted by sym):

select * from trades order by sym
time sym volume
2018.10.08T01:01:01.002 A 1
2018.10.08T01:01:01.004 A 1
2018.10.08T01:01:01.006 A 1
2018.10.08T01:01:01.003 B 1
2018.10.08T01:01:01.005 B 1
2018.10.08T01:01:01.007 B 1

Check grouped calculation results:

select * from outputTable
time sym sumVolume
2018.10.08T01:01:01.003 A 1
2018.10.08T01:01:01.006 A 1
2018.10.08T01:01:01.006 B 2

After aligning the windows for each group, the first window of each group starts at 000. With windowSize=3 and step=3, the windows for each group are [000, 003), [003, 006), [006, 009), and so on.

  • The record at 003 belongs to group B and falls in the second window [003, 006). Since group B has no data in the first window [000, 003), no calculation is performed for that window.
  • The data of group A at 004 triggers the calculation for the first window of group A.
  • The data of group A at 006 triggers the calculation for the second window of group A.
  • The data of group B at 007 triggers the calculation for the second window of group B.

When performing grouped aggregation, the timeColumn within each group must be increasing. If no grouping is performed, the timeColumn of all ingested data must be increasing.
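
Based on this rule, an out-of-order record is simply dropped. For instance, appending the following (hypothetical) record is expected to have no effect on the output, because group A has already ingested a record with a later timestamp (006):

insert into trades values(2018.10.08T01:01:01.001, `A, 1)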

Example 6. Window Calculation Triggers

The following two examples help illustrate how to set updateTime.

First, create a stream table and insert data:

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23);

If updateTime is not specified, calculation for a window will not occur before the window ends.

output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
agg1 = createTimeSeriesEngine(name="agg1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="agg1", offset=0, handler=append!{agg1}, msgAsTable=true)

sleep(10)

select * from output1;
time sym sumVolume
2018.10.08T01:02:00.000 A 38
2018.10.08T01:03:00.000 A 25
2018.10.08T01:02:00.000 B 40
2018.10.08T01:03:00.000 B 9

The updateTime parameter allows results to be output before the current window ends. Starting from the left boundary of the window, the window is divided into sub-windows of length updateTime; when a new record arrives after a sub-window has ended, all data received before this record in the current window is calculated. In addition, if data in the current window still has not been calculated 2*updateTime (and at least 2 seconds) after its arrival, all data currently in the window is calculated and output.

output2 = keyedTable(`time`sym,10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
agg2 = createTimeSeriesEngine(name="agg2", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output2, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=1000, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="agg2", offset=0, handler=append!{agg2}, msgAsTable=true)

sleep(2010)

select * from output2;
time sym sumVolume
2018.10.08T01:02:00.000 A 38
2018.10.08T01:03:00.000 A 25
2018.10.08T01:02:00.000 B 40
2018.10.08T01:03:00.000 B 9
2018.10.08T01:05:00.000 B 55
2018.10.08T01:05:00.000 A 29

For simplicity, we assume that the time column indicates the time when data enters the time-series engine, and only the time part (hour:minute:second.millisecond) is shown in the following explanations. The key difference lies in how the last window [01:04:00.000, 01:05:00.000) is processed:

(1) At 01:04:04.236: 2000 milliseconds (updateTime * 2) after the first and only record of group A arrives, the calculation for group A is triggered. The output table adds a record (01:05:00.000, 'A', 29).

(2) At 01:04:05.152: A record of group B arrives after the subwindow [01:04:04.000, 01:04:05.000) containing the record at 01:04:04.412 closes, triggering a calculation for the subwindow of group B. The output table adds a record (01:05:00.000, 'B', 32).

(3) At 01:04:07.152: 2000 milliseconds after the record at 01:04:05.152 arrives, a calculation for group B is triggered, outputting a record (01:05:00.000, 'B', 55). Since the output table is a keyed table with primary key time and sym, and it already contains the record (01:05:00.000, 'B', 32), this record is updated to (01:05:00.000, 'B', 55).

Example 7. Engine Snapshots

Once the snapshot mechanism is enabled, the engine's state can be restored from the latest snapshot if an exception occurs. The following example illustrates the usage of snapshotDir and snapshotIntervalInMsgCount. If snapshots are enabled, the handler must be specified as the appendMsg function when subscribing to the stream table, and handlerNeedMsgId must be set to true so that the message position is recorded.

share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,INT,INT]) as trades
output1 =table(10000:0, `time`sumprice, [TIMESTAMP,INT]);
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, snapshotDir="/home/server1/snapshotDir", snapshotIntervalInMsgCount=100)
subscribeTable(server="", tableName="trades", actionName="Agg1",offset= 0, handler=appendMsg{Agg1}, msgAsTable=true, handlerNeedMsgId=true)

n=500
timev=timestamp(1..n) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)

select * from output1
time sumprice
2021.03.12T15:00:00.050 1225
2021.03.12T15:00:00.100 4950
2021.03.12T15:00:00.150 9950
2021.03.12T15:00:00.200 14950
2021.03.12T15:00:00.250 19950
2021.03.12T15:00:00.300 24950
2021.03.12T15:00:00.350 29950
2021.03.12T15:00:00.400 34950
2021.03.12T15:00:00.450 39950
2021.03.12T15:00:00.500 44950
getSnapshotMsgId(Agg1)
>499

Unsubscribe and delete the engine to simulate a system exception:

unsubscribeTable(, "trades", "Agg1")
dropStreamEngine("Agg1")
Agg1=NULL

Write data to the trades table:

n=500
timev=timestamp(501..1000) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)

Recreate the engine Agg1, load the snapshot, and resubscribe starting from the last processed message:

Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, snapshotDir="/home/server1/snapshotDir", snapshotIntervalInMsgCount=100)

ofst=getSnapshotMsgId(Agg1)
print(ofst)
>499

subscribeTable(server="", tableName="trades", actionName="Agg1",offset=ofst+1, handler=appendMsg{Agg1}, msgAsTable=true, handlerNeedMsgId=true)

select * from output1

The results are consistent with uninterrupted subscription.

time sumprice
2021.03.12T15:00:00.050 1225
2021.03.12T15:00:00.100 4950
2021.03.12T15:00:00.150 9950
2021.03.12T15:00:00.200 14950
2021.03.12T15:00:00.250 19950
2021.03.12T15:00:00.300 24950
2021.03.12T15:00:00.350 29950
2021.03.12T15:00:00.400 34950
2021.03.12T15:00:00.450 39950
2021.03.12T15:00:00.500 44950
2021.03.12T15:00:00.550 25450
2021.03.12T15:00:00.600 5450
2021.03.12T15:00:00.650 9950
2021.03.12T15:00:00.700 14950
2021.03.12T15:00:00.750 19950
2021.03.12T15:00:00.800 24950
2021.03.12T15:00:00.850 29950
2021.03.12T15:00:00.900 34950
2021.03.12T15:00:00.950 39950
2021.03.12T15:00:01.000 44950

The following example shows, for comparison, the behavior when engine snapshots are not enabled. Without snapshots, even if the subscription is resumed from the point of interruption, the results differ from those of an uninterrupted subscription.

share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,INT,INT]) as trades
output1 =table(10000:0, `time`sumprice, [TIMESTAMP,INT]);
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time)
subscribeTable(server="", tableName="trades", actionName="Agg1",offset= 0, handler=append!{Agg1}, msgAsTable=true)

n=500
timev=timestamp(1..n) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)

n=500
timev=timestamp(501..1000) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)

select * from output1
time sumprice
2021.03.12T15:00:00.050 1225
2021.03.12T15:00:00.100 4950
2021.03.12T15:00:00.150 9950
2021.03.12T15:00:00.200 14950
2021.03.12T15:00:00.250 19950
2021.03.12T15:00:00.300 24950
2021.03.12T15:00:00.350 29950
2021.03.12T15:00:00.400 34950
2021.03.12T15:00:00.450 39950
2021.03.12T15:00:00.500 44950
2021.03.12T15:00:00.550 25450
2021.03.12T15:00:00.600 5450
2021.03.12T15:00:00.650 9950
2021.03.12T15:00:00.700 14950
2021.03.12T15:00:00.750 19950
2021.03.12T15:00:00.800 24950
2021.03.12T15:00:00.850 29950
2021.03.12T15:00:00.900 34950
2021.03.12T15:00:00.950 39950
2021.03.12T15:00:01.000 44950

Since snapshots are not enabled, resubscribing from the last processed message produces different results:

share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,INT,INT]) as trades
output1 =table(10000:0, `time`sumprice, [TIMESTAMP,INT]);
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time)
subscribeTable(server="", tableName="trades", actionName="Agg1",offset= 0, handler=append!{Agg1}, msgAsTable=true)

n=500
timev=timestamp(1..n) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)

unsubscribeTable(, "trades", "Agg1")
dropStreamEngine("Agg1")
Agg1=NULL

n=500
timev=timestamp(501..1000) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)

Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time)
subscribeTable(server="", tableName="trades", actionName="Agg1",offset= 500, handler=append!{Agg1}, msgAsTable=true)

select * from output1
time sumprice
2021.03.12T15:00:00.050 1225
2021.03.12T15:00:00.100 4950
2021.03.12T15:00:00.150 9950
2021.03.12T15:00:00.200 14950
2021.03.12T15:00:00.250 19950
2021.03.12T15:00:00.300 24950
2021.03.12T15:00:00.350 29950
2021.03.12T15:00:00.400 34950
2021.03.12T15:00:00.450 39950
2021.03.12T15:00:00.500 44950
2021.03.12T15:00:00.550 1225
2021.03.12T15:00:00.600 4950
2021.03.12T15:00:00.650 9950
2021.03.12T15:00:00.700 14950
2021.03.12T15:00:00.750 19950
2021.03.12T15:00:00.800 24950
2021.03.12T15:00:00.850 29950
2021.03.12T15:00:00.900 34950
2021.03.12T15:00:00.950 39950
2021.03.12T15:00:01.000 44950

Example 8. Filtering Subscription Data

When using the subscribeTable function, the handler parameter can be used to filter the subscribed streaming data. In the following example, records where voltage <= 122 or current is NULL are filtered out before entering the time-series engine.

share streamTable(1000:0, `time`voltage`current, [TIMESTAMP, DOUBLE, DOUBLE]) as electricity
outputTable = table(10000:0, `time`avgVoltage`avgCurrent, [TIMESTAMP, DOUBLE, DOUBLE])

// Define filtering function
def append_after_filtering(inputTable, msg){
	t = select * from msg where voltage>122, isValid(current)
	if(size(t)>0){
		insert into inputTable values(t.time,t.voltage,t.current)		
	}
}
electricityAggregator = createTimeSeriesEngine(name="electricityAggregator", windowSize=6, step=3, metrics=<[avg(voltage), avg(current)]>, dummyTable=electricity, outputTable=outputTable, timeColumn=`time, garbageSize=2000)
subscribeTable(tableName="electricity", actionName="avgElectricity", offset=0, handler=append_after_filtering{electricityAggregator}, msgAsTable=true)

// Simulate data
def writeData(t, n){
        timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
        voltage = 120+1..n * 1.0
        current = take([1,NULL,2]*0.1, n)
        insert into t values(timev, voltage, current);
}
writeData(electricity, 10)

Check table electricity:

select * from electricity
time voltage current
2018.10.08T01:01:01.002 121 0.1
2018.10.08T01:01:01.003 122
2018.10.08T01:01:01.004 123 0.2
2018.10.08T01:01:01.005 124 0.1
2018.10.08T01:01:01.006 125
2018.10.08T01:01:01.007 126 0.2
2018.10.08T01:01:01.008 127 0.1
2018.10.08T01:01:01.009 128
2018.10.08T01:01:01.010 129 0.2
2018.10.08T01:01:01.011 130 0.1

Check result table outputTable:

select * from outputTable
time avgVoltage avgCurrent
2018.10.08T01:01:01.006 123.5 0.15
2018.10.08T01:01:01.009 125 0.15

Because records where the voltage is less than or equal to 122 or the current is NULL are filtered out before entering the time-series engine, the first window (the one ending at 003) contains no data, so no result is output for it.