createSessionWindowEngine

Syntax

createSessionWindowEngine(name, sessionGap, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [updateTime], [useSessionStartTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup], [forceTriggerTime])

Details

This function creates a session window streaming engine. The session window engine shares most of its parameters with the time-series engine (createTimeSeriesEngine), but includes two unique parameters: sessionGap and useSessionStartTime.

For more application scenarios, see Streaming Engines.

Starting from version 2.00.11, array vectors are allowed in dummyTable and outputTable, but they cannot be involved in calculations specified in metrics.

Calculation Rules

Once a record is ingested into a session window, the window stays open until no new record arrives for the period specified by sessionGap. The window end equals the timestamp of the last record in the window + sessionGap. Calculation for a window is triggered by the first record that arrives after the window end.
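
The rule above can be modelled in a few lines of plain Python. This is an illustrative sketch, not DolphinDB code and not the engine's implementation: the function name session_windows, the helper batch, the (timestamp, key, value) tuple layout, and the integer millisecond timestamps are assumptions made for the example. It also assumes that a record arriving exactly at the window end starts a new window, which this page does not specify.

```python
def session_windows(records, session_gap):
    """Model of the session window rule for sum(value), grouped by key.

    records: (ts, key, value) tuples in arrival order, ts in milliseconds.
    Returns the emitted (window_start, key, total) rows and the windows
    that are still open because no later record has triggered them.
    """
    open_windows = {}  # key -> [first_ts, last_ts, running_sum]
    output = []
    for ts, key, value in records:
        win = open_windows.get(key)
        # Assumption: a record at exactly last_ts + session_gap closes the window.
        if win is not None and ts >= win[1] + session_gap:
            output.append((win[0], key, win[2]))
            win = None
        if win is None:
            open_windows[key] = [ts, ts, value]  # start a new window
        else:
            win[1] = ts       # extend the window to this record
            win[2] += value
    return output, open_windows


def batch(base, offsets):
    """Build one insert batch like those in Example 1: symbols cycle A, B, C."""
    return [(base + off, "ABC"[i % 3], i + 1) for i, off in enumerate(offsets)]


# Timestamps are millisecond offsets from 2018.10.12T10:01:00.000.
records = batch(0, range(1, 6)) + batch(10, range(1, 6)) + batch(20, [1, 2, 3, 8, 14, 20])
rows, still_open = session_windows(records, session_gap=5)
for row in rows:
    print(row)
```

With the data from Example 1, the model emits nine rows that match the output table shown there and leaves one window per symbol open, because no later record arrives to trigger them.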

Note:
Unless the system time at ingestion is used as the time column (useSystemTime=true), the timestamps in timeColumn of the input stream table must be non-decreasing. If a grouping column (keyColumn) is also specified, the timestamps must be non-decreasing within each group. Out-of-order records are discarded and excluded from the computation.
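
The ordering requirement in the note can be pictured with the following plain-Python sketch. It is a model under stated assumptions, not the engine's code: the function name drop_out_of_order and the (timestamp, key, value) tuple layout are hypothetical, and the ordering is checked per group as it would be when keyColumn is specified.

```python
def drop_out_of_order(records):
    """Split records into kept and discarded, checking order within each key.

    A record is discarded when its timestamp is smaller than the latest
    timestamp already seen for the same key. Equal timestamps are kept,
    since the requirement is non-decreasing rather than strictly increasing.
    """
    latest = {}  # key -> largest timestamp seen so far
    kept, dropped = [], []
    for ts, key, value in records:
        if key in latest and ts < latest[key]:
            dropped.append((ts, key, value))
        else:
            latest[key] = ts
            kept.append((ts, key, value))
    return kept, dropped


stream = [(1, "A", 10), (3, "A", 20), (2, "B", 5), (2, "A", 30), (3, "A", 40)]
kept, dropped = drop_out_of_order(stream)
print(kept)
print(dropped)
```

In this sample, (2, "B", 5) is kept even though it arrives after a record with timestamp 3, because the check is made within group B only. The record (2, "A", 30) is discarded because group A has already seen timestamp 3.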

Parameters

Most parameters of createSessionWindowEngine are identical to those of createTimeSeriesEngine. Only the parameters that differ are described below.

sessionGap is a positive integer indicating the gap between two session windows. Its unit is determined by the parameter useSystemTime.

useSessionStartTime (optional) is a Boolean value indicating whether the first column in outputTable is the start time of each window, i.e., the timestamp of the first record in the window. The default value is true. If it is set to false, the first column holds the end time of each window, i.e., the timestamp of the last record in the window + sessionGap. If updateTime is specified, useSessionStartTime must be true.
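
As a small worked illustration of the two labelling modes, the plain-Python sketch below computes the value that would appear in the first output column. The function name output_time and its arguments are hypothetical and exist only for this example. Timestamps are millisecond offsets.

```python
def output_time(first_ts, last_ts, session_gap, use_session_start_time=True):
    """Return the time label of a window under either setting."""
    if use_session_start_time:
        return first_ts               # timestamp of the first record in the window
    return last_ts + session_gap      # timestamp of the last record + sessionGap


# A window whose first record is at 1 ms and last record at 4 ms, sessionGap = 5.
start_label = output_time(1, 4, 5)
end_label = output_time(1, 4, 5, use_session_start_time=False)
print(start_label, end_label)
```

This window corresponds to the first window of symbol A in the examples below, which is labelled 10:01:00.001 by default and 10:01:00.009 when useSessionStartTime is false.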

forceTriggerTime (optional) is a non-negative integer with the same unit as the time precision of timeColumn. It indicates how long the engine waits before forcing calculation of the uncalculated windows in each group.
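
The effect of forceTriggerTime can be sketched as follows. This is a simplified plain-Python model, not the engine's implementation: the function name force_flush, the explicit clock arguments, and the use of a single last-arrival time for the whole engine are assumptions based on the behaviour described in Example 2, where all remaining windows are output once the waiting time has passed after the last record is received.

```python
def force_flush(open_windows, last_arrival_ms, now_ms, force_trigger_time):
    """Return the rows that a forced trigger would output.

    open_windows: key -> (first_ts, last_ts, running_sum) for windows that
    have not been calculated yet.
    last_arrival_ms / now_ms: wall-clock times in milliseconds (assumed inputs).
    """
    if now_ms - last_arrival_ms < force_trigger_time:
        return []  # still waiting, nothing is forced yet
    # Output every uncalculated window, labelled with its start time.
    return sorted((first, key, total) for key, (first, _, total) in open_windows.items())


# The three windows that remain uncalculated after the last insert in Example 1.
pending = {"A": (28, 28, 4), "B": (34, 34, 5), "C": (40, 40, 6)}
too_early = force_flush(pending, last_arrival_ms=0, now_ms=500, force_trigger_time=1000)
forced = force_flush(pending, last_arrival_ms=0, now_ms=1100, force_trigger_time=1000)
print(too_early)
print(forced)
```

The forced rows correspond to the last three rows of the output table in Example 2.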

Returns

A table object.

Examples

Example 1. The following example creates a session window engine with sessionGap = 5 (ms). Unlike the time-series engine, which uses fixed time periods, the session window engine partitions windows by periods of continuous trading activity.

The engine identifies each stock's trading activity period: when a stock has no new trading data within 5ms, the window ends and outputs the total trading volume within that window.

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
share table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT]) as output1
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym)
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)

n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
symv=take(`A`B`C,n)
volumev = (1..n)%1000
insert into trades values(timev, symv, volumev)

n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

n = 6
timev = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

select * from output1;
time sym sumVolume
2018.10.12T10:01:00.001 A 5
2018.10.12T10:01:00.002 B 7
2018.10.12T10:01:00.003 C 3
2018.10.12T10:01:00.011 A 5
2018.10.12T10:01:00.012 B 7
2018.10.12T10:01:00.013 C 3
2018.10.12T10:01:00.021 A 1
2018.10.12T10:01:00.022 B 2
2018.10.12T10:01:00.023 C 3

Example 2. Force trigger uncalculated window calculations by specifying forceTriggerTime.

Drop the engine and cancel the subscription with the following script:

dropStreamEngine(`engine_sw)
unsubscribeTable(, tableName="trades", actionName="append_engine_sw")

Recreate the session window engine, this time with forceTriggerTime = 1000 (ms). 1000 ms after the engine receives the last record, it forces the calculation and output of the uncalculated windows in all groups:

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
share table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT]) as output1
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym, forceTriggerTime=1000)
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)

n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
symv=take(`A`B`C,n)
volumev = (1..n)%1000
insert into trades values(timev, symv, volumev)

n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

n = 6
timev = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

sleep(1100)
select * from output1;

Querying the output table again shows the following results, which demonstrate that setting forceTriggerTime to 1000 forced the calculation of the last 3 windows that had ended but had not been calculated. In Example 1, where forceTriggerTime was not set and no subsequent data arrived to trigger calculation, these windows produced no output:

time sym sumVolume
2018.10.12T10:01:00.001 A 5
2018.10.12T10:01:00.002 B 7
2018.10.12T10:01:00.003 C 3
2018.10.12T10:01:00.011 A 5
2018.10.12T10:01:00.012 B 7
2018.10.12T10:01:00.013 C 3
2018.10.12T10:01:00.021 A 1
2018.10.12T10:01:00.022 B 2
2018.10.12T10:01:00.023 C 3
2018.10.12T10:01:00.028 A 4
2018.10.12T10:01:00.034 B 5
2018.10.12T10:01:00.040 C 6

Example 3. Set useSessionStartTime to false to output window end time.

Drop the engine and cancel the subscription with the following script:

dropStreamEngine(`engine_sw)
unsubscribeTable(, tableName="trades", actionName="append_engine_sw")

Recreate the engine, this time with useSessionStartTime set to false, so that the time column of the output table shows each session window's end time (i.e., the timestamp of the last record + sessionGap) instead of the default window start time:

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
share table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT]) as output1
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym, useSessionStartTime=false, forceTriggerTime=1000)
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)

n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
symv=take(`A`B`C,n)
volumev = (1..n)%1000
insert into trades values(timev, symv, volumev)

n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

n = 6
timev = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

sleep(1100)
select * from output1;
time sym sumVolume
2018.10.12T10:01:00.008 C 3
2018.10.12T10:01:00.009 A 5
2018.10.12T10:01:00.010 B 7
2018.10.12T10:01:00.018 C 3
2018.10.12T10:01:00.019 A 5
2018.10.12T10:01:00.020 B 7
2018.10.12T10:01:00.026 A 1
2018.10.12T10:01:00.027 B 2
2018.10.12T10:01:00.028 C 3
2018.10.12T10:01:00.033 A 4
2018.10.12T10:01:00.039 B 5
2018.10.12T10:01:00.045 C 6