createDataViewEngine
Syntax
createDataViewEngine(name, outputTable, keyColumns, timeColumn,
[useSystemTime=true], [throttle], [includeOperationType=false])
Details
The createDataViewEngine function creates a data view engine and returns
a keyed table with keyColumns as the key. The table keeps only the latest
record for each key. The data view engine maintains an up-to-date snapshot
of each monitored variable and exports the data to a target table, usually
a stream table, that other clients can subscribe to.
Parameters
name is a string indicating the name of the data view engine. It consists of letters, digits, or underscores (_) and must start with a letter.
outputTable is an in-memory table or a DFS table. If you want to display real-time data, or to graph trends of the data, it must be a stream table.
keyColumns is a STRING scalar or vector that specifies one or more columns in outputTable as the key columns. For each unique key, only the latest record is retained.
timeColumn is a STRING scalar that specifies the time column in outputTable.
useSystemTime (optional) is a Boolean value indicating whether to use the system time as the time column of the output table. The default value is true.
- If the input data does not contain a time column, set useSystemTime to true. The time column of outputTable is then filled with the system time.
- If the input data contains a time column, set useSystemTime to false. The time column of outputTable then uses the timestamp of each record.
throttle (optional) is a DURATION value that specifies the time interval between data writes to outputTable.
includeOperationType (optional) is a Boolean value indicating whether the output includes a column specifying the type of operation performed on each record. The default value is false.
When set to true, the output table will include an additional leading column of type CHAR that specifies the operation applied to each record:
- 'A': insert
- 'U': update
- 'D': delete
Note: Deleted records are not emitted by default. When includeOperationType is enabled, deleted records are emitted with operation type 'D', which may result in a different number of output records.
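The effect of includeOperationType on the emitted rows can be illustrated with a small conceptual model. This is a Python sketch, not DolphinDB code and not the engine's implementation: the class KeyedView and its methods upsert and delete are hypothetical names chosen for illustration. It assumes only what is described above: one latest record per key, 'A' for a new key, 'U' for an existing key, and 'D' rows emitted only when the operation type column is enabled.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class KeyedView:
    """Toy model of a keyed latest-record view (hypothetical, not DolphinDB's API)."""
    include_operation_type: bool = False
    latest: Dict[str, Tuple[Any, ...]] = field(default_factory=dict)   # key -> latest values
    emitted: List[Tuple[Any, ...]] = field(default_factory=list)       # rows written to the output

    def _emit(self, op: str, key: str, values: Tuple[Any, ...]) -> None:
        row = (key, *values)
        # The operation type is a leading column only when enabled.
        self.emitted.append((op, *row) if self.include_operation_type else row)

    def upsert(self, key: str, *values: Any) -> None:
        op = "U" if key in self.latest else "A"   # existing key: update, new key: insert
        self.latest[key] = values
        self._emit(op, key, values)

    def delete(self, key: str) -> None:
        values = self.latest.pop(key, None)
        # Assumption taken from the note above: deletions are emitted
        # only when the operation type column is enabled.
        if values is not None and self.include_operation_type:
            self._emit("D", key, values)


view = KeyedView(include_operation_type=True)
view.upsert("c1", 10.0, 100)
view.upsert("c2", 10.0, 100)
view.upsert("c2", 9.5, 100)
view.delete("c2")
for row in view.emitted:
    print(row)
```

With include_operation_type left at False, the same four calls emit three rows in this model, because the deletion produces no output row. This is the difference in record count that the note describes.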
Returns
Returns a keyed table with keyColumns as the key.
Examples
This example uses a data view engine inside a CEP (Complex Event Processing) engine to maintain the latest state of stock orders in real time, handling both order insertion and deletion.
class Orders{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Orders(m, c, p, q){
        market = m
        code = c
        price = p
        qty = q
    }
}
class DeleteOrder{
    code :: STRING
    def DeleteOrder(c){
        code = c
    }
}
// Define the monitor
class MainMonitor:CEPMonitor {
    def MainMonitor(){
    }
    // Automatically called when the engine is deleted: drops the shared stream table
    def onunload(){ undef('orderDV', SHARED) }
    def checkOrders(newOrder)
    def deleteOrder(order)
    // Create a data view engine with the primary key set to the code column to maintain the latest order information for each stock
    def onload(){
        addEventListener(checkOrders, 'Orders', , 'all')
        orderDV = streamTable(array(CHAR, 0) as type, array(STRING, 0) as market, array(STRING, 0) as code, array(DOUBLE, 0) as price, array(INT, 0) as qty, array(TIMESTAMP, 0) as updateTime)
        share(orderDV, 'orderDV')
        createDataViewEngine('orderDV', objByName('orderDV'), `code, `updateTime, true, , true)
        addEventListener(deleteOrder, 'DeleteOrder', , 'all')
    }
    // Update the latest order information for each stock
    def checkOrders(newOrder){
        getDataViewEngine('orderDV').append!(table(newOrder.market as market, newOrder.code as code, newOrder.price as price, newOrder.qty as qty))
    }
    def deleteOrder(order){
        deleteDataViewItems('orderDV', order.code)
    }
}
// Create the CEP engine
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
try{dropStreamEngine('cep1')}catch(ex){print(ex)}
engine = createCEPEngine(name='cep1', monitors=<MainMonitor()>, dummyTable=dummy, eventSchema=[Orders,DeleteOrder])
engine.appendEvent(Orders("m1", "c1", 10.0, 100))
engine.appendEvent(Orders("m1", "c2", 10.0, 100))
engine.appendEvent(Orders("m1", "c2", 9.5, 100))
engine.appendEvent(DeleteOrder("c2"))
// Query order data from orderDV
select * from orderDV
| type | market | code | price | qty | updateTime |
|---|---|---|---|---|---|
| A | m1 | c1 | 10 | 100 | 2026.02.01 14:53:12.928 |
| A | m1 | c2 | 10 | 100 | 2026.02.01 14:53:12.928 |
| U | m1 | c2 | 9.5 | 100 | 2026.02.01 14:53:12.928 |
| D | m1 | c2 | 9.5 | 100 | 2026.02.01 14:53:12.928 |
Related functions: createCEPEngine, deleteDataViewItems, dropDataViewEngine, getDataViewEngine
