Time-Series Engine
The time-series engine aggregates data within time windows. DolphinDB provides three types of time-series engines:
- Time-Series Engine: Automatically divides time windows based on specified parameters, applying aggregation rules to grouped data within each window. It is suitable for scenarios where there are no fixed trading periods, such as forex and crypto trading.
- Daily Time Series Engine: Extends the basic functionality by supporting trading sessions. It includes all unprocessed data at the start of each session in the first window of that session for aggregation. It is recommended for scenarios with fixed trading periods, such as futures and stock markets.
- Session Window Engine: Dynamically creates session windows based on event frequency. Unlike the time-series engine, its windows are not of fixed duration; each window's length adjusts dynamically to the arrival of events.
The time-series engine can be created using the createTimeSeriesEngine function. The syntax is as follows:
createTimeSeriesEngine(name,
windowSize, step, metrics, dummyTable, outputTable, [timeColumn],
[useSystemTime=false], [keyColumn], [garbageSize], [updateTime],
[useWindowStartTime], [roundTime=true], [snapshotDir],
[snapshotIntervalInMsgCount], [fill='none'], [forceTriggerTime], [raftGroup],
[keyPurgeFreqInSec=-1], [closed='left'], [outputElapsedMicroseconds=false],
[subWindow], [parallelism=1], [acceptedDelay=0], [outputHandler=NULL],
[msgAsTable=false])
The daily time-series engine can be created using the createDailyTimeSeriesEngine function. The syntax is as follows:
createDailyTimeSeriesEngine(name, windowSize, step, metrics,
dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn],
[garbageSize], [updateTime], [useWindowStartTime], [roundTime=true],
[snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [sessionBegin],
[sessionEnd], [mergeSessionEnd=false], [forceTriggerTime], [raftGroup],
[forceTriggerSessionEndTime], [keyPurgeFreqInSec=-1], [closed='left'],
[outputElapsedMicroseconds=false], [subWindow], [parallelism=1],
[acceptedDelay=0], [outputHandler=NULL], [msgAsTable=false],
[keyPurgeDaily=true])
Windowing logic
Window boundaries: The engine automatically adjusts the starting point of the first window (see the descriptions of the step and roundTime parameters, and the alignment rules below).
Window properties:
- windowSize - the size of each window;
- closed - whether the left/right boundary of a window is inclusive or exclusive;
- step - the duration of time between windows;
- useSystemTime - specifies how records are windowed: based on the time column in the data or on the system time of data ingestion.
Calculation Rules
- If timeColumn is specified, its values must be increasing. If keyColumn is specified to group the data, the values in timeColumn must be increasing within each group specified by keyColumn. Otherwise, out-of-order data will be discarded.
- If useSystemTime = true, the calculation of a window is triggered as soon as the window ends. If useSystemTime = false (with timeColumn specified), the calculation of a window is triggered by the arrival of the next record after the window ends. To trigger the calculation for the uncalculated windows, you can specify the parameter updateTime or forceTriggerTime.
- If fill is unspecified or 'none', only windows with calculation results are output. If fill is specified, all windows are output, with empty windows filled using the specified filling method.
- Since version 2.00.11, if updateTime = 0, incoming records in the current window can be immediately calculated and output.
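The event-time trigger rule (useSystemTime = false) can be sketched in a few lines of Python. This is an illustration of the rule only, not DolphinDB's implementation; the function name and the list of window boundaries are hypothetical:

```python
def triggered(times, window_ends):
    """With useSystemTime=false, a window's calculation fires only once a
    record with time >= the window's right boundary arrives."""
    return {end: any(t >= end for t in times) for end in window_ends}

# Records at 2..11; windows ending at 3, 6, and 9 are triggered, but the
# window ending at 12 stays uncalculated until updateTime/forceTriggerTime
# kicks in.
print(triggered(list(range(2, 12)), [3, 6, 9, 12]))
# → {3: True, 6: True, 9: True, 12: False}
```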
Alignment Rules
To facilitate observation and comparison of calculation results, the engine automatically adjusts the starting point of the first window. The alignment size (an integer) is determined by the parameter step, the parameter roundTime, and the precision of timeColumn. When the engine calculates within groups, the windows of all groups are uniformly aligned, so each window has the same boundaries in every group.
- If the data type of timeColumn is MONTH, the window begins in January of the year corresponding to the first arriving record.
- If the data type of timeColumn is DATE, the boundary of the first window will not be adjusted.
- If the data type of timeColumn is MINUTE (HH:mm), the value of alignmentSize is as follows:

If roundTime = false:

step | alignmentSize |
---|---|
0~2 | 2 |
3 | 3 |
4~5 | 5 |
6~10 | 10 |
11~15 | 15 |
16~20 | 20 |
21~30 | 30 |
>30 | 60 (1 hour) |

If roundTime = true: the value of alignmentSize is the same as in the table above if step <= 30. If step > 30:

step | alignmentSize |
---|---|
31~60 | 60 (1 hour) |
61~120 | 120 (2 hours) |
121~180 | 180 (3 hours) |
181~300 | 300 (5 hours) |
301~600 | 600 (10 hours) |
601~900 | 900 (15 hours) |
901~1200 | 1200 (20 hours) |
1201~1800 | 1800 (30 hours) |
>1800 | 3600 (60 hours) |

- If the data type of timeColumn is DATETIME (yyyy-MM-dd HH:mm:ss) or SECOND (HH:mm:ss), the value of alignmentSize is as follows:

If roundTime = false:

step | alignmentSize |
---|---|
0~2 | 2 |
3 | 3 |
4~5 | 5 |
6~10 | 10 |
11~15 | 15 |
16~20 | 20 |
21~30 | 30 |
>30 | 60 (1 minute) |

If roundTime = true: the value of alignmentSize is the same as in the table above if step <= 30. If step > 30:

step | alignmentSize |
---|---|
31~60 | 60 (1 minute) |
61~120 | 120 (2 minutes) |
121~180 | 180 (3 minutes) |
181~300 | 300 (5 minutes) |
301~600 | 600 (10 minutes) |
601~900 | 900 (15 minutes) |
901~1200 | 1200 (20 minutes) |
1201~1800 | 1800 (30 minutes) |
>1800 | 3600 (1 hour) |

- If the data type of timeColumn is TIMESTAMP (yyyy-MM-dd HH:mm:ss.mmm) or TIME (HH:mm:ss.mmm), the value of alignmentSize is as follows:

If roundTime = false:

step | alignmentSize |
---|---|
0~2 | 2 |
3~5 | 5 |
6~10 | 10 |
11~20 | 20 |
21~25 | 25 |
26~50 | 50 |
51~100 | 100 |
101~200 | 200 |
201~250 | 250 |
251~500 | 500 |
501~1000 | 1000 (1 second) |
1001~2000 | 2000 (2 seconds) |
2001~3000 | 3000 (3 seconds) |
3001~5000 | 5000 (5 seconds) |
5001~10000 | 10000 (10 seconds) |
10001~15000 | 15000 (15 seconds) |
15001~20000 | 20000 (20 seconds) |
20001~30000 | 30000 (30 seconds) |
>30000 | 60000 (1 minute) |

If roundTime = true: the value of alignmentSize is the same as in the table above if step <= 30000. If step > 30000:

step | alignmentSize |
---|---|
30001~60000 | 60000 (1 minute) |
60001~120000 | 120000 (2 minutes) |
120001~300000 | 300000 (5 minutes) |
300001~600000 | 600000 (10 minutes) |
600001~900000 | 900000 (15 minutes) |
900001~1200000 | 1200000 (20 minutes) |
1200001~1800000 | 1800000 (30 minutes) |
>1800000 | 3600000 (1 hour) |

- If the data type of timeColumn is NANOTIMESTAMP (yyyy-MM-dd HH:mm:ss.nnnnnnnnn) or NANOTIME (HH:mm:ss.nnnnnnnnn), the value of alignmentSize is as follows:

If roundTime = false:

step | alignmentSize |
---|---|
0~2ns | 2ns |
3ns~5ns | 5ns |
6ns~10ns | 10ns |
11ns~20ns | 20ns |
21ns~25ns | 25ns |
26ns~50ns | 50ns |
51ns~100ns | 100ns |
101ns~200ns | 200ns |
201ns~250ns | 250ns |
251ns~500ns | 500ns |
>500ns | 1000ns |

If roundTime = true:

step | alignmentSize |
---|---|
1000ns~1ms | 1ms |
1ms~10ms | 10ms |
10ms~100ms | 100ms |
100ms~1s | 1s |
1s~2s | 2s |
2s~3s | 3s |
3s~5s | 5s |
5s~10s | 10s |
10s~15s | 15s |
15s~20s | 20s |
20s~30s | 30s |
>30s | 1min |
If the time of the first record is x (of TIMESTAMP type, for example), then the starting time of the first window is adjusted to be timeType_cast(x/alignmentSize*alignmentSize + step - windowSize), where "/" produces only the integer part after division. For example, if the time of the first record is 2018.10.08T01:01:01.365, windowSize = 120000, and step = 60000, then alignmentSize = 60000, and the starting time of the first window is timestamp(2018.10.08T01:01:01.365/60000*60000 + 60000 - 120000) = 2018.10.08T01:00:00.000.
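The alignment arithmetic can be checked with plain integer math. The sketch below uses Python purely for illustration (the helper names are hypothetical), representing times as milliseconds within the day:

```python
def first_window_start(x, alignment_size, step, window_size):
    """x/alignmentSize*alignmentSize + step - windowSize, with '/' truncating."""
    return x // alignment_size * alignment_size + step - window_size

def hhmmss_ms(ms):
    """Format milliseconds-within-day as HH:mm:ss.mmm."""
    h, rem = divmod(ms, 3600000)
    m, rem = divmod(rem, 60000)
    s, milli = divmod(rem, 1000)
    return f"{h:02d}:{m:02d}:{s:02d}.{milli:03d}"

# First record at 01:01:01.365 (date part dropped), windowSize=120000,
# step=60000, alignmentSize=60000:
x = (1 * 3600 + 1 * 60 + 1) * 1000 + 365
print(hhmmss_ms(first_window_start(x, 60000, 60000, 120000)))  # → 01:00:00.000
```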
Usage Examples
The following examples demonstrate the use of the time-series engine. Before running any example, ensure the streaming environment is reset by canceling all subscriptions and deleting existing stream tables and streaming engines.
unsubscribeTable(tableName="tableName", actionName="actionName")
undef(`tableName, SHARED)
dropStreamEngine("engineName")
Example 1. Window Boundary Alignment
The following script creates a stream table trades with two columns: time and volume. A time-series engine streamAggr1 is then created, which calculates sum(volume) every 3 milliseconds over the data of the past 6 milliseconds.
share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
outputTable = table(10000:0, `time`sumVolume, [TIMESTAMP, INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=outputTable, timeColumn=`time)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)
Insert 10 records into the stream table trades and check the data:
def writeData(t, n){
timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
volumev = take(1, n)
insert into t values(timev, volumev)
}
writeData(trades, 10)
select * from trades;
time | volume |
---|---|
2018.10.08T01:01:01.002 | 1 |
2018.10.08T01:01:01.003 | 1 |
2018.10.08T01:01:01.004 | 1 |
2018.10.08T01:01:01.005 | 1 |
2018.10.08T01:01:01.006 | 1 |
2018.10.08T01:01:01.007 | 1 |
2018.10.08T01:01:01.008 | 1 |
2018.10.08T01:01:01.009 | 1 |
2018.10.08T01:01:01.010 | 1 |
2018.10.08T01:01:01.011 | 1 |
Then check the result table outputTable:
select * from outputTable;
time | sumVolume |
---|---|
2018.10.08T01:01:01.003 | 1 |
2018.10.08T01:01:01.006 | 4 |
2018.10.08T01:01:01.009 | 6 |
The time-series engine aligns the start time of the first window based on the timestamp of the first received data, and moves subsequent windows by step. Below is a detailed explanation of the engine's calculation process. For simplicity, the 2018.10.08T01:01:01 prefix is omitted from the timestamps unless otherwise specified.
- Since the first record arrives at 002, the first window is aligned to [2018.10.08T01:01:00.997, 2018.10.08T01:01:01.003), containing only the record at 002. The calculation is triggered by the record at 003, and the result of sum(volume) is 1.
- The second window [000, 006) contains four records at 002, 003, 004, and 005. The calculation is triggered by the record at 006, and the result is 4.
- The third window [003, 009) contains six records at 003, 004, 005, 006, 007, and 008. The calculation is triggered by the record at 009, and the result is 6.
- The fourth window [006, 012) contains six records at 006, 007, 008, 009, 010, and 011. Since no new data is received after this window, its calculation is not triggered.
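The walkthrough above can be reproduced with a small event-time simulation. The Python sketch below is an illustration of the trigger rule only, not DolphinDB's implementation; times are millisecond offsets within 01:01:01, so 1002 stands for 2018.10.08T01:01:01.002:

```python
def time_series_sums(records, window_size, step, alignment_size):
    """A window [end - windowSize, end) is computed when the first record
    with time >= end arrives; windows without data produce no output."""
    # Right boundary of the first window: aligned first-record time plus one step.
    next_end = records[0][0] // alignment_size * alignment_size + step
    out = []
    for t, _ in records:
        while t >= next_end:          # this record closes one or more windows
            lo, hi = next_end - window_size, next_end
            in_win = [v for t2, v in records if lo <= t2 < hi]
            if in_win:
                out.append((hi, sum(in_win)))
            next_end += step
    return out

# Example 1's data: ten records at 01:01:01.002 ... 01:01:01.011, volume 1 each;
# windowSize=6, step=3 gives alignmentSize=5, so the first window is [997, 1003).
print(time_series_sums([(1000 + ms, 1) for ms in range(2, 12)], 6, 3, 5))
# → [(1003, 1), (1006, 4), (1009, 6)]  i.e. 003→1, 006→4, 009→6, as above
```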
Example 2. Setting Multiple Windows
The time-series engine supports setting multiple windows, allowing different aggregation rules for each window.
The following example demonstrates how to aggregate the same metrics over windows of different sizes. The script creates a stream table trades with two columns: time and volume. A time-series engine streamAggr1 is then created, which calculates sum(volume) every 3 milliseconds over the past 6 milliseconds and the past 12 milliseconds.
share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
outputTable = table(10000:0, `time`sumVolume1`sumVolume2, [TIMESTAMP, INT,INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=[6,12], step=3, metrics=[<sum(volume)>,<sum(volume)>], dummyTable=trades, outputTable=outputTable, timeColumn=`time)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)
Insert 20 records into the stream table trades and check the data:
def writeData(t, n){
timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
volumev = take(1, n)
insert into t values(timev, volumev)
}
writeData(trades, 20)
select * from trades;
Then check the result table outputTable:
select * from outputTable;
time | sumVolume1 | sumVolume2 |
---|---|---|
2018.10.08T01:01:01.003 | 1 | 1 |
2018.10.08T01:01:01.006 | 4 | 4 |
2018.10.08T01:01:01.009 | 6 | 7 |
2018.10.08T01:01:01.012 | 6 | 10 |
2018.10.08T01:01:01.015 | 6 | 12 |
2018.10.08T01:01:01.018 | 6 | 12 |
2018.10.08T01:01:01.021 | 6 | 12 |
Example 3. Metrics Expressions
The time-series engine supports flexible expressions for the parameter metrics.
- Use one or more aggregate functions:

tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<sum(ask)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)

- Calculate with aggregate results:

tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<max(ask)-min(ask)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)

- Aggregate over operation results of columns:

tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<max(ask-bid)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)

- Output multiple aggregate results:

tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<[max((ask-bid)/(ask+bid)*2), min((ask-bid)/(ask+bid)*2)]>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)

- Specify aggregate functions with multiple arguments:

tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<corr(ask,bid)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)
tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<percentile(ask-bid,99)/sum(ask)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)

- Specify a user-defined aggregate function:

defg diff(x,y){
    return sum(x)-sum(y)
}
tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<diff(ask, bid)>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)

- Use user-defined aggregate functions with multiple return values:

defg sums(x){
    return [sum(x),sum2(x)]
}
tsAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=6, step=3, metrics=<sums(ask) as `sumAsk`sum2Ask>, dummyTable=quotes, outputTable=outputTable, timeColumn=`time)

Note: Nested aggregate functions such as sum(spread(ask, bid)) are not supported in metrics.
Example 4. Setting Output Tables
Calculation results of streaming engines can be output to either in-memory tables or stream tables. Data in stream tables cannot be updated or deleted, but can be published as source data for other engines.
In the following example, the time-series engine electricityAggregator1 subscribes to the stream table electricity, calculates moving averages, and outputs the results to the stream table outputTable1. The time-series engine electricityAggregator2 subscribes to outputTable1 and calculates moving peaks based on the moving average results.
share streamTable(1000:0,`time`voltage`current,[TIMESTAMP,DOUBLE,DOUBLE]) as electricity
// Define the output table as a stream table for further subscription
share streamTable(10000:0,`time`avgVoltage`avgCurrent,[TIMESTAMP,DOUBLE,DOUBLE]) as outputTable1
electricityAggregator1 = createTimeSeriesEngine(name="electricityAggregator1", windowSize=10, step=10, metrics=<[avg(voltage), avg(current)]>, dummyTable=electricity, outputTable=outputTable1, timeColumn=`time, garbageSize=2000)
subscribeTable(tableName="electricity", actionName="avgElectricity", offset=0, handler=append!{electricityAggregator1}, msgAsTable=true)
// Subscribe to calculation results for further aggregation
outputTable2 =table(10000:0, `time`maxVoltage`maxCurrent, [TIMESTAMP,DOUBLE,DOUBLE])
electricityAggregator2 = createTimeSeriesEngine(name="electricityAggregator2", windowSize=100, step=100, metrics=<[max(avgVoltage), max(avgCurrent)]>, dummyTable=outputTable1, outputTable=outputTable2, timeColumn=`time, garbageSize=2000)
subscribeTable(tableName="outputTable1", actionName="maxElectricity", offset=0, handler=append!{electricityAggregator2}, msgAsTable=true);
// Insert 500 records into the electricity table
def writeData(t, n){
timev = 2018.10.08T01:01:01.000 + timestamp(1..n)
voltage = 1..n * 0.1
current = 1..n * 0.05
insert into t values(timev, voltage, current)
}
writeData(electricity, 500);
Check aggregation results:
select * from outputTable2;
time | maxVoltage | maxCurrent |
---|---|---|
2018.10.08T01:01:01.100 | 8.45 | 4.225 |
2018.10.08T01:01:01.200 | 18.45 | 9.225 |
2018.10.08T01:01:01.300 | 28.45 | 14.225 |
2018.10.08T01:01:01.400 | 38.45 | 19.225 |
2018.10.08T01:01:01.500 | 48.45 | 24.225 |
The outputTable parameter can also be specified as a streaming engine:
share streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as trades
share streamTable(1000:0, `time`sym`open`close`high`low`volume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as kline
outputTable=table(1000:0, `sym`factor1, [SYMBOL, DOUBLE])
// Set the output table of the time-series engine as Rengine
Rengine=createReactiveStateEngine(name="reactive", metrics=<[mavg(open, 3)]>, dummyTable=kline, outputTable=outputTable, keyColumn="sym")
Tengine=createTimeSeriesEngine(name="timeseries", windowSize=6000, step=6000, metrics=<[first(price), last(price), max(price), min(price), sum(volume)]>, dummyTable=trades, outputTable=Rengine, timeColumn=`time, useSystemTime=false, keyColumn=`sym)
subscribeTable(server="", tableName="trades", actionName="timeseries", offset=0, handler=append!{Tengine}, msgAsTable=true)
Example 5. Grouped Calculations
The following example groups data based on the sym column and performs window calculations within each group.
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
outputTable = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
tradesAggregator = createTimeSeriesEngine(name="streamAggr1", windowSize=3, step=3, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=outputTable, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50)
subscribeTable(tableName="trades", actionName="append_tradesAggregator", offset=0, handler=append!{tradesAggregator}, msgAsTable=true)
def writeData(t, n){
timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
symv =take(`A`B, n)
volumev = take(1, n)
insert into t values(timev, symv, volumev)
}
writeData(trades, 6)
Check table trades (sorted by sym):
select * from trades order by sym
time | sym | volume |
---|---|---|
2018.10.08T01:01:01.002 | A | 1 |
2018.10.08T01:01:01.004 | A | 1 |
2018.10.08T01:01:01.006 | A | 1 |
2018.10.08T01:01:01.003 | B | 1 |
2018.10.08T01:01:01.005 | B | 1 |
2018.10.08T01:01:01.007 | B | 1 |
Check grouped calculation results:
select * from outputTable
time | sym | sumVolume |
---|---|---|
2018.10.08T01:01:01.003 | A | 1 |
2018.10.08T01:01:01.006 | A | 1 |
2018.10.08T01:01:01.006 | B | 2 |
After aligning the windows for each group, the first window of each group starts at 000. With windowSize=3 and step=3, the windows for each group are divided as 000-003-006.
- The record of group B at 003 falls in the second window. Since group B has no data in the first window [000, 003), no calculation is performed for that window.
- The data of group A at 004 triggers the calculation for the first window of group A.
- The data of group A at 006 triggers the calculation for the second window of group A.
- The data of group B at 007 triggers the calculation for the second window of group B.
When performing grouped aggregation, the timeColumn within each group must be increasing. If no grouping is performed, the timeColumn of all ingested data must be increasing.
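The behavior above, where window boundaries are aligned once from the first record overall but each group's windows are triggered by that group's own later records, can be sketched as follows (illustrative Python, not DolphinDB's implementation; times are millisecond offsets within 01:01:01):

```python
def grouped_sums(records, window_size, step, alignment_size):
    """records: (time, key, value) tuples with per-group increasing times.
    Boundaries are aligned from the first record overall; triggering is
    per group."""
    first_end = records[0][0] // alignment_size * alignment_size + step
    next_end = {}                  # per-group right boundary of the next window
    out = []
    for t, key, v in records:
        ne = next_end.setdefault(key, first_end)
        while t >= ne:             # this record closes the group's window(s)
            lo, hi = ne - window_size, ne
            in_win = [v2 for t2, k2, v2 in records if k2 == key and lo <= t2 < hi]
            if in_win:             # windows without data produce no output
                out.append((hi, key, sum(in_win)))
            ne += step
        next_end[key] = ne
    return out

# Example 5's data, with times as ms offsets within 01:01:01 (2 = .002, etc.):
records = [(2, 'A', 1), (3, 'B', 1), (4, 'A', 1), (5, 'B', 1), (6, 'A', 1), (7, 'B', 1)]
print(grouped_sums(records, 3, 3, 5))
# → [(3, 'A', 1), (6, 'A', 1), (6, 'B', 2)]
```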
Example 6. Window Calculation Triggers
The following two examples help illustrate how to set updateTime.
First, create a stream table and insert data:
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23);
If updateTime is not specified, calculation for a window will not occur before the window ends.
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
agg1 = createTimeSeriesEngine(name="agg1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="agg1", offset=0, handler=append!{agg1}, msgAsTable=true)
sleep(10)
select * from output1;
time | sym | sumVolume |
---|---|---|
2018.10.08T01:02:00.000 | A | 38 |
2018.10.08T01:03:00.000 | A | 25 |
2018.10.08T01:02:00.000 | B | 40 |
2018.10.08T01:03:00.000 | B | 9 |
The updateTime parameter specifies the interval at which a window can be calculated before it ends. Starting from the left boundary of the window, whenever a new record arrives after an updateTime interval has elapsed, all the data before this record within the current window is calculated. In addition, if a window still holds unprocessed data after 2*updateTime (and at least 2 seconds), all data in the window is calculated.
output2 = keyedTable(`time`sym,10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
agg2 = createTimeSeriesEngine(name="agg2", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output2, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=1000, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="agg2", offset=0, handler=append!{agg2}, msgAsTable=true)
sleep(2010)
select * from output2;
time | sym | sumVolume |
---|---|---|
2018.10.08T01:02:00.000 | A | 38 |
2018.10.08T01:03:00.000 | A | 25 |
2018.10.08T01:02:00.000 | B | 40 |
2018.10.08T01:03:00.000 | B | 9 |
2018.10.08T01:05:00.000 | B | 55 |
2018.10.08T01:05:00.000 | A | 29 |
For simplicity, we assume that the time column indicates the time when data enters the time-series engine, and only the time part (hour:minute:second.millisecond) is shown in the following explanations. The key difference lies in how the last window (01:04:00.000 to 01:05:00.000) is processed:
(1) At 01:04:04.236: 2000 milliseconds (updateTime * 2) after the first and only record of group A arrives, the calculation for group A is triggered. The output table adds a record (01:05:00.000, 'A', 29).
(2) At 01:04:05.152: A record of group B arrives after the subwindow [01:04:04.000, 01:04:05.000) containing the record at 01:04:04.412 closes, triggering a calculation for the subwindow of group B. The output table adds a record (01:05:00.000, 'B', 32).
(3) At 01:04:07.152: 2000 milliseconds after the record at 01:04:05.152 arrives, a calculation for group B is triggered, outputting a record (01:05:00.000, 'B', 55). Since the output table is a keyed table with primary key time and sym, and it already contains the record (01:05:00.000, 'B', 32), this record is updated to (01:05:00.000, 'B', 55).
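The 2000 ms delays in steps (1) and (3) follow from the forced-trigger threshold. The sketch below expresses that rule, assuming (per the description above) that the threshold is the larger of 2*updateTime and 2 seconds; the function name is hypothetical:

```python
def forced_trigger_delay_ms(update_time_ms):
    """Uncalculated data is force-calculated after 2*updateTime,
    but never sooner than 2000 ms."""
    return max(2 * update_time_ms, 2000)

print(forced_trigger_delay_ms(1000))   # updateTime=1000 as in agg2 → 2000
print(forced_trigger_delay_ms(3000))   # → 6000
```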
Example 7. Engine Snapshots
After enabling the snapshot mechanism for the engine, if the engine encounters an exception, its state can be promptly restored to the latest snapshot. The following example explains the usage of snapshotDir and snapshotIntervalInMsgCount. If snapshots are enabled, the handler must use the appendMsg function when the engine subscribes to a stream table, and handlerNeedMsgId must be true to record the message position.
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,INT,INT]) as trades
output1 =table(10000:0, `time`sumprice, [TIMESTAMP,INT]);
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, snapshotDir="/home/server1/snapshotDir", snapshotIntervalInMsgCount=100)
subscribeTable(server="", tableName="trades", actionName="Agg1",offset= 0, handler=appendMsg{Agg1}, msgAsTable=true, handlerNeedMsgId=true)
n=500
timev=timestamp(1..n) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)
select * from output1
time | sumprice |
---|---|
2021.03.12T15:00:00.050 | 1225 |
2021.03.12T15:00:00.100 | 4950 |
2021.03.12T15:00:00.150 | 9950 |
2021.03.12T15:00:00.200 | 14950 |
2021.03.12T15:00:00.250 | 19950 |
2021.03.12T15:00:00.300 | 24950 |
2021.03.12T15:00:00.350 | 29950 |
2021.03.12T15:00:00.400 | 34950 |
2021.03.12T15:00:00.450 | 39950 |
2021.03.12T15:00:00.500 | 44950 |
getSnapshotMsgId(Agg1)
>499
Unsubscribe and delete the engine to simulate a system exception:
unsubscribeTable(, "trades", "Agg1")
dropStreamEngine("Agg1")
Agg1=NULL
Write data to the trades table:
n=500
timev=timestamp(501..1000) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)
Recreate the engine Agg1, load the snapshot, and resubscribe starting from the last processed message:
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, snapshotDir="/home/server1/snapshotDir", snapshotIntervalInMsgCount=100)
ofst=getSnapshotMsgId(Agg1)
print(ofst)
>499
subscribeTable(server="", tableName="trades", actionName="Agg1",offset=ofst+1, handler=appendMsg{Agg1}, msgAsTable=true, handlerNeedMsgId=true)
select * from output1
The results are consistent with uninterrupted subscription.
time | sumprice |
---|---|
2021.03.12T15:00:00.050 | 1225 |
2021.03.12T15:00:00.100 | 4950 |
2021.03.12T15:00:00.150 | 9950 |
2021.03.12T15:00:00.200 | 14950 |
2021.03.12T15:00:00.250 | 19950 |
2021.03.12T15:00:00.300 | 24950 |
2021.03.12T15:00:00.350 | 29950 |
2021.03.12T15:00:00.400 | 34950 |
2021.03.12T15:00:00.450 | 39950 |
2021.03.12T15:00:00.500 | 44950 |
2021.03.12T15:00:00.550 | 25450 |
2021.03.12T15:00:00.600 | 5450 |
2021.03.12T15:00:00.650 | 9950 |
2021.03.12T15:00:00.700 | 14950 |
2021.03.12T15:00:00.750 | 19950 |
2021.03.12T15:00:00.800 | 24950 |
2021.03.12T15:00:00.850 | 29950 |
2021.03.12T15:00:00.900 | 34950 |
2021.03.12T15:00:00.950 | 39950 |
2021.03.12T15:00:01.000 | 44950 |
The following example shows, by contrast, what happens when engine snapshots are not enabled. Without snapshots, even resubscribing from the point of interruption produces results that differ from an uninterrupted subscription.
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,INT,INT]) as trades
output1 =table(10000:0, `time`sumprice, [TIMESTAMP,INT]);
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time)
subscribeTable(server="", tableName="trades", actionName="Agg1",offset= 0, handler=append!{Agg1}, msgAsTable=true)
n=500
timev=timestamp(1..n) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)
n=500
timev=timestamp(501..1000) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)
select * from output1
time | sumprice |
---|---|
2021.03.12T15:00:00.050 | 1225 |
2021.03.12T15:00:00.100 | 4950 |
2021.03.12T15:00:00.150 | 9950 |
2021.03.12T15:00:00.200 | 14950 |
2021.03.12T15:00:00.250 | 19950 |
2021.03.12T15:00:00.300 | 24950 |
2021.03.12T15:00:00.350 | 29950 |
2021.03.12T15:00:00.400 | 34950 |
2021.03.12T15:00:00.450 | 39950 |
2021.03.12T15:00:00.500 | 44950 |
2021.03.12T15:00:00.550 | 25450 |
2021.03.12T15:00:00.600 | 5450 |
2021.03.12T15:00:00.650 | 9950 |
2021.03.12T15:00:00.700 | 14950 |
2021.03.12T15:00:00.750 | 19950 |
2021.03.12T15:00:00.800 | 24950 |
2021.03.12T15:00:00.850 | 29950 |
2021.03.12T15:00:00.900 | 34950 |
2021.03.12T15:00:00.950 | 39950 |
2021.03.12T15:00:01.000 | 44950 |
As snapshots are not enabled, resubscribing from the last processed message generates different results:
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,INT,INT]) as trades
output1 =table(10000:0, `time`sumprice, [TIMESTAMP,INT]);
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time)
subscribeTable(server="", tableName="trades", actionName="Agg1",offset= 0, handler=append!{Agg1}, msgAsTable=true)
n=500
timev=timestamp(1..n) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)
unsubscribeTable(, "trades", "Agg1")
dropStreamEngine("Agg1")
Agg1=NULL
n=500
timev=timestamp(501..1000) + 2021.03.12T15:00:00.000
symv = take(`abc`def, n)
pricev = int(1..n)
id = take(-1, n)
insert into trades values(timev, symv, pricev, id)
Agg1 = createTimeSeriesEngine(name=`Agg1, windowSize=100, step=50, metrics=<sum(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time)
subscribeTable(server="", tableName="trades", actionName="Agg1",offset= 500, handler=append!{Agg1}, msgAsTable=true)
select * from output1
time | sumprice |
---|---|
2021.03.12T15:00:00.050 | 1225 |
2021.03.12T15:00:00.100 | 4950 |
2021.03.12T15:00:00.150 | 9950 |
2021.03.12T15:00:00.200 | 14950 |
2021.03.12T15:00:00.250 | 19950 |
2021.03.12T15:00:00.300 | 24950 |
2021.03.12T15:00:00.350 | 29950 |
2021.03.12T15:00:00.400 | 34950 |
2021.03.12T15:00:00.450 | 39950 |
2021.03.12T15:00:00.500 | 44950 |
2021.03.12T15:00:00.550 | 1225 |
2021.03.12T15:00:00.600 | 4950 |
2021.03.12T15:00:00.650 | 9950 |
2021.03.12T15:00:00.700 | 14950 |
2021.03.12T15:00:00.750 | 19950 |
2021.03.12T15:00:00.800 | 24950 |
2021.03.12T15:00:00.850 | 29950 |
2021.03.12T15:00:00.900 | 34950 |
2021.03.12T15:00:00.950 | 39950 |
2021.03.12T15:00:01.000 | 44950 |
Example 8. Filtering Subscription Data
When using the subscribeTable function, the handler parameter can be used to filter the subscribed streaming data. In the following example, records where the voltage <= 122 or the current is NULL must be filtered out before entering the time-series engine.
share streamTable(1000:0, `time`voltage`current, [TIMESTAMP, DOUBLE, DOUBLE]) as electricity
outputTable = table(10000:0, `time`avgVoltage`avgCurrent, [TIMESTAMP, DOUBLE, DOUBLE])
// Define filtering function
def append_after_filtering(inputTable, msg){
t = select * from msg where voltage>122, isValid(current)
if(size(t)>0){
insert into inputTable values(t.time,t.voltage,t.current)
}
}
electricityAggregator = createTimeSeriesEngine(name="electricityAggregator", windowSize=6, step=3, metrics=<[avg(voltage), avg(current)]>, dummyTable=electricity, outputTable=outputTable, timeColumn=`time, garbageSize=2000)
subscribeTable(tableName="electricity", actionName="avgElectricity", offset=0, handler=append_after_filtering{electricityAggregator}, msgAsTable=true)
// Simulate data
def writeData(t, n){
timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
voltage = 120+1..n * 1.0
current = take([1,NULL,2]*0.1, n)
insert into t values(timev, voltage, current);
}
writeData(electricity, 10)
Check table electricity:
select * from electricity
time | voltage | current |
---|---|---|
2018.10.08T01:01:01.002 | 121 | 0.1 |
2018.10.08T01:01:01.003 | 122 | |
2018.10.08T01:01:01.004 | 123 | 0.2 |
2018.10.08T01:01:01.005 | 124 | 0.1 |
2018.10.08T01:01:01.006 | 125 | |
2018.10.08T01:01:01.007 | 126 | 0.2 |
2018.10.08T01:01:01.008 | 127 | 0.1 |
2018.10.08T01:01:01.009 | 128 | |
2018.10.08T01:01:01.010 | 129 | 0.2 |
2018.10.08T01:01:01.011 | 130 | 0.1 |
Check result table outputTable:
select * from outputTable
time | avgVoltage | avgCurrent |
---|---|---|
2018.10.08T01:01:01.006 | 123.5 | 0.15 |
2018.10.08T01:01:01.009 | 125 | 0.15 |
As the records where the voltage is less than or equal to 122 or the current is NULL are filtered out before entering the time-series engine, no calculation is performed for the first window (the window ending at 003).