createCEPEngine

Syntax

createCEPEngine(name, monitors, dummyTable, eventSchema, [deserializeParallelism=1], [timeColumn], [eventQueueDepth=1024], [outputTable], [dispatchKey], [dispatchBucket], [useSystemTime=true], [eventTimeField="eventTime"])

Details

The CEP engine primarily processes real-time events by subscribing to heterogeneous stream tables (through subscribeTable). Event data can be written to these tables either by using the replay function or through APIs.

Note: To write events directly to the CEP engine, use the appendEvent function, which appends events directly to the engine's event input queue.

Parameters

name is a string scalar indicating the name of the CEP engine. It can contain letters, digits, and underscores (_) and must start with a letter.

monitors is metacode or a tuple of metacode containing one or more constructors of the Monitor class. If multiple constructors are specified, the monitor objects are constructed in order. For instructions on how to define a monitor, refer to Defining Monitors.

dummyTable is a non-partitioned in-memory table or a stream table. The columns are in the following order:

(1) A time column of TIMESTAMP type (required if timeColumn is specified);

(2) A STRING column indicating the events;

(3) A BLOB column that stores the serialized result of each event.
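For example, a dummy table matching this schema, including the optional time column, can be sketched as follows (the column names are illustrative):

// time column (TIMESTAMP), event type column (STRING), serialized event column (BLOB)
dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs)

If no time column is needed, the first column is simply omitted, as in Example 1 below.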

eventSchema is a class definition or a vector of class definitions of event types, indicating the events (subscribed from APIs or plugins) to be processed. For instructions on how to define an event, refer to Defining Events.

deserializeParallelism (optional) is an integer that specifies the number of workers used to deserialize data from the subscribed stream table. The default value is 1.

timeColumn (optional) is a STRING scalar or vector indicating the time column(s) of dummyTable. If specified, it is used as the initial event time for event streams.

eventQueueDepth (optional) is an integer that specifies the depth of the event input and output queues. The default value is 1024.

outputTable (optional) can be one or more serializers, each returned by streamEventSerializer. It is used together with the emitEvent function to output events to the table(s) specified by the serializer(s). The serialization and output processes of different serializers are independent and executed concurrently.

dispatchKey (optional) is a string scalar indicating an event field used to dispatch events to sub-engines.

  • If specified, the engine creates sub-engines based on the number of unique values of the event field.

  • If not specified, the engine creates a single sub-engine with the same name as the CEP engine (name).

dispatchBucket (optional) is an integer indicating the number of hash buckets used to group the specified event field (dispatchKey) with a hash algorithm. To specify this parameter, dispatchKey must also be specified. If specified, the engine creates sub-engines based on the number of buckets specified by dispatchBucket.
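For example, the following sketch (the monitor and event classes are assumed to be defined as in the examples below) dispatches events by the code field into 4 hash buckets, so the engine creates 4 sub-engines:

dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engine = createCEPEngine(name='cep0', monitors=<SimpleShareSearch()>, dummyTable=dummy,
                        eventSchema=[MarketData], dispatchKey=`code, dispatchBucket=4)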

useSystemTime (optional) is a Boolean value indicating whether calculations are performed based on the system time (in milliseconds) when the event is ingested into the engine. The default value is true. If set to false, calculations are performed based on timeColumn.

eventTimeField (optional) is a STRING scalar or vector indicating the time field(s) of events. It only takes effect when useSystemTime = false. It is a scalar if all events use the same time field name; otherwise, it is a vector of the same length as eventSchema, where each element specifies the time field of the corresponding event. This parameter is required if event streams are ingested into the CEP engine using appendEvent.
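For example, if different event types carry differently named time fields, eventTimeField can be a vector aligned with eventSchema. The sketch below is illustrative: it assumes the MarketData class declares a TIMESTAMP field mdTime and the Orders class declares orderTime, with the other objects defined as in the examples below:

engine = createCEPEngine(name='cep0', monitors=<SimpleShareSearch()>, dummyTable=dummy,
                        eventSchema=[MarketData, Orders], timeColumn=`eventTime,
                        useSystemTime=false, eventTimeField=[`mdTime, `orderTime])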

Returns

It returns a table.

Examples

Example 1. This example creates three types of events (market data, orders, and trades), and defines a dedicated serializer for each event type. Events are then ingested into the engine via appendEvent, where the engine selects the corresponding serializer based on the event type and outputs the data to different stream tables. In addition, by setting dispatchKey='code' and dispatchBucket=4, the example enables parallel processing grouped by stock.

class MarketData{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def MarketData(m,c,p,q){
        market = m
        code = c
        price = p
        qty = q
    }
}

class Orders{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Orders(t, m,c,p,q){
        trader = t
        market = m
        code = c
        price = p
        qty = q
    }
}

class Trades{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Trades(t, m,c,p,q){
        trader = t
        market = m
        code = c
        price = p
        qty = q
    }
}

share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as MarketDataChannel
serializer1 = streamEventSerializer(name=`MarketDataChannel, eventSchema=[MarketData], outputTable=MarketDataChannel)

share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as OrdersChannel
serializer2 = streamEventSerializer(name=`OrdersChannel, eventSchema=[Orders], outputTable=OrdersChannel)

share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as TradesChannel
serializer3 = streamEventSerializer(name=`TradesChannel, eventSchema=[Trades], outputTable=TradesChannel)

class SimpleShareSearch:CEPMonitor {
    // Constructor
    def SimpleShareSearch(){
    }
    def processMarketData(event){
        emitEvent(event,,"MarketDataChannel")
    }
    def processOrders(event){
        emitEvent(event,,"OrdersChannel")
    }
    def processTrades(event){
        emitEvent(event,,"TradesChannel")
    }
    // After creating the CEP sub-engine, the system automatically instantiates the SimpleShareSearch class as a Monitor instance and invokes the onload function.
    def onload() {
        // Listen to MarketData, Orders, and Trades events
        addEventListener(handler=processMarketData, eventType="MarketData", times="all")
        addEventListener(handler=processOrders, eventType="Orders", times="all")
        addEventListener(handler=processTrades, eventType="Trades", times="all")
    }
}

dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engine = createCEPEngine(name='cep1', monitors=<SimpleShareSearch()>, dummyTable=dummy,   
                        eventSchema=[MarketData,Orders,Trades],   
                        outputTable=[serializer1,serializer2,serializer3],  
                        dispatchKey=`code, dispatchBucket=4)

m= MarketData("m", "c", 10.0, 100)
appendEvent(engine, m)

o = Orders("a","m", "c", 10.0, 100)
t = Trades("a","m", "c", 10.0, 100)

appendEvent(engine, o)
appendEvent(engine, t)

In Example 1, since useSystemTime is not specified, the engine uses system time by default. In the internal event listener (addEventListener), time-related parameters (such as within and exceedTime) are evaluated based on the system time when events are ingested.

To use event time instead, configure the parameters as shown in Example 2.

Example 2. This example demonstrates how to use the timestamp carried in the data (event time) for calculations. Key CEP engine settings are:

  • useSystemTime=false: use event time instead of system time
  • timeColumn: specifies the time column in the dummy table
  • eventTimeField: specifies the time field in the event
// First, clean up the environment
def cleanupCEPEngine(){  
    try { dropStreamEngine("cep1") } catch(ex) {}  
    try { dropStreamEngine("MarketDataChannel") } catch(ex) {}  
    try { dropStreamEngine("OrdersChannel") } catch(ex) {}  
    try { dropStreamEngine("TradesChannel") } catch(ex) {}  
      
    try { dropStreamTable("MarketDataChannel") } catch(ex) {}  
    try { dropStreamTable("OrdersChannel") } catch(ex) {}  
    try { dropStreamTable("TradesChannel") } catch(ex) {}  
      
    try { undef("MarketDataChannel", SHARED) } catch(ex) {}  
    try { undef("OrdersChannel", SHARED) } catch(ex) {}  
    try { undef("TradesChannel", SHARED) } catch(ex) {}  
      
    print("CEP engine environment cleanup completed")  
}  
cleanupCEPEngine()  

class MarketData{  
    market :: STRING  
    code :: STRING  
    price :: DOUBLE  
    qty :: INT  
    eventTime :: TIMESTAMP  
    def MarketData(m,c,p,q,et){  
        market = m  
        code = c  
        price = p  
        qty = q  
        eventTime = et  
    }  
}  
  
class Orders{  
    trader :: STRING  
    market :: STRING  
    code :: STRING  
    price :: DOUBLE  
    qty :: INT  
    eventTime :: TIMESTAMP  
    def Orders(t, m,c,p,q,et){  
        trader = t  
        market = m  
        code = c  
        price = p  
        qty = q  
        eventTime = et  
    }  
}  
  
class Trades{  
    trader :: STRING  
    market :: STRING  
    code :: STRING  
    price :: DOUBLE  
    qty :: INT  
    eventTime :: TIMESTAMP  
    def Trades(t, m,c,p,q,et){  
        trader = t  
        market = m  
        code = c  
        price = p  
        qty = q  
        eventTime = et  
    }  
}  
  
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as MarketDataChannel  
serializer1 = streamEventSerializer(name=`MarketDataChannel, eventSchema=[MarketData], outputTable=MarketDataChannel)  
  
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as OrdersChannel  
serializer2 = streamEventSerializer(name=`OrdersChannel, eventSchema=[Orders], outputTable=OrdersChannel)  
  
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as TradesChannel  
serializer3 = streamEventSerializer(name=`TradesChannel, eventSchema=[Trades], outputTable=TradesChannel)  
  
class SimpleShareSearch:CEPMonitor {  
    def SimpleShareSearch(){  
    }  
    def processMarketData(event){  
        emitEvent(event,,"MarketDataChannel")  
    }  
    def processOrders(event){  
        emitEvent(event,,"OrdersChannel")  
    }  
    def processTrades(event){  
        emitEvent(event,,"TradesChannel")  
    }  
    def onload() {  
        addEventListener(handler=processMarketData, eventType="MarketData", times="all")  
        addEventListener(handler=processOrders, eventType="Orders", times="all")  
        addEventListener(handler=processTrades, eventType="Trades", times="all")  
    }  
}  
  
// Modify the dummy table schema by adding an eventTime column
dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs)  
  
// Create CEP engine and specify timeColumn and eventTimeField  
engine = createCEPEngine(name='cep1', monitors=<SimpleShareSearch()>, dummyTable=dummy,   
                        eventSchema=[MarketData,Orders,Trades],   
                        outputTable=[serializer1,serializer2,serializer3],  
                        dispatchKey=`market, dispatchBucket=4,  
                        useSystemTime=false, timeColumn=`eventTime,  
                        eventTimeField=`eventTime)  
  
// Create and ingest events
// Specify eventTime when creating events  
currentTime = 2026.03.01T09:30:00.000  
m1 = MarketData("NASDAQ", "AAPL", 10.0, 100, currentTime)  
m2 = MarketData("NYSE", "IBM", 150.0, 200, currentTime)  
  
currentTime = 2026.03.01T09:30:01.000  
o1 = Orders("trader1","NASDAQ", "AAPL", 10.0, 100, currentTime)  
o2 = Orders("trader2","NYSE", "IBM", 150.0, 200, currentTime)  
  
currentTime = 2026.03.01T09:30:02.000  
t1 = Trades("trader1","NASDAQ", "AAPL", 10.0, 100, currentTime)  
t2 = Trades("trader2","NYSE", "IBM", 150.0, 200, currentTime)  
  
// Ingest events  
appendEvent(engine, m1)  
appendEvent(engine, m2)  
appendEvent(engine, o1)  
appendEvent(engine, o2)  
appendEvent(engine, t1)  
appendEvent(engine, t2)

Related functions: addEventListener, dropStreamEngine, getCEPEngineStat, stopSubEngine