createDataViewEngine
Syntax
createDataViewEngine(name, outputTable, keyColumns, timeColumn, [useSystemTime=true], [throttle], [includeOperationType=false])
Details
The createDataViewEngine function creates a data view engine and returns
a keyed table with keyColumns as the key. The table maintains the latest
record for each key. The data view engine maintains an up-to-date snapshot
of each monitored variable and exports the data to a target table, usually
a stream table, which other clients can subscribe to.
Parameters
name is a string indicating the name of the data view engine. It consists of letters, digits, or underscores (_) and must start with a letter.
outputTable is an in-memory table or a DFS table. If you want to display real-time data, or to graph trends of the data, it must be a stream table.
keyColumns is a STRING scalar or vector that specifies one or more columns in the outputTable as the key columns. For each unique value of keyColumns, only the latest record is retained.
timeColumn is a STRING scalar which specifies the time column in the output table.
useSystemTime (optional) is a Boolean value indicating whether to use the system time as the time column for the output table. The default value is true.
- If the input data does not contain a time column, useSystemTime should be set to true, i.e., the time column of outputTable is the system time.
- If the input data contains a time column, useSystemTime should be set to false, i.e., the time column of outputTable uses the timestamp of each record.
throttle (optional) is a DURATION value that specifies the time interval between data writes to the outputTable.
includeOperationType (optional) is a Boolean value indicating whether the output includes a column specifying the type of operation performed on each record. The default value is false.
When set to true, the output table will include an additional leading column of type CHAR that specifies the operation applied to each record:
- 'A': insert
- 'U': update
- 'D': delete
Note: Deleted records are not emitted by default. When includeOperationType is enabled, deleted records are emitted with operation type 'D', which may result in a different number of output records.
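The effect of includeOperationType on the emitted rows can be illustrated with a small model. The sketch below is plain Python, not DolphinDB script, and it is not the engine's implementation: the function name emit_rows and the event tuples are hypothetical, and it only assumes the behavior described above (an insert is marked 'A', an update to an existing key 'U', and a delete 'D' only when operation types are included).

```python
def emit_rows(events, include_operation_type=False):
    """Model which rows a keyed latest-record view would emit.

    events: list of ("upsert", key, row) or ("delete", key) tuples (hypothetical format).
    Returns (snapshot, emitted), where snapshot maps key -> latest row.
    """
    latest = {}      # latest row per key, like the keyed table
    emitted = []     # rows written to the output table
    for event in events:
        action, key = event[0], event[1]
        if action == "upsert":
            row = event[2]
            op = "U" if key in latest else "A"   # update vs. insert
            latest[key] = row
            emitted.append((op, key, row) if include_operation_type else (key, row))
        elif action == "delete" and key in latest:
            row = latest.pop(key)
            # Deleted records are emitted only when operation types are included.
            if include_operation_type:
                emitted.append(("D", key, row))
    return latest, emitted


events = [
    ("upsert", "c1", 10.0),
    ("upsert", "c2", 10.0),
    ("upsert", "c2", 9.5),
    ("delete", "c2"),
]
snapshot, with_ops = emit_rows(events, include_operation_type=True)
_, without_ops = emit_rows(events)
print(len(with_ops), len(without_ops))  # prints: 4 3
```

In this model the same four events yield four output rows with operation types and three without, which is the row-count difference the note describes.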
Returns
Returns a keyed table with keyColumns as the key.
Examples
Example 1. This example demonstrates how to use the CEP (Complex Event Processing) engine to maintain the latest state of stock orders in real time, including order insertion and deletion operations.
class Orders{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Orders(m,c,p,q){
        market = m
        code = c
        price = p
        qty = q
    }
}
class DeleteOrder{
    code :: STRING
    def DeleteOrder(c){
        code = c
    }
}
// Define the monitor
class MainMonitor:CEPMonitor {
    def MainMonitor(){
    }
    // Automatically called when the engine is deleted: drops the shared stream table
    def onunload(){ undef('orderDV', SHARED) }
    def checkOrders(newOrder)
    def deleteOrder(order)
    // Create a data view engine with the primary key set to the code column to maintain the latest order information for each stock
    def onload(){
        addEventListener(checkOrders,'Orders',,'all')
        orderDV = streamTable(array(CHAR, 0) as type, array(STRING, 0) as market, array(STRING, 0) as code, array(DOUBLE, 0) as price, array(INT, 0) as qty, array(TIMESTAMP, 0) as updateTime)
        share(orderDV,'orderDV')
        createDataViewEngine('orderDV', objByName('orderDV'), `code, `updateTime, true, ,true)
        addEventListener(deleteOrder,'DeleteOrder',,'all')
    }
    // Update the latest order information for each stock
    def checkOrders(newOrder){
        getDataViewEngine('orderDV').append!(table(newOrder.market as market, newOrder.code as code, newOrder.price as price, newOrder.qty as qty))
    }
    def deleteOrder(order){
        deleteDataViewItems('orderDV', order.code)
    }
}
// Create the CEP engine
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
try{dropStreamEngine('cep1')}catch(ex){print(ex)}
engine = createCEPEngine(name='cep1', monitors=<MainMonitor()>, dummyTable=dummy, eventSchema=[Orders,DeleteOrder])
engine.appendEvent(Orders("m1", "c1", 10.0, 100))
engine.appendEvent(Orders("m1", "c2", 10.0, 100))
engine.appendEvent(Orders("m1", "c2", 9.5, 100))
engine.appendEvent(DeleteOrder("c2"))
// Query order data from orderDV
select * from orderDV
| type | market | code | price | qty | updateTime |
|---|---|---|---|---|---|
| A | m1 | c1 | 10 | 100 | 2026.02.01 14:53:12.928 |
| A | m1 | c2 | 10 | 100 | 2026.02.01 14:53:12.928 |
| U | m1 | c2 | 9.5 | 100 | 2026.02.01 14:53:12.928 |
| D | m1 | c2 | 9.5 | 100 | 2026.02.01 14:53:12.928 |
Set includeOperationType=false and modify the schema of the orderDV
table accordingly. Replace the onload method with the following code:
def onload(){
    addEventListener(checkOrders,'Orders',,'all')
    orderDV = streamTable(array(STRING, 0) as market, array(STRING, 0) as code, array(DOUBLE, 0) as price, array(INT, 0) as qty, array(TIMESTAMP, 0) as updateTime)
    share(orderDV,'orderDV')
    createDataViewEngine(name='orderDV', outputTable=objByName('orderDV'), keyColumns=`code, timeColumn=`updateTime, useSystemTime=true, includeOperationType=false)
    addEventListener(deleteOrder,'DeleteOrder',,'all')
}
The orderDV table is shown below. When includeOperationType=false, the
engine does not output records for deleted data, so the table has fewer
rows than when includeOperationType is true.
| market | code | price | qty | updateTime |
|---|---|---|---|---|
| m1 | c1 | 10 | 100 | 2026.04.17 17:35:42.313 |
| m1 | c2 | 10 | 100 | 2026.04.17 17:35:42.313 |
| m1 | c2 | 9.5 | 100 | 2026.04.17 17:35:42.313 |
Example 2. This example demonstrates the effect of the throttle parameter.
If throttle is set to 30 seconds when the data view engine is created, the
engine writes data to the output table only at 30-second intervals.
Modify the onload function as follows:
def onload(){
    addEventListener(checkOrders,'Orders',,'all')
    orderDV = streamTable(array(STRING, 0) as market, array(STRING, 0) as code, array(DOUBLE, 0) as price, array(INT, 0) as qty, array(TIMESTAMP, 0) as updateTime)
    share(orderDV,'orderDV')
    createDataViewEngine(name='orderDV', outputTable=objByName('orderDV'), keyColumns=`code, timeColumn=`updateTime, useSystemTime=true, throttle=30s, includeOperationType=false)
    addEventListener(deleteOrder,'DeleteOrder',,'all')
}
After ingesting the events into the engine and waiting 30 seconds, check the
orderDV table. It contains only one record, c1, because a DeleteOrder event
entered the engine and removed the record c2 before the engine wrote its
output.
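The interaction between throttle and deletion can be sketched with a small model. This is plain Python, not DolphinDB script, and it is only an illustration under stated assumptions: the class ThrottledView, its methods, and the explicit clock passed to tick are all hypothetical, and the model assumes that each write outputs whatever keys are in the snapshot at that moment. How the real engine schedules writes, and whether it writes all keys or only changed ones, is not specified on this page.

```python
class ThrottledView:
    """Toy model: keep the latest row per key, write the snapshot at fixed intervals."""

    def __init__(self, throttle_s):
        self.throttle_s = throttle_s
        self.latest = {}        # latest row per key
        self.output = []        # rows written to the (modeled) output table
        self.last_flush = 0.0   # time of the previous write, in seconds

    def upsert(self, key, row):
        self.latest[key] = row

    def delete(self, key):
        self.latest.pop(key, None)

    def tick(self, now):
        # Write only when a full throttle interval has elapsed (assumed behavior).
        if now - self.last_flush >= self.throttle_s:
            self.output.extend(self.latest.items())
            self.last_flush = now


view = ThrottledView(throttle_s=30.0)
view.upsert("c1", 10.0)
view.upsert("c2", 10.0)
view.upsert("c2", 9.5)
view.delete("c2")            # arrives before the first write

view.tick(10.0)              # too early: nothing is written
early_output = list(view.output)
view.tick(30.0)              # interval elapsed: the snapshot is written
print(view.output)           # prints: [('c1', 10.0)]
```

Because c2 is removed from the snapshot before the first write, the model never outputs it, which matches the single c1 record described above.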
Related functions: createCEPEngine, deleteDataViewItems, dropDataViewEngine, getDataViewEngine
