createSessionWindowEngine

Syntax

createSessionWindowEngine(name, sessionGap, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [updateTime], [useSessionStartTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup], [forceTriggerTime])

Details

This function creates a session window streaming engine. The session window engine shares most of its parameters with the time-series engine (createTimeSeriesEngine), but includes two unique parameters: sessionGap and useSessionStartTime.

For more application scenarios, see Streaming Engines.

Starting from version 2.00.11, array vectors are allowed in dummyTable and outputTable, but they cannot be involved in calculations specified in metrics.

Calculation Rules

A session window stays open as long as records keep arriving, and closes after a period of inactivity of length sessionGap. The window end equals the timestamp of the last record received plus sessionGap. The calculation for the window is triggered by the arrival of the next record after the window end. If keyColumn is specified, windows are maintained and triggered separately for each group.
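The rule above can be modeled in a few lines of Python. This is a minimal sketch of the windowing logic only, not DolphinDB code and not the engine's implementation. It makes several assumptions. Timestamps are plain integers (milliseconds after 10:01:00.000). Each key is handled independently, as with keyColumn. The metric is sum(). A window is closed only when the next record of the same key arrives strictly after last timestamp + sessionGap; the behavior exactly at the boundary is an assumption. Fed the input of Example 1, it prints the same nine rows as that example's output. The windows it leaves pending are the ones that only forceTriggerTime would calculate.

```python
# A minimal model of the session-window rule (not DolphinDB code):
# per-key windows, sum() as the metric, integer millisecond timestamps.

def session_windows(records, session_gap):
    """Split (timestamp, key, value) records into per-key session windows.

    Returns (closed, pending):
      closed  - sorted list of (start, key, total) for windows whose
                calculation was triggered by a later record of the same key
      pending - key -> [start, last_ts, total] for windows still open
    """
    pending = {}
    closed = []
    for ts, key, value in records:
        win = pending.get(key)
        # Assumption: a record strictly after last_ts + session_gap
        # (the window end) triggers the window and starts a new one.
        if win is not None and ts > win[1] + session_gap:
            closed.append((win[0], key, win[2]))
            win = None
        if win is None:
            pending[key] = [ts, ts, value]
        else:
            win[1] = ts          # extend the window to this record
            win[2] += value      # accumulate the sum() metric
    closed.sort()
    return closed, pending


# Example 1's input, as milliseconds after 2018.10.12T10:01:00.000
times = [1, 2, 3, 4, 5, 11, 12, 13, 14, 15, 21, 22, 23, 28, 34, 40]
syms = ["A", "B", "C", "A", "B"] * 2 + ["A", "B", "C", "A", "B", "C"]
volumes = [1, 2, 3, 4, 5] * 2 + [1, 2, 3, 4, 5, 6]

rows, pending = session_windows(zip(times, syms, volumes), session_gap=5)
for start, sym, total in rows:
    print(f"2018.10.12T10:01:00.{start:03d} {sym} {total}")
```

After the loop, pending still holds one open window per symbol (A from .028, B from .034, C from .040). No later record of the same symbol has arrived to trigger them.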

Note:
If useSystemTime=false, i.e., computation uses timeColumn rather than the system time at which data enters the engine, the timestamps in timeColumn of the input stream table must be non-decreasing. If keyColumn is also specified, the timestamps must be non-decreasing within each group. Out-of-order records are discarded and not included in the computation.
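The ordering requirement can be illustrated with a small Python model. This is not DolphinDB code. It assumes that a record is compared against the last accepted timestamp of its own group, so equal timestamps are kept because the requirement is "non-decreasing". The input records are hypothetical. In this data, (4, "B", 40) is kept even though 4 is smaller than the preceding A timestamp 5, because ordering is checked per group.

```python
# A minimal model of the ordering rule (not DolphinDB code): a record is
# discarded if its timestamp is smaller than the last accepted timestamp
# of the same group. Equal timestamps are accepted (non-decreasing).

def drop_out_of_order(records):
    last_accepted = {}  # key -> last accepted timestamp
    kept, dropped = [], []
    for ts, key, value in records:
        if key in last_accepted and ts < last_accepted[key]:
            dropped.append((ts, key, value))  # out of order within its group
        else:
            last_accepted[key] = ts
            kept.append((ts, key, value))
    return kept, dropped


# Hypothetical input: (timestamp in ms, key, value)
records = [(1, "A", 10), (5, "A", 20), (3, "A", 30), (4, "B", 40), (5, "A", 50)]
kept, dropped = drop_out_of_order(records)
print(dropped)  # [(3, 'A', 30)]
```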

Parameters

Most parameters of createSessionWindowEngine are identical to those of createTimeSeriesEngine. Only the parameters whose definitions differ from createTimeSeriesEngine are described below.

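sessionGap is a positive integer specifying the inactivity interval that ends a session window: a window closes when no record arrives within sessionGap after its last record. Its unit is determined by the parameter useSystemTime. In the examples below, where timeColumn is of TIMESTAMP type, sessionGap=5 means 5 milliseconds.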

useSessionStartTime (optional) is a Boolean value indicating whether the first column in outputTable holds the start time of each window, i.e., the timestamp of the first record in the window. The default value is true. If set to false, the first column holds the end time of each window, i.e., the timestamp of the last record in the window + sessionGap. If updateTime is specified, useSessionStartTime must be true.
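The choice between start and end time reduces to simple arithmetic on each window's record timestamps. The Python sketch below is a model, not DolphinDB code. The function name window_timestamp is hypothetical. The window groupings were derived by hand from the input timestamps of Example 2, expressed in milliseconds after 2023.01.01T10:00:00.000, with sessionGap=5. With use_session_start_time=False, it yields the end times shown in that example's output.

```python
# Which timestamp goes into the first output column (model, not DolphinDB code).

def window_timestamp(record_times, session_gap, use_session_start_time=True):
    if use_session_start_time:
        return record_times[0]               # first record in the window
    return record_times[-1] + session_gap    # last record + sessionGap


# Windows of Example 2, grouped by hand from its input timestamps
# (milliseconds after 2023.01.01T10:00:00.000), with sessionGap = 5
windows = {
    "SENSOR001": [[1, 4], [11, 14], [21]],
    "SENSOR002": [[2, 5], [12, 15], [22]],
    "SENSOR003": [[3], [13], [23]],
}
end_times = sorted(window_timestamp(w, 5, use_session_start_time=False)
                   for ws in windows.values() for w in ws)
start_times = sorted(window_timestamp(w, 5) for ws in windows.values() for w in ws)
print(end_times)    # [8, 9, 10, 18, 19, 20, 26, 27, 28]
```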

forceTriggerTime (optional) is a non-negative integer in the same unit as the time precision of timeColumn. It specifies how long the engine waits before forcibly triggering the calculation of each group's uncalculated window.
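The effect of forceTriggerTime can be sketched in Python as flushing pending windows once a group has waited long enough. This is a model under stated assumptions, not DolphinDB code. The clock that measures the wait is an abstract integer clock in the same unit as force_trigger_time. The names force_trigger, pending, and arrival are hypothetical. The >= comparison at the boundary is also an assumption. The pending windows are those left open at the end of Example 1, and flushing them produces the three extra rows that appear once forceTriggerTime=1000 is set.

```python
# Model of the forceTriggerTime effect (not DolphinDB code). The clock used to
# measure the wait is an assumption: here it is any integer clock in the same
# unit as force_trigger_time.

def force_trigger(pending, last_arrival, now, force_trigger_time):
    """Emit pending windows of groups that have waited >= force_trigger_time.

    pending:      key -> (start, last_ts, total) for uncalculated windows
    last_arrival: key -> clock value when the group's last record arrived
    Returns (emitted rows sorted by start, windows still pending).
    """
    emitted, still_pending = [], {}
    for key, (start, last_ts, total) in pending.items():
        if now - last_arrival[key] >= force_trigger_time:
            emitted.append((start, key, total))
        else:
            still_pending[key] = (start, last_ts, total)
    return sorted(emitted), still_pending


# Windows left open at the end of Example 1 (ms after 10:01:00.000)
pending = {"A": (28, 28, 4), "B": (34, 34, 5), "C": (40, 40, 6)}
arrival = {"A": 0, "B": 0, "C": 0}  # hypothetical: all arrived at clock 0

early, _ = force_trigger(pending, arrival, now=500, force_trigger_time=1000)
late, rest = force_trigger(pending, arrival, now=1500, force_trigger_time=1000)
print(early)  # []
print(late)   # [(28, 'A', 4), (34, 'B', 5), (40, 'C', 6)]
```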

Returns

A table object.

Examples

Example 1. The following example calculates trading volume by stock symbol. It first creates the engine “engine_sw” with createSessionWindowEngine, then subscribes to the stream table “trades” so that incoming data is written to the engine. The engine performs the calculation grouped by column sym and saves the results to output1.

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
share table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT]) as output1
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym)
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)

n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
symv=take(`A`B`C,n)
volumev = (1..n)%1000
insert into trades values(timev, symv, volumev)

n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

n = 6
timev = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

select * from output1;
time sym sumVolume
2018.10.12T10:01:00.001 A 5
2018.10.12T10:01:00.002 B 7
2018.10.12T10:01:00.003 C 3
2018.10.12T10:01:00.011 A 5
2018.10.12T10:01:00.012 B 7
2018.10.12T10:01:00.013 C 3
2018.10.12T10:01:00.021 A 1
2018.10.12T10:01:00.022 B 2
2018.10.12T10:01:00.023 C 3

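In the output above, the last window of each group (starting at 10:01:00.028 for A, .034 for B, and .040 for C) has not been calculated, because no subsequent record of the same group has arrived to trigger it. To force these windows to be calculated, set forceTriggerTime to 1000: 1000 ms after the last record is ingested, calculation is triggered for the pending window in every group. Replace the engine creation statement with the following code, and run the modified script in a fresh session, because the engine name and shared tables from the previous run already exist: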

engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym, forceTriggerTime=1000)

Query output1 again. The result is as follows:

time sym sumVolume
2018.10.12T10:01:00.001 A 5
2018.10.12T10:01:00.002 B 7
2018.10.12T10:01:00.003 C 3
2018.10.12T10:01:00.011 A 5
2018.10.12T10:01:00.012 B 7
2018.10.12T10:01:00.013 C 3
2018.10.12T10:01:00.021 A 1
2018.10.12T10:01:00.022 B 2
2018.10.12T10:01:00.023 C 3
2018.10.12T10:01:00.028 A 4
2018.10.12T10:01:00.034 B 5
2018.10.12T10:01:00.040 C 6

Example 2. The following example uses the engine to capture the last status of each sensor. Because useSessionStartTime=false, the engine outputs the end time of each session window (timestamp of the last record + sessionGap) instead of the start time.

// Create sensor data table
share streamTable(1000:0, `time`sensor`status, [TIMESTAMP, SYMBOL, INT]) as sensors
share table(10000:0, `time`sensor`lastStatus, [TIMESTAMP, SYMBOL, INT]) as output2

// Create the engine with useSessionStartTime=false
engine_sw = createSessionWindowEngine(name = "sensor_engine", sessionGap = 5,
    metrics = <last(status)>, dummyTable = sensors,
    outputTable = output2, timeColumn = `time, keyColumn=`sensor,
    useSessionStartTime=false)
subscribeTable(tableName="sensors", actionName="sensor_monitor", 
    offset=0, handler=append!{engine_sw}, msgAsTable=true)

// Insert data
n = 5
timev = 2023.01.01T10:00:00.000 + (1..n)
sensorv = take(`SENSOR001`SENSOR002`SENSOR003, n)
statusv = rand(0..2, n)  // 0=Offline, 1=Normal, 2=Fault
insert into sensors values(timev, sensorv, statusv)

n = 5
timev = 2023.01.01T10:00:00.010 + (1..n)
statusv = rand(0..2, n)
sensorv = take(`SENSOR001`SENSOR002`SENSOR003, n)
insert into sensors values(timev, sensorv, statusv)

n = 6
timev = 2023.01.01T10:00:00.020 + 1 2 3 8 14 20
statusv = rand(0..2, n)
sensorv = take(`SENSOR001`SENSOR002`SENSOR003, n)
insert into sensors values(timev, sensorv, statusv)

select * from output2;
// Output (lastStatus values vary between runs because statusv is generated with rand):
time                    sensor    lastStatus
----------------------- --------- ----------
2023.01.01T10:00:00.008 SENSOR003 0         
2023.01.01T10:00:00.009 SENSOR001 2         
2023.01.01T10:00:00.010 SENSOR002 1         
2023.01.01T10:00:00.018 SENSOR003 1         
2023.01.01T10:00:00.019 SENSOR001 2         
2023.01.01T10:00:00.020 SENSOR002 1         
2023.01.01T10:00:00.026 SENSOR001 2         
2023.01.01T10:00:00.027 SENSOR002 0         
2023.01.01T10:00:00.028 SENSOR003 1