Session Window Engine
A session window captures a period of activity during which data is generated. Sessions are separated by gaps of inactivity during which no data is generated.
Take an IoT scenario as an example. Depending on whether a device is online, a large amount of data may be generated in some time periods and none at all in others. Applying sliding-window calculations to such data wastes computation, because many empty windows are generated. The session window engine is designed to solve this problem.
Calculation Rules
The session window engine is created with the createSessionWindowEngine function. The syntax is as follows:
createSessionWindowEngine(name, sessionGap, metrics, dummyTable, outputTable,
[timeColumn], [useSystemTime=false], [keyColumn], [updateTime],
[useSessionStartTime=true], [snapshotDir], [snapshotIntervalInMsgCount],
[raftGroup], [forceTriggerTime])
For detailed information, refer to createSessionWindowEngine.
The session window engine has the same calculation rules and triggering patterns as the time-series engine. The difference is in how windows are formed. The time-series engine generates windows of a fixed size at a fixed frequency. Session windows have neither a fixed size nor a fixed frequency, because each window lasts as long as its session. Most parameters are the same as those of createTimeSeriesEngine; the only differences are sessionGap and useSessionStartTime.
- sessionGap specifies how long a period of inactivity must last before a session window is closed.
- useSessionStartTime determines whether the time column of the output table holds each window's start time or its end time. The start time is the timestamp of the window's first record and is used by default (true); false selects the end time.
Once a record is ingested into a session window, the window stays open until no new record arrives for the period specified by sessionGap. The window end is therefore the timestamp of the last record received + sessionGap. The calculation for the window is triggered by the arrival of the first record after the window end.
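If keyColumn is specified, data is grouped by the values of that column and the calculations described above are performed independently within each group. As Example 1 shows, a window in one group is triggered only by a later record of the same group.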
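To make these rules concrete, the following Python sketch replays records in arrival order and emits a group's window when a later record of the same group arrives at or after the window end. It is a behavioral model, not the DolphinDB implementation. The function names run_grouped_sessions and example1_records, the integer millisecond timestamps, and the treatment of a record arriving exactly at last + sessionGap as outside the window are all illustrative assumptions. Records are also assumed to arrive in timestamp order within each group.

```python
from typing import Dict, List, Tuple

# Hypothetical model of the session window rule, not DolphinDB's API.
# Each record is (timestamp_ms, key, volume); the metric is sum(volume).
Record = Tuple[int, str, int]


def run_grouped_sessions(records: List[Record], gap: int,
                         use_session_start_time: bool = True):
    """Replay records in arrival order and return (emitted, still_open).

    emitted    -- closed windows as (time, key, sum) in emission order
    still_open -- key -> [start, last, sum] for sessions not yet triggered
    """
    open_sessions: Dict[str, List[int]] = {}
    emitted: List[Tuple[int, str, int]] = []
    for ts, key, vol in records:
        session = open_sessions.get(key)
        # Window end = last + gap. Assumption: a record of the same key at or
        # after that end closes the window and triggers its output.
        if session is not None and ts >= session[1] + gap:
            start, last, total = session
            label = start if use_session_start_time else last + gap
            emitted.append((label, key, total))
            session = None
        if session is None:
            # This record opens a new session for its key.
            open_sessions[key] = [ts, ts, vol]
        else:
            # Still inside the session: extend it and update the metric.
            session[1] = ts
            session[2] += vol
    return emitted, open_sessions


def example1_records() -> List[Record]:
    """Rebuild the three batches of Example 1 as millisecond offsets."""
    syms = ["A", "B", "C"]
    batches = [(0, [1, 2, 3, 4, 5]),
               (10, [1, 2, 3, 4, 5]),
               (20, [1, 2, 3, 8, 14, 20])]
    records: List[Record] = []
    for base, offsets in batches:
        for i, off in enumerate(offsets):
            # take(`A`B`C, n) cycles the symbols; volume is (1..n) % 1000
            records.append((base + off, syms[i % 3], (i + 1) % 1000))
    return records


if __name__ == "__main__":
    emitted, still_open = run_grouped_sessions(example1_records(), gap=5)
    for row in emitted:
        print(row)
    print("open:", still_open)
```

Replayed on the Example 1 data (offsets are milliseconds after 10:01:00.000), the model emits the same nine rows, in the same order, as Example 1's output. The sessions starting at .028 (A), .034 (B), and .040 (C) stay open because no later record of the same symbol arrives to trigger them.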
Usage Examples
Example 1: Create a session window engine that, for each symbol, starts a new window whenever the gap between consecutive records exceeds 5 ms.
// Shared stream table that receives the raw trades
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
// Output table: window time, group key, and the metric result
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
// sessionGap = 5 (in the precision of the TIMESTAMP column, i.e. ms); one session per sym
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym)
// Forward every new batch written to trades into the engine
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)
// Batch 1: records at .001 to .005
n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
symv=take(`A`B`C,n)
volumev = (1..n)%1000
insert into trades values(timev, symv, volumev)
// Batch 2: records at .011 to .015
n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)
// Batch 3: records at .021, .022, .023, .028, .034, .040
n = 6
timev = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)
select * from output1;
Output:
time | sym | sumVolume |
---|---|---|
2018.10.12T10:01:00.001 | A | 5 |
2018.10.12T10:01:00.002 | B | 7 |
2018.10.12T10:01:00.003 | C | 3 |
2018.10.12T10:01:00.011 | A | 5 |
2018.10.12T10:01:00.012 | B | 7 |
2018.10.12T10:01:00.013 | C | 3 |
2018.10.12T10:01:00.021 | A | 1 |
2018.10.12T10:01:00.022 | B | 2 |
2018.10.12T10:01:00.023 | C | 3 |
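Unsubscribe from the stream table, drop the engine, and release the shared table with the following script:
unsubscribeTable(tableName="trades", actionName="append_engine_sw")
dropStreamEngine(`engine_sw)
// Release the shared stream table so that Example 2 can share it again
undef(`trades, SHARED)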
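Example 2: Repeat Example 1 with forceTriggerTime set to 1000. If no new record is ingested for 1000 ms after the last record, calculation is forcibly triggered for the windows that are still open in all groups. As a result, the output also contains the last window of each group, which Example 1 never outputs.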
// Same stream table and output table as in Example 1
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
// forceTriggerTime=1000: after 1000 ms without new records, open windows in all groups are calculated
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym, forceTriggerTime=1000)
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)
// Ingest the same three batches as in Example 1
n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
symv=take(`A`B`C,n)
volumev = (1..n)%1000
insert into trades values(timev, symv, volumev)
n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)
n = 6
timev = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)
// Wait longer than forceTriggerTime so that the forced trigger fires
sleep(1100)
select * from output1;
Output:
time | sym | sumVolume |
---|---|---|
2018.10.12T10:01:00.001 | A | 5 |
2018.10.12T10:01:00.011 | A | 5 |
2018.10.12T10:01:00.021 | A | 1 |
2018.10.12T10:01:00.028 | A | 4 |
2018.10.12T10:01:00.002 | B | 7 |
2018.10.12T10:01:00.012 | B | 7 |
2018.10.12T10:01:00.022 | B | 2 |
2018.10.12T10:01:00.034 | B | 5 |
2018.10.12T10:01:00.003 | C | 3 |
2018.10.12T10:01:00.013 | C | 3 |
2018.10.12T10:01:00.023 | C | 3 |
2018.10.12T10:01:00.040 | C | 6 |
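The forced trigger can be modeled as a flush of every group's open session once the stream has been idle long enough. The Python sketch below is a simplified model under these assumptions:
- It follows the description of Example 2: idle time is measured from the last ingested record, and the trigger applies to all groups at once.
- Time is in integer milliseconds.
- The function name force_trigger is hypothetical and is not part of DolphinDB.

```python
from typing import Dict, List, Tuple

# Hypothetical model of forceTriggerTime, not DolphinDB's API.
# open_sessions maps key -> (start, last, sum) for sessions not yet triggered.
OpenSessions = Dict[str, Tuple[int, int, int]]


def force_trigger(open_sessions: OpenSessions, last_ingest_ms: int,
                  now_ms: int, force_trigger_time: int):
    """Return (emitted, remaining) after checking the idle time.

    If fewer than force_trigger_time ms have passed since the last record was
    ingested, nothing is emitted and the state is kept. Otherwise every open
    session is emitted as (start, key, sum) and the state is cleared.
    """
    if now_ms - last_ingest_ms < force_trigger_time:
        return [], dict(open_sessions)
    # Sort by key only so that the output order is deterministic in this sketch.
    emitted: List[Tuple[int, str, int]] = [
        (start, key, total)
        for key, (start, _last, total) in sorted(open_sessions.items())
    ]
    return emitted, {}


if __name__ == "__main__":
    # Sessions left open at the end of Example 1 (ms offsets from 10:01:00.000).
    still_open = {"A": (28, 28, 4), "B": (34, 34, 5), "C": (40, 40, 6)}
    print(force_trigger(still_open, last_ingest_ms=0, now_ms=500, force_trigger_time=1000))
    print(force_trigger(still_open, last_ingest_ms=0, now_ms=1100, force_trigger_time=1000))
```

After 500 ms of idleness, nothing is emitted. After 1100 ms (the sleep(1100) in Example 2), the model emits .028 A 4, .034 B 5, and .040 C 6. These are exactly the rows that Example 2 adds to the output of Example 1.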