Cross-Sectional Engine
The cross-sectional engine (created with createCrossSectionalEngine) groups incoming data and takes the latest record from each group for computation. It consists of a keyed table that caches the latest record of each group and a computing engine that performs calculations on this cross-sectional data.
This engine is suitable for comparing and calculating the latest data across groups. For example:
- Finance: Calculating the intrinsic value of an index based on the latest prices of all constituent stocks.
- IIoT: Finding the maximum or minimum temperature among the latest readings from devices.
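The following small Python model of the IIoT case shows what "latest readings" means in practice. It is not DolphinDB code, and the device IDs and temperatures are hypothetical. A cross-sectional maximum is taken over each device's latest reading only, so it can differ from the maximum over the full history.

```python
# Conceptual Python model (not DolphinDB code) of a cross-sectional maximum.
# Device IDs and temperatures are hypothetical sample data.
readings = [
    ("d1", 80.0),
    ("d2", 65.0),
    ("d1", 60.0),  # d1 cools down; its earlier 80.0 is no longer its latest reading
    ("d3", 70.0),
]

# Keep only the latest reading per device, as the engine's keyed table does.
latest = {}
for device, temp in readings:
    latest[device] = temp

cross_max = max(latest.values())           # max over latest readings only
cross_min = min(latest.values())           # min over latest readings only
history_max = max(t for _, t in readings)  # max over every reading ever seen

print(latest)                             # {'d1': 60.0, 'd2': 65.0, 'd3': 70.0}
print(cross_max, cross_min, history_max)  # 70.0 60.0 80.0
```

Here the cross-sectional maximum is 70.0 even though device d1 once reported 80.0.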
Calculation Rules
The cross-sectional engine is created with the createCrossSectionalEngine function, which has the following syntax:
createCrossSectionalEngine(name,
[metrics], dummyTable, [outputTable], keyColumn, [triggeringPattern='perBatch'],
[triggeringInterval=1000], [useSystemTime=true], [timeColumn],
[lastBatchOnly=false], [contextByColumn], [snapshotDir],
[snapshotIntervalInMsgCount], [raftGroup], [outputElapsedMicroseconds=false],
[roundTime=true], [keyFilter], [updatedContextGroupsOnly=false])
This function creates a cross-sectional streaming engine and returns a keyed table with keyColumn as the key.
The keyed table is updated each time a new record arrives, so it always holds the latest record of each group. If lastBatchOnly is set to true, the table retains only the records with the latest timestamp; groups whose latest record has an older timestamp are removed. When new data is ingested into the engine:
- If metrics and outputTable are specified, the engine first updates the keyed table, then performs calculations on the latest data and outputs the results to outputTable.
- If metrics and outputTable are not specified, the engine only updates the keyed table.
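The update-then-compute rule can be sketched as a toy Python model. The class and method names (CrossSectionModel, append) are hypothetical and are not part of the DolphinDB API. The model computes once per append call, which corresponds to the default perBatch trigger.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, object]


class CrossSectionModel:
    """Toy model of the rule above (hypothetical names, not DolphinDB API)."""

    def __init__(self, key_column: str,
                 metrics: Optional[Callable[[List[Row]], tuple]] = None,
                 output: Optional[list] = None) -> None:
        self.key_column = key_column
        self.metrics = metrics
        self.output = output
        self.table: Dict[object, Row] = {}  # keyed table: key -> latest row

    def append(self, batch: List[Row]) -> None:
        # Always upsert the keyed table first, one row at a time.
        for row in batch:
            self.table[row[self.key_column]] = row
        # Compute only when both metrics and an output table are configured.
        if self.metrics is not None and self.output is not None:
            self.output.append(self.metrics(list(self.table.values())))


# With metrics and output: one result per appended batch.
out: list = []
engine = CrossSectionModel("sym",
                           metrics=lambda rows: (sum(r["qty"] for r in rows),),
                           output=out)
engine.append([{"sym": "A", "qty": 10}, {"sym": "B", "qty": 20}])
engine.append([{"sym": "A", "qty": 5}])
print(out)  # [(30,), (25,)]

# Without metrics and output: the keyed table is only a cache.
cache_only = CrossSectionModel("sym")
cache_only.append([{"sym": "A", "qty": 10}, {"sym": "A", "qty": 7}])
print(cache_only.table)  # {'A': {'sym': 'A', 'qty': 7}}
```

The second call in the first case sums the updated qty of A (5) with the unchanged qty of B (20), because B's cached record is still its latest one.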
Usage Examples
The cross-sectional engine supports flexible calculation triggers. The following examples demonstrate its different triggering modes. Before running each example, reset the streaming environment by canceling all subscriptions and deleting existing stream tables and streaming engines, for example:
unsubscribeTable(tableName="tableName", actionName="actionName")
undef(`tableName, SHARED)
dropStreamEngine("engineName")
Example 1. Per-Row Trigger
When the triggeringPattern parameter is set to 'perRow', a calculation is triggered each time a new record arrives. Assume stock trade data is written in real time into the trades table with columns time, sym, price and qty. The following example calculates these metrics in real time:
- The maximum latest trading quantity across all stocks.
- The maximum latest dollar volume across all stocks.
- The total latest dollar volume across all stocks.
Define stream tables:
share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
outputTable = table(10:0, `time`maxQty`maxDollarVolume`sumDollarVolume, [TIMESTAMP,INT,DOUBLE,DOUBLE])
Create the cross-sectional engine. The table object tradesCrossAggregator stores the cross-sectional data.
tradesCrossAggregator=createCrossSectionalEngine(name="CrossSectionalDemo", metrics=<[max(qty), max(price*qty), sum(price*qty)]>, dummyTable=trades, outputTable=outputTable, keyColumn=`sym, triggeringPattern=`perRow, useSystemTime=false, timeColumn=`time)
Subscribe to trades and write simulated data:
subscribeTable(tableName="trades", actionName="tradesCrossAggregator", offset=-1, handler=append!{tradesCrossAggregator}, msgAsTable=true)
def writeData(n){
timev = 2000.10.08T01:01:01.001 + timestamp(1..n)
symv = take(`A`B, n)
pricev = take(102.1 33.4 73.6 223,n)
qtyv = take(60 74 82 59, n)
insert into trades values(timev, symv, pricev, qtyv)
}
writeData(4)
Check the publishing table trades:
select * from trades
time | sym | price | qty |
---|---|---|---|
2000.10.08T01:01:01.002 | A | 102.1 | 60 |
2000.10.08T01:01:01.003 | B | 33.4 | 74 |
2000.10.08T01:01:01.004 | A | 73.6 | 82 |
2000.10.08T01:01:01.005 | B | 223 | 59 |
The table tradesCrossAggregator maintained by the engine serves as an intermediate table, storing the latest data for each group. Check the cross-sectional table tradesCrossAggregator:
select * from tradesCrossAggregator
time | sym | price | qty |
---|---|---|---|
2000.10.08T01:01:01.004 | A | 73.6 | 82 |
2000.10.08T01:01:01.005 | B | 223 | 59 |
Since the cross-sectional engine uses row-by-row triggering mode (triggeringPattern="perRow"), each incoming record triggers a calculation and produces an output. Check the result table outputTable:
select * from outputTable
time | maxQty | maxDollarVolume | sumDollarVolume |
---|---|---|---|
2019.04.08T04:26:01.634 | 60 | 6126 | 6126 |
2019.04.08T04:26:01.634 | 74 | 6126 | 8597.6 |
2019.04.08T04:26:01.634 | 82 | 6035.2 | 8506.8 |
2019.04.08T04:26:01.634 | 82 | 13157 | 19192.2 |
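As a cross-check of the metric columns above, the following Python sketch (not DolphinDB code) replays the four trades row by row. A dict keyed by sym stands in for tradesCrossAggregator, and the three metrics are recomputed after every record, as perRow does. The helper take is a hypothetical stand-in for DolphinDB's take on vectors, assuming cyclic repetition. Dollar amounts are rounded to 6 decimals to hide floating-point noise, and the time column is not modeled.

```python
from itertools import cycle, islice


def take(values, n):
    # Hypothetical stand-in for DolphinDB's take: repeat values cyclically to n items.
    return list(islice(cycle(values), n))


n = 4
syms = take(["A", "B"], n)
prices = take([102.1, 33.4, 73.6, 223.0], n)
qtys = take([60, 74, 82, 59], n)

latest = {}   # sym -> (price, qty), like tradesCrossAggregator
output = []   # (maxQty, maxDollarVolume, sumDollarVolume), like outputTable
for sym, price, qty in zip(syms, prices, qtys):
    latest[sym] = (price, qty)                  # update the keyed table first
    dollar = [p * q for p, q in latest.values()]
    output.append((max(q for _, q in latest.values()),
                   round(max(dollar), 6),
                   round(sum(dollar), 6)))      # perRow: one output per record

for row in output:
    print(row)
```

The third output shows the effect of keying by sym: A's new trade (73.6 x 82 = 6035.2) replaces its earlier 6126, so the maximum dollar volume drops.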
Example 2. Per-Batch Trigger
When the triggeringPattern parameter is set to 'perBatch' (the default), a calculation is triggered each time a batch of data arrives. The following script writes 6 records in 2 batches, so it is expected to trigger 2 calculations and produce 2 output records:
share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
outputTable = table(1:0, `time`maxQty`maxDollarVolume`sumDollarVolume, [TIMESTAMP,INT,DOUBLE,DOUBLE])
tradesCrossAggregator=createCrossSectionalEngine("CrossSectionalDemo", <[max(qty), max(price*qty), sum(price*qty)]>, trades, outputTable, `sym, `perBatch, useSystemTime=false, timeColumn=`time)
subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)
def writeData1(){
timev = 2000.10.08T01:01:01.001 + timestamp(1..4)
symv = take(`A`B, 4)
pricev = 102.1 33.4 102.3 33.2
qtyv = 10 20 40 30
insert into trades values(timev, symv, pricev,qtyv)
}
def writeData2(){
timev = 2000.10.08T01:01:01.005 + timestamp(1..2)
symv = `A`B
pricev = 102.4 33.1
qtyv = 120 60
insert into trades values(timev, symv, pricev,qtyv)
}
// Write 2 batches of data
writeData1();
sleep(100)
writeData2();
sleep(100)
// Clean up after checking the results below:
// unsubscribeTable(, `trades, `tradesCrossAggregator)
// dropStreamEngine(`CrossSectionalDemo)
Check the publishing table trades:
select * from trades
time | sym | price | qty |
---|---|---|---|
2000.10.08T01:01:01.002 | A | 102.1 | 10 |
2000.10.08T01:01:01.003 | B | 33.4 | 20 |
2000.10.08T01:01:01.004 | A | 102.3 | 40 |
2000.10.08T01:01:01.005 | B | 33.2 | 30 |
2000.10.08T01:01:01.006 | A | 102.4 | 120 |
2000.10.08T01:01:01.007 | B | 33.1 | 60 |
Check the cross-sectional table tradesCrossAggregator:
select * from tradesCrossAggregator
time | sym | price | qty |
---|---|---|---|
2000.10.08T01:01:01.006 | A | 102.4 | 120 |
2000.10.08T01:01:01.007 | B | 33.1 | 60 |
Since the data is written in 2 batches, the cross-sectional engine outputs 2 records in perBatch mode:
select * from outputTable
time | maxQty | maxDollarVolume | sumDollarVolume |
---|---|---|---|
2019.04.08T04:52:50.255 | 40 | 4092 | 5088 |
2019.04.08T04:52:50.355 | 120 | 12288 | 14274 |
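The difference from perRow can be checked with a Python sketch (not DolphinDB code) of the same data. Every record still updates the dict that stands in for the keyed table, but the metrics are computed only once per batch. The batch contents are copied from writeData1 and writeData2; rounding to 6 decimals only hides floating-point noise.

```python
batches = [
    [("A", 102.1, 10), ("B", 33.4, 20), ("A", 102.3, 40), ("B", 33.2, 30)],  # writeData1
    [("A", 102.4, 120), ("B", 33.1, 60)],                                     # writeData2
]

latest = {}   # sym -> (price, qty), like tradesCrossAggregator
output = []   # (maxQty, maxDollarVolume, sumDollarVolume), like outputTable
for batch in batches:
    for sym, price, qty in batch:
        latest[sym] = (price, qty)  # every record updates the keyed table
    dollar = [p * q for p, q in latest.values()]
    # perBatch: a single calculation after the whole batch has been applied
    output.append((max(q for _, q in latest.values()),
                   round(max(dollar), 6),
                   round(sum(dollar), 6)))

print(output)  # [(40, 4092.0, 5088.0), (120, 12288.0, 14274.0)]
```

The first and second records of writeData1 never appear in any output on their own, because they are overwritten within the same batch before the calculation runs.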
Example 3. System-Time Based Trigger
When the triggeringPattern parameter is set to "interval", the triggeringInterval parameter must also be specified, and calculations are triggered based on system time every triggeringInterval milliseconds. In this example, 6 records are written in 6 batches of one record each, at intervals of 500 or 1000 milliseconds, and calculations are triggered every 500 milliseconds.
share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
outputTable = table(1:0, `time`avgPrice`volume`dollarVolume`count, [TIMESTAMP,DOUBLE,INT,DOUBLE,INT])
tradesCrossAggregator=createCrossSectionalEngine(name="tradesCrossAggregator", metrics=<[avg(price), sum(qty), sum(price*qty), count(price)]>, dummyTable=trades, outputTable=outputTable, keyColumn=`sym, triggeringPattern="interval", triggeringInterval=500)
subscribeTable(tableName="trades", actionName="tradesStats", offset=-1, handler=append!{tradesCrossAggregator}, msgAsTable=true)
insert into trades values(2020.08.12T09:30:00.000, `A, 10, 20)
sleep(500)
insert into trades values(2020.08.12T09:30:00.000 + 500, `B, 20, 10)
sleep(500)
insert into trades values(2020.08.12T09:30:00.000 + 1000, `A, 10.1, 20)
sleep(1000)
insert into trades values(2020.08.12T09:30:00.000 + 2000, `B, 20.1, 30)
sleep(500)
insert into trades values(2020.08.12T09:30:00.000 + 2500, `B, 20.2, 40)
sleep(500)
insert into trades values(2020.08.12T09:30:00.000 + 3000, `A, 10.2, 20)
sleep(500)
select * from outputTable;
time | avgPrice | volume | dollarVolume | count |
---|---|---|---|---|
2021.07.27T10:54:00.303 | 10 | 20 | 200 | 1 |
2021.07.27T10:54:00.818 | 15 | 30 | 400 | 2 |
2021.07.27T10:54:01.331 | 15.05 | 30 | 402 | 2 |
2021.07.27T10:54:02.358 | 15.1 | 50 | 805 | 2 |
2021.07.27T10:54:02.871 | 15.15 | 60 | 1010 | 2 |
2021.07.27T10:54:03.386 | 15.2 | 60 | 1012 | 2 |
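The interval trigger can be modeled in Python (not DolphinDB code) on a simulated clock. The model assumes, consistent with the output above (no row appears between 10:54:01.331 and 10:54:02.358, during the 1000 ms pause), that a timer tick produces output only if new data has arrived since the previous tick. The 250 ms tick phase is an arbitrary assumption that places ticks between inserts; real timings depend on the system clock.

```python
# Python model (not DolphinDB code) of triggeringPattern="interval".
# Insert times are simulated milliseconds since the first insert.
inserts = [(0, "A", 10.0, 20), (500, "B", 20.0, 10), (1000, "A", 10.1, 20),
           (2000, "B", 20.1, 30), (2500, "B", 20.2, 40), (3000, "A", 10.2, 20)]
INTERVAL = 500  # triggeringInterval in ms
PHASE = 250     # assumption: first timer tick 250 ms after the first insert

latest = {}    # sym -> (price, qty)
output = []    # (tick, avgPrice, volume, dollarVolume, count)
pending = list(inserts)
dirty = False  # has new data arrived since the last tick?
tick = PHASE
while pending or dirty:
    # Apply every insert that happened before this tick.
    while pending and pending[0][0] <= tick:
        _, sym, price, qty = pending.pop(0)
        latest[sym] = (price, qty)
        dirty = True
    # Compute only if the cross-section changed since the previous tick.
    if dirty:
        prices = [p for p, _ in latest.values()]
        output.append((tick,
                       round(sum(prices) / len(prices), 6),
                       sum(q for _, q in latest.values()),
                       round(sum(p * q for p, q in latest.values()), 6),
                       len(prices)))
        dirty = False
    tick += INTERVAL

for row in output:
    print(row)
```

The metric columns match the six rows above, and the tick at 1750 ms produces nothing because no record arrived between 1250 ms and 1750 ms.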
Example 4. Composite Triggers
When triggeringPattern='keyCount' and lastBatchOnly=true, only the data with the latest timestamp is included in the calculations. The following script sets triggeringInterval to 4, meaning a calculation is triggered when 4 records with the same timestamp are received or when data with a newer timestamp is received.
// Define stream tables
share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as tick
share table(1:0, `time`amount, [TIMESTAMP,INT]) as opt
// Create cross-sectional engine
csEngine=createCrossSectionalEngine(name="csEngineDemo", metrics=<[sum(qty)]>, dummyTable=tick, outputTable=opt, keyColumn=`sym, triggeringPattern="keyCount", triggeringInterval=4, timeColumn=`time, useSystemTime=false,lastBatchOnly=true)
subscribeTable(tableName=`tick, actionName="csEngineDemo", msgAsTable=true, handler=append!{csEngine})
def writeData1(){
time = take(2020.10.08T10:01:01.000, 7)
sym=take("A"+string(1..7),7)
price=1..7
qty=1..7
insert into tick values(time, sym, price, qty)
}
// Write data (first time)
writeData1();
def writeData2(){
time = take(2020.10.08T10:30:01.000, 5)
sym=take("A"+string(1..5),5)
price=1..5
qty=1..5
insert into tick values(time, sym, price, qty)
}
// Write data (second time)
writeData2();
select * from opt
time | amount |
---|---|
2020.10.08 10:01:01.000 | 28 |
2020.10.08 10:30:01.000 | 15 |
As the result table shows, the first result is triggered by the first record received with timestamp 2020.10.08T10:30:01.000: the latest records of all groups at 2020.10.08T10:01:01.000 in the cross-sectional table are summed, producing 28. The second result, 15 (the sum of qty over A1 to A5), is triggered when the number of records with timestamp 2020.10.08T10:30:01.000 reaches 4; all five of these records are written in the same batch.
Since lastBatchOnly=true is set, the cross-sectional table retains only the records with the latest timestamp. A6 and A7, whose latest records carry the earlier timestamp, are no longer in the table:
select * from csEngine
time | sym | price | qty |
---|---|---|---|
2020.10.08 10:30:01.000 | A1 | 1 | 1 |
2020.10.08 10:30:01.000 | A2 | 2 | 2 |
2020.10.08 10:30:01.000 | A3 | 3 | 3 |
2020.10.08 10:30:01.000 | A4 | 4 | 4 |
2020.10.08 10:30:01.000 | A5 | 5 | 5 |
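The retention rule of lastBatchOnly=true can be modeled in Python (not DolphinDB code). The function append_last_batch_only is hypothetical: it processes each write as a whole and models only which records the table keeps, not the exact keyCount trigger timing. It reproduces the two sums in opt and the five rows shown above.

```python
from datetime import datetime


def append_last_batch_only(table, rows):
    """Upsert (time, sym, qty) rows, then drop groups whose latest
    record is older than the newest timestamp in the table."""
    for time, sym, qty in rows:
        table[sym] = (time, qty)
    newest = max(t for t, _ in table.values())
    for sym in [s for s, (t, _) in table.items() if t < newest]:
        del table[sym]


table = {}  # sym -> (time, qty), standing in for the table returned as csEngine
ts1 = datetime(2020, 10, 8, 10, 1, 1)
ts2 = datetime(2020, 10, 8, 10, 30, 1)

append_last_batch_only(table, [(ts1, f"A{i}", i) for i in range(1, 8)])
sum1 = sum(q for _, q in table.values())  # writeData1: A1..A7

append_last_batch_only(table, [(ts2, f"A{i}", i) for i in range(1, 6)])
sum2 = sum(q for _, q in table.values())  # writeData2: A1..A5; A6, A7 dropped

print(sum1, sum2, sorted(table))  # 28 15 ['A1', 'A2', 'A3', 'A4', 'A5']
```

Without the pruning loop, A6 and A7 would remain in the table with their older records and the second sum would be 28 (1+2+3+4+5+6+7) instead of 15.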
In the examples above, the cross-sectional table returned by createCrossSectionalEngine serves as an intermediate table for caching cross-sectional data. This table can also be used as a final result table. For example, to periodically refresh the latest price of a stock, the conventional approach is to filter the real-time trade data by stock code and retrieve the latest record. However, as the dataset grows over time, frequent queries consume more system resources and become less efficient. In contrast, the cross-sectional table retains only the latest trade record of each stock, so its size stays stable (one row per stock), which makes it well suited to such periodic polling.
To use the cross-sectional table as the final result, leave the metrics and outputTable parameters unspecified when creating the cross-sectional engine:
tradesCrossAggregator=createCrossSectionalEngine("CrossSectionalDemo", , trades, , `sym, `perRow)
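The polling argument can be illustrated with a Python model (not DolphinDB code). To find a stock's latest price, an ever-growing trade log must be scanned, while a dict keyed by symbol (standing in for the cross-sectional table) holds at most one entry per symbol. The symbols, prices, and sizes are hypothetical.

```python
import random

random.seed(0)
symbols = [f"S{i}" for i in range(100)]

history = []  # full trade log: grows with every trade
latest = {}   # cross-sectional cache: one entry per symbol
for seq in range(10_000):
    sym = random.choice(symbols)
    price = round(random.uniform(10, 100), 2)
    history.append((seq, sym, price))
    latest[sym] = price  # overwrite: keep only the latest price


def latest_from_history(sym):
    """Conventional approach: scan the log backwards for the latest trade."""
    for _, s, price in reversed(history):
        if s == sym:
            return price
    return None


# Both approaches agree, but the cache size is bounded by the symbol count.
print(len(history), len(latest))
```

The lookup in latest costs the same no matter how many trades have arrived, whereas latest_from_history may have to scan further back as the log grows.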