Session Window Engine

A session window can be thought of as a period of activity during which data is generated. Sessions are separated by gaps of inactivity during which no data is generated.

Take an IoT scenario as an example: depending on whether a device is online, a large amount of data may be generated in some time periods and none at all in others. Applying fixed-length sliding windows to such data can cause unnecessary computational overhead, because many empty windows are generated. The session window engine is designed to solve this problem.

Calculation Rules

The session window engine can be created using the createSessionWindowEngine function. The syntax is as follows:

createSessionWindowEngine(name, sessionGap, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [updateTime], [useSessionStartTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup], [forceTriggerTime])

For detailed information, refer to createSessionWindowEngine.

The session window engine follows the same calculation rules and triggering patterns as the time-series engine. The difference is that the time-series engine generates windows of a fixed size at a fixed frequency, whereas the start and length of a session window depend on the data. Most of its parameters are the same as those of createTimeSeriesEngine; the only differences are sessionGap and useSessionStartTime. sessionGap specifies the length of inactivity after which a session window closes, and useSessionStartTime specifies whether the time column of the output table holds the start time of each window (the timestamp of its first record) or its end time (the timestamp of its last record + sessionGap).
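To make the contrast with fixed-size windows concrete, the sketch below models both window types in plain Python over the same timestamps. It is a simplified illustration, not DolphinDB's implementation: fixed windows are modeled as non-overlapping (tumbling) windows, the function names are hypothetical, and a record arriving exactly sessionGap after the previous one is assumed to stay in the same session. It also shows the two labels that useSessionStartTime chooses between: the window start (first record) or the window end (last record + sessionGap).

```python
# Hypothetical, simplified model; not DolphinDB's implementation.

def fixed_windows(times, size):
    """Tumbling windows [k*size, (k+1)*size) spanning the data; returns the record count per window."""
    first = times[0] // size
    last = times[-1] // size
    counts = [0] * (last - first + 1)
    for t in times:
        counts[t // size - first] += 1
    return counts

def session_windows(times, gap, use_session_start_time=True):
    """Group sorted timestamps into sessions. A record more than `gap` after the
    previous one starts a new session (assumed boundary). Returns (label, count)."""
    sessions = []  # each entry: [first_time, last_time, count]
    for t in times:
        if sessions and t <= sessions[-1][1] + gap:
            sessions[-1][1] = t
            sessions[-1][2] += 1
        else:
            sessions.append([t, t, 1])
    # Label each window with its start time, or with its end time (last record + gap)
    return [((s if use_session_start_time else l + gap), n) for s, l, n in sessions]

# Two bursts of activity separated by a long idle period (times in ms)
times = [0, 1, 2, 3, 1000, 1001, 1002]

counts = fixed_windows(times, 10)
print(len(counts), counts.count(0))                              # 101 99
print(session_windows(times, 5))                                 # [(0, 4), (1000, 3)]
print(session_windows(times, 5, use_session_start_time=False))   # [(8, 4), (1007, 3)]
```

With two short bursts 1000 ms apart, 99 of the 101 fixed 10 ms windows are empty, while the session model produces only two windows.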

A session window stays open as long as records keep arriving and closes after a period of inactivity of length sessionGap. The window end equals the timestamp of the last record received in the window + sessionGap. The calculation of the window is triggered by the arrival of the first record whose timestamp is later than the window end.
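The trigger rule can be sketched for a single group as follows. This is a hypothetical Python model, not the engine's code: the class and method names are illustrative, sum(volume) stands in for the metrics, and a record exactly at the window end is assumed to stay in the window.

```python
# Hypothetical single-group model of the trigger rule; not DolphinDB's implementation.

class SessionWindow:
    def __init__(self, session_gap):
        self.gap = session_gap
        self.start = None   # timestamp of the first record in the open window
        self.last = None    # timestamp of the most recent record
        self.total = 0      # running sum(volume) of the open window

    def window_end(self):
        # Window end = timestamp of the last received record + sessionGap
        return self.last + self.gap

    def on_record(self, t, volume):
        """Ingest one record; return (start, sum) if it closes the open window, else None."""
        emitted = None
        if self.start is not None and t > self.window_end():
            emitted = (self.start, self.total)   # a record after the window end triggers calculation
            self.start = None
        if self.start is None:
            self.start, self.total = t, 0       # this record opens a new window
        self.last = t
        self.total += volume
        return emitted

w = SessionWindow(session_gap=5)
for t, v in [(1, 1), (4, 4), (11, 1), (14, 4)]:
    result = w.on_record(t, v)
    if result:
        print(result)             # (1, 5): window ends at 4 + 5 = 9, record at 11 triggers it
print(w.start, w.window_end())    # 11 19: the second window is still open
```

The second window is only computed when a record later than 19 arrives; until then it stays open.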

Note:

If keyColumn is specified, data is grouped by the values of that column, and the calculations described above are performed independently within each group.
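The effect of keyColumn can be illustrated with a hypothetical model in which each key keeps its own open window. When records of two keys interleave, an ungrouped model sees only short gaps and keeps one long session, while the grouped model measures the gaps per key and closes more windows. The function name and data are illustrative only, not DolphinDB's implementation.

```python
# Hypothetical model; not DolphinDB's implementation.

def sessions(records, gap, by_key=True):
    """records: (time, key) pairs in arrival order. Returns the (group, start, last)
    windows that were closed by a later record of the same group."""
    open_sessions = {}   # group -> [start, last]
    closed = []
    for t, key in records:
        group = key if by_key else None   # None: all records share one group
        s = open_sessions.get(group)
        if s is not None and t > s[1] + gap:   # record arrives after this group's window end
            closed.append((group, s[0], s[1]))
            s = None
        if s is None:
            open_sessions[group] = [t, t]
        else:
            s[1] = t
    return closed

records = [(0, "A"), (3, "B"), (6, "A"), (9, "B"), (20, "A"), (21, "B")]
print(sessions(records, 5, by_key=False))  # [(None, 0, 9)]
print(sessions(records, 5, by_key=True))   # [('A', 0, 0), ('B', 3, 3), ('A', 6, 6), ('B', 9, 9)]
```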

Usage Examples

Example 1: Create a session window engine that groups records by sym and starts a new window for a symbol whenever the gap between its consecutive records exceeds 5 ms (sessionGap = 5).

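// Stream table that receives the raw records
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
// Output table: window time, group key and the result of sum(volume)
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
// sessionGap uses the time unit of timeColumn (TIMESTAMP, i.e. milliseconds); windows are maintained per sym
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym)
// Append every new record of trades to the engine
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)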

n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
symv=take(`A`B`C,n)
volumev = (1..n)%1000
insert into trades values(timev, symv, volumev)

n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

n = 6
timev = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

select * from output1;

Output:

time sym sumVolume
2018.10.12T10:01:00.001 A 5
2018.10.12T10:01:00.002 B 7
2018.10.12T10:01:00.003 C 3
2018.10.12T10:01:00.011 A 5
2018.10.12T10:01:00.012 B 7
2018.10.12T10:01:00.013 C 3
2018.10.12T10:01:00.021 A 1
2018.10.12T10:01:00.022 B 2
2018.10.12T10:01:00.023 C 3
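The nine output rows above can be reproduced with a simplified per-sym model of the rules described earlier. The sketch below is a hypothetical Python replay, not DolphinDB's implementation; times are millisecond offsets from 2018.10.12T10:01:00.000, and the helper batch mirrors take(`A`B`C, n) and (1..n)%1000 from the script.

```python
# Hypothetical replay of Example 1; not DolphinDB's implementation.
# sessionGap = 5 ms, metric sum(volume), windows kept per sym and labeled
# with their start time (the default useSessionStartTime=true).

def batch(times):
    # Mirrors take(`A`B`C, n) and (1..n)%1000 from the script for small n
    return [(t, "ABC"[i % 3], i + 1) for i, t in enumerate(times)]

def replay(batches, gap):
    open_windows = {}   # sym -> [start, last, sum_volume]
    output = []         # (window start, sym, sum_volume), in emission order
    for rows in batches:
        for t, sym, vol in rows:
            w = open_windows.get(sym)
            if w is not None and t > w[1] + gap:   # record arrives after the window end
                output.append((w[0], sym, w[2]))
                w = None
            if w is None:
                open_windows[sym] = [t, t, vol]
            else:
                w[1], w[2] = t, w[2] + vol
    return output, open_windows

output, still_open = replay(
    [batch([1, 2, 3, 4, 5]), batch([11, 12, 13, 14, 15]), batch([21, 22, 23, 28, 34, 40])],
    gap=5,
)
for row in output:
    print(row)        # nine rows, matching output1 above
print(still_open)     # {'A': [28, 28, 4], 'B': [34, 34, 5], 'C': [40, 40, 6]}
```

The last record of each symbol (.028 for A, .034 for B, .040 for C) leaves a window open, because no later record of that symbol arrives to trigger it; those windows do not appear in output1.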

Cancel the subscription and drop the engine with the following script:

unsubscribeTable(tableName="trades", actionName="append_engine_sw")
dropStreamEngine(`engine_sw)

Example 2: Set forceTriggerTime to 1000. Once 1000 ms have passed after the last record was ingested, calculation is triggered for the windows still open in all groups, so the last window of each sym is also output.

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
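// forceTriggerTime=1000: open windows in all groups are calculated 1000 ms after the last ingestion
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym, forceTriggerTime=1000)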
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)

n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
symv=take(`A`B`C,n)
volumev = (1..n)%1000
insert into trades values(timev, symv, volumev)

n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

n = 6
timev = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

// Wait longer than forceTriggerTime so that the forced calculation has taken place
sleep(1100)
select * from output1;

Output:

time sym sumVolume
2018.10.12T10:01:00.001 A 5
2018.10.12T10:01:00.011 A 5
2018.10.12T10:01:00.021 A 1
2018.10.12T10:01:00.028 A 4
2018.10.12T10:01:00.002 B 7
2018.10.12T10:01:00.012 B 7
2018.10.12T10:01:00.022 B 2
2018.10.12T10:01:00.034 B 5
2018.10.12T10:01:00.003 C 3
2018.10.12T10:01:00.013 C 3
2018.10.12T10:01:00.023 C 3
2018.10.12T10:01:00.040 C 6
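The forced trigger can be modeled by adding an idle check to the same per-sym logic. In this hypothetical sketch the clock is passed in explicitly instead of using a real timer, and the comparison now - last_ingest >= forceTriggerTime is an assumed boundary; it is not DolphinDB's implementation. Replaying the three batches and then advancing the clock past 1000 ms yields the 12 rows above when sorted by sym and time.

```python
# Hypothetical model of forceTriggerTime; not DolphinDB's implementation.
# Open windows are flushed once force_trigger_time ms pass with no ingestion.
# The clock is passed in explicitly (now) instead of using a real timer.

class SessionEngine:
    def __init__(self, gap, force_trigger_time):
        self.gap = gap
        self.force_trigger_time = force_trigger_time
        self.open = {}            # sym -> [start, last, sum_volume]
        self.last_ingest = None   # clock time of the most recent append
        self.output = []          # (window start, sym, sum_volume)

    def append(self, rows, now):
        self.last_ingest = now
        for t, sym, vol in rows:
            s = self.open.get(sym)
            if s is not None and t > s[1] + self.gap:   # arrives after the window end
                self.output.append((s[0], sym, s[2]))
                s = None
            if s is None:
                self.open[sym] = [t, t, vol]
            else:
                s[1], s[2] = t, s[2] + vol

    def tick(self, now):
        """Flush every open window if nothing was appended for force_trigger_time."""
        if self.last_ingest is not None and now - self.last_ingest >= self.force_trigger_time:
            for sym, (start, _last, total) in self.open.items():
                self.output.append((start, sym, total))
            self.open.clear()

def batch(times):
    # Mirrors take(`A`B`C, n) and (1..n)%1000 from the script for small n
    return [(t, "ABC"[i % 3], i + 1) for i, t in enumerate(times)]

engine = SessionEngine(gap=5, force_trigger_time=1000)
for times in ([1, 2, 3, 4, 5], [11, 12, 13, 14, 15], [21, 22, 23, 28, 34, 40]):
    engine.append(batch(times), now=0)
engine.tick(now=500)    # idle for only 500 ms: nothing is forced
engine.tick(now=1100)   # idle for at least 1000 ms: remaining windows are flushed
for row in sorted(engine.output, key=lambda r: (r[1], r[0])):
    print(row)
```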