createDataViewEngine

Syntax

createDataViewEngine(name, outputTable, keyColumns, timeColumn, [useSystemTime=true], [throttle],[includeOperationType=false])

Details

The createDataViewEngine function allows you to create a data view engine that returns a keyed table with keyColumns as the key. The table maintains the latest record for each key. The data view engine is responsible for maintaining an up-to-date snapshot of each monitored variable and exporting the data to a target table, usually a stream table, which can be subscribed by other clients.

Parameters

name is a string indicating the name of the data view engine. It consists of letters, digits, or underscores (_) and must start with a letter.

outputTable is an in-memory table or a DFS table. If you want to display real-time data, or to graph trends of the data, it must be a stream table.

keyColumns is a STRING scalar or vector that specifies one or more columns in the outputTable as the key columns. For each unique value in the keyColumn, only the latest record is retained.

timeColumn is a STRING scalar which specifies the time column in the output table.

useSystemTime (optional) is a Boolean value indicating whether to use the system time as the time column for the output table.

  • If the input data does not contain a time column, useSystemTime should set to true, i.e., the time column of outputTable is the system time.

  • If the input data contain a time column, useSystemTime should set to false, i.e., the time column of outputTable uses the timestamp of each record.

throttle (optional) is of DURATION type, which specifies the time interval between data writes to the outputTable.

includeOperationType (optional) is a Boolean value indicating whether the output includes a column specifying the type of operation performed on each record. The default value is false.

When set to true, the output table will include an additional leading column of type CHAR that specifies the operation applied to each record:

  • 'A': insert
  • 'U': update
  • 'D': delete

Note: Deleted records are not emitted by default. When includeOperationType is enabled, deleted records are emitted with operation type 'D', which may result in a different number of output records.

Returns

Returns a keyed table with keyColumns as the key.

Examples

This example demonstrates how to use the CEP (Complex Event Processing) engine to maintain the latest state of stock orders in real time, including order insertion and deletion operations.

class Orders{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Orders(m,c,p,q){
        market = m
        code = c
        price = p
        qty = q
    }
}
class DeleteOrder{
 
    code :: STRING
  
    def DeleteOrder(c){
        code = c
    }
}
// Define the monitor
class MainMonitor:CEPMonitor {
    def MainMonitor(){
    }
    // Automatically called when the engine is deleted: drops the shared stream table
    def onunload(){ undef('orderDV', SHARED) }
    def checkOrders(newOrder)
    def deleteOrder(order)
    // Create a data view engine with the primary key set to the code column to maintain the latest order information for each stock
    def onload(){
        addEventListener(checkOrders,'Orders',,'all') 
        orderDV = streamTable(array(CHAR, 0) as type, array(STRING, 0) as market, array(STRING, 0) as code, array(DOUBLE, 0) as price, array(INT, 0) as qty, array(TIMESTAMP, 0) as updateTime)
        share(orderDV,'orderDV')
        createDataViewEngine('orderDV', objByName('orderDV'), `code, `updateTime, true, ,true)
        addEventListener(deleteOrder,'DeleteOrder',,'all') 
    }
    // Update the latest order information for each stock
    def checkOrders(newOrder){
        getDataViewEngine('orderDV').append!(table(newOrder.market as market, newOrder.code as code, newOrder.price as price, newOrder.qty as qty))
    }
    def deleteOrder(order){
        deleteDataViewItems('orderDV',order.code )
    }
}
// Create the CEP engine
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
try{dropStreamEngine('cep1')}catch(ex){print(ex)}
engine = createCEPEngine(name='cep1', monitors=<MainMonitor()>, dummyTable=dummy, eventSchema=[Orders,DeleteOrder])
engine.appendEvent(Orders("m1", "c1", 10.0, 100))
engine.appendEvent(Orders("m1", "c2", 10.0, 100))
engine.appendEvent(Orders("m1", "c2", 9.5, 100))
engine.appendEvent(DeleteOrder("c2"))

// Query order data from orderDV
select * from orderDV
type market code price qty updateTime
A m1 c1 10 100 2026.02.01 14:53:12.928
A m1 c2 10 100 2026.02.01 14:53:12.928
U m1 c2 9.5 100 2026.02.01 14:53:12.928
D m1 c2 9.5 100 2026.02.01 14:53:12.928

Related functions: createCEPEngine, deleteDataViewItems, dropDataViewEngine, getDataViewEngine