Defining a CEP Engine
The CEP engine processes real-time events primarily by subscribing to heterogeneous
stream tables (through subscribeTable). Event data can be written to
these tables either by using the replay function or through APIs.
Creating a CEP Engine
Syntax
createCEPEngine(name, monitors, dummyTable, eventSchema,
[deserializeParallelism=1], [timeColumn], [eventQueueDepth=1024],
[outputTable], [dispatchKey], [dispatchBucket], [useSystemTime=true],
[eventTimeField="eventTime"])
Parameters
name is a string scalar indicating the name of the CEP engine. It can contain letters, digits, and underscores (_), and must start with a letter.
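The naming rule for name can be written as a single regular expression. The plain-Python sketch below (not DolphinDB script) is only an illustration of the rule as stated above. It assumes that "letters" means ASCII letters, and the helper is_valid_engine_name is hypothetical, not part of DolphinDB.

```python
import re

# Hypothetical helper: checks a candidate CEP engine name against the stated rule
# (letters, digits, underscores; must start with a letter).
# Assumption: "letters" means ASCII letters A-Z and a-z.
_NAME_PATTERN = re.compile(r"[A-Za-z][A-Za-z0-9_]*")

def is_valid_engine_name(name: str) -> bool:
    # fullmatch requires the whole string to follow the pattern, not just a prefix
    return _NAME_PATTERN.fullmatch(name) is not None

for candidate in ["cep1", "CEP_engine_2", "1cep", "_cep", "cep-1"]:
    print(candidate, is_valid_engine_name(candidate))
```

Under this reading, "cep1" and "CEP_engine_2" are accepted, while "1cep" and "_cep" (wrong first character) and "cep-1" (hyphen) are rejected.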
monitors is metacode or a tuple of metacode containing one or more
constructors of the Monitor class. If multiple constructors are
specified, the monitor objects are constructed in order. For
instructions on how to define a monitor, refer to Defining Monitors.
dummyTable is a non-partitioned in-memory table or a stream table. The columns are in the following order:
(1) A time column of TIMESTAMP type (if eventTimeField is specified);
(2) A STRING column indicating the events;
(3) A BLOB column that stores the serialized result of each event.
eventSchema is a scalar or vector of class definitions of event types, indicating the events (subscribed from APIs or plugins) to be processed. For instructions on how to define an event, refer to Defining Events.
deserializeParallelism (optional) is an integer that specifies the number of workers to deserialize the subscribed stream table. The default value is 1.
timeColumn (optional) is a STRING scalar or vector indicating the time column(s) of dummyTable. If specified, it is used as the initial event time for event streams.
eventQueueDepth (optional) is an integer that specifies the depth of the event input and output queues. The default value is 1024.
outputTable (optional) can be specified as one or more
serializers, each returned by streamEventSerializer. It is used
with the emitEvent function to output events to the table(s)
specified by the serializer(s). Serialization and output for
different serializers are independent and run concurrently.
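To make "independent and concurrent" concrete, here is a plain-Python conceptual model, not DolphinDB's implementation. Each hypothetical Serializer owns its own queue and worker thread, so one serializer's output never waits on another's. The helper emit_event is a hypothetical analogue of calling emitEvent with an output name, as the SimpleShareSearch example below does with emitEvent(event,,"MarketDataChannel"). pickle stands in for the engine's real serialization format, and a Python list stands in for the output stream table; both are assumptions.

```python
import pickle
import queue
import threading

# Conceptual model only: each serializer has a private queue and worker thread,
# so serialization/output for different serializers proceeds independently.
class Serializer:
    def __init__(self, name):
        self.name = name
        self.output_table = []              # stands in for the output stream table
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self):
        while True:
            event = self._queue.get()
            if event is None:               # sentinel: stop this worker
                break
            event_type, fields = event
            # (eventType, blob) row, mirroring the STRING + BLOB channel tables
            self.output_table.append((event_type, pickle.dumps(fields)))

    def submit(self, event):
        self._queue.put(event)

    def close(self):
        self._queue.put(None)               # queued after all submitted events
        self._worker.join()

def emit_event(serializers, event, output_name):
    # Hypothetical analogue of emitEvent(event,,outputName): route by serializer name
    serializers[output_name].submit(event)

serializers = {n: Serializer(n) for n in ["MarketDataChannel", "OrdersChannel"]}
emit_event(serializers, ("MarketData", {"code": "c", "price": 10.0}), "MarketDataChannel")
emit_event(serializers, ("Orders", {"code": "c", "qty": 100}), "OrdersChannel")
for s in serializers.values():
    s.close()
```

Because each worker drains only its own FIFO queue, events sent to one serializer keep their order, while nothing is implied about ordering across serializers.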
dispatchKey (optional) is a string scalar indicating an event field.
- If specified, the engine creates sub-engines based on the number of unique values of that event field.
- If not specified, the engine creates only a single sub-engine with the same name as the CEP engine (name).
dispatchBucket (optional) is an integer indicating the number of hash buckets. It is used to group the values of the event field specified by dispatchKey with a hash algorithm, so dispatchKey must be specified whenever dispatchBucket is specified. If specified, the engine creates sub-engines based on the number of buckets specified by dispatchBucket.
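The three dispatch modes can be summarized in a short plain-Python model. It is not DolphinDB code. The helpers sub_engine_id and route are hypothetical, the "cep1/..." sub-engine labels are invented for illustration, and CRC32 stands in for the engine's hash function, which is not specified here.

```python
import zlib
from collections import defaultdict

# Conceptual model of dispatchKey / dispatchBucket routing (not DolphinDB's code).
# Assumptions: sub-engine labels are made up; CRC32 replaces the real hash function.
def sub_engine_id(event, engine_name, dispatch_key=None, dispatch_bucket=None):
    if dispatch_key is None:
        if dispatch_bucket is not None:
            raise ValueError("dispatchBucket requires dispatchKey")
        return engine_name                        # single sub-engine named after the engine
    value = event[dispatch_key]
    if dispatch_bucket is None:
        return f"{engine_name}/{value}"           # one sub-engine per distinct key value
    bucket = zlib.crc32(str(value).encode("utf-8")) % dispatch_bucket
    return f"{engine_name}/bucket{bucket}"        # one sub-engine per hash bucket

def route(events, engine_name, dispatch_key=None, dispatch_bucket=None):
    groups = defaultdict(list)
    for event in events:
        groups[sub_engine_id(event, engine_name, dispatch_key, dispatch_bucket)].append(event)
    return dict(groups)

events = [{"code": "A"}, {"code": "B"}, {"code": "A"}, {"code": "C"}]
print(len(route(events, "cep1")))                 # single sub-engine
print(len(route(events, "cep1", "code")))         # one per distinct code
print(len(route(events, "cep1", "code", 2)))      # at most 2 buckets
```

With a fixed bucket count, the number of sub-engines stays bounded no matter how many distinct key values arrive. Events that share a key value always land in the same sub-engine.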
useSystemTime (optional) is a Boolean value indicating whether calculations are performed based on the system time (in milliseconds) at which an event is ingested into the engine. The default value is true. If set to false, calculations are performed based on the timeColumn.
eventTimeField (optional) is a STRING scalar or vector indicating the time
column(s) of events. It only takes effect when useSystemTime = false. It
is a scalar if all events use the same time column name. Otherwise, it is a
vector of the same length as eventSchema, where each element represents
the time column for each event. This parameter is required if event streams are
ingested into the CEP engine using appendEvent.
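How eventTimeField picks an event's time field can be sketched in plain Python. This is not DolphinDB code, and resolve_time_field is a hypothetical helper. It returns None when useSystemTime is true (ingestion time is used instead). Otherwise, a scalar applies to every event type, and a vector is matched position by position against eventSchema. The field names orderTime and tradeTime are borrowed from the replay examples below.

```python
from typing import Optional, Sequence, Union

# Hypothetical helper modeling the eventTimeField rules (not DolphinDB code).
def resolve_time_field(event_type: str,
                       event_schema: Sequence[str],
                       use_system_time: bool = True,
                       event_time_field: Union[str, Sequence[str]] = "eventTime") -> Optional[str]:
    if use_system_time:
        return None                               # eventTimeField has no effect
    if isinstance(event_time_field, str):
        return event_time_field                   # scalar: same field for all event types
    if len(event_time_field) != len(event_schema):
        raise ValueError("eventTimeField vector must have the same length as eventSchema")
    # vector: element i is the time field of the i-th event type in eventSchema
    return event_time_field[list(event_schema).index(event_type)]

schema = ["Orders", "Trades"]
print(resolve_time_field("Orders", schema))                                         # None
print(resolve_time_field("Trades", schema, False))                                  # eventTime
print(resolve_time_field("Trades", schema, False, ["orderTime", "tradeTime"]))      # tradeTime
```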
Examples
Create a CEP engine and output different events (MarketData, Orders, Trades) to their corresponding stream tables.
class MarketData{
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
def MarketData(m,c,p,q){
market = m
code = c
price = p
qty = q
}
}
class Orders{
trader :: STRING
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
def Orders(t, m,c,p,q){
trader = t
market = m
code = c
price = p
qty = q
}
}
class Trades{
trader :: STRING
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
def Trades(t, m,c,p,q){
trader = t
market = m
code = c
price = p
qty = q
}
}
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as MarketDataChannel
serializer1 = streamEventSerializer(name=`MarketDataChannel, eventSchema=[MarketData], outputTable=MarketDataChannel)
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as OrdersChannel
serializer2 = streamEventSerializer(name=`OrdersChannel, eventSchema=[Orders], outputTable=OrdersChannel)
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as TradesChannel
serializer3 = streamEventSerializer(name=`TradesChannel, eventSchema=[Trades], outputTable=TradesChannel)
class SimpleShareSearch:CEPMonitor {
// Constructor
def SimpleShareSearch(){
}
def processMarketData(event){
emitEvent(event,,"MarketDataChannel")
}
def processOrders(event){
emitEvent(event,,"OrdersChannel")
}
def processTrades(event){
emitEvent(event,,"TradesChannel")
}
// After creating the CEP sub-engine, the system automatically instantiates the SimpleShareSearch class as a Monitor instance and invokes the onload function.
def onload() {
// Listen to MarketData, Orders, and Trades events
addEventListener(handler=processMarketData, eventType="MarketData", times="all")
addEventListener(handler=processOrders, eventType="Orders", times="all")
addEventListener(handler=processTrades, eventType="Trades", times="all")
}
}
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engine = createCEPEngine(name='cep1', monitors=<SimpleShareSearch()>, dummyTable=dummy, eventSchema=[MarketData,Orders,Trades], outputTable=[serializer1,serializer2,serializer3])
m = MarketData("m", "c", 10.0, 100)
appendEvent(engine, m)
o = Orders("a","m", "c", 10.0, 100)
t = Trades("a","m", "c", 10.0, 100)
appendEvent(engine, o)
appendEvent(engine, t)
- Replay from memory:
// data to be replayed
ordersData = select securityId, orderTime, orderPrice, qty from loadText("/home/Data/orders.csv")
tradesData = select securityId, tradeTime, tradePrice, volume from loadText("/home/Data/trades.csv")
input_dict = dict(["Orders", "Trades"], [ordersData, tradesData])
time_dict = dict(["Orders", "Trades"], [`orderTime, `tradeTime])
// replay data; replayOutput is a heterogeneous stream table that must already exist
replay(inputTables=input_dict, outputTables=replayOutput, timeColumn=time_dict)
- Replay from a database (sample orders and trades):
if(existsDatabase('dfs://eventDB')){
    dropDatabase('dfs://eventDB')
}
create database "dfs://eventDB" partitioned by VALUE(2023.02.01..2023.03.10), HASH([SYMBOL, 20]), engine='TSDB'
// import orders
orderSchema = extractTextSchema("/home/Data/orders.csv")
update orderSchema set type=`SYMBOL where name=`securityId
orders = loadTextEx(dbHandle=database('dfs://eventDB'), tableName='orders', partitionColumns=`orderTime`securityId, sortColumns=`securityId`orderTime, filename="/home/Data/orders.csv", schema=orderSchema)
// import trades
tradeSchema = extractTextSchema("/home/Data/trades.csv")
update tradeSchema set type=`SYMBOL where name=`securityId
trades = loadTextEx(dbHandle=database('dfs://eventDB'), tableName='trades', partitionColumns=`tradeTime`securityId, sortColumns=`securityId`tradeTime, filename="/home/Data/trades.csv", schema=tradeSchema)
// build replay data sources
ordersData = replayDS(sqlObj=<select securityId, orderTime, orderPrice, qty from loadTable('dfs://eventDB', `orders)>, dateColumn=`orderTime, timeColumn=`orderTime)
tradesData = replayDS(sqlObj=<select securityId, tradeTime, tradePrice, volume from loadTable('dfs://eventDB', `trades)>, dateColumn=`tradeTime, timeColumn=`tradeTime)
input_dict = dict(["Orders", "Trades"], [ordersData, tradesData])
date_dict = dict(["Orders", "Trades"], [`orderTime, `tradeTime])
time_dict = dict(["Orders", "Trades"], [`orderTime, `tradeTime])
// replay data; replayOutput is a heterogeneous stream table that must already exist
replay(inputTables=input_dict, outputTables=replayOutput, dateColumn=date_dict, timeColumn=time_dict)
Stopping a Sub-Engine
To stop event processing within a specific sub-engine, call
stopSubEngine() in any of its monitor instances.
Before the sub-engine is stopped, the following actions occur:
- If there are spawned monitor instances, the engine first invokes the onDestroy() function defined in those spawned monitor instances.
- The engine then executes all onunload() functions (if defined) declared in the monitor instance.
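The teardown order can be traced with a small plain-Python model. It is not DolphinDB code. MonitorInstance and stop_sub_engine are hypothetical names. One assumption, based on the wording above, is that onunload() runs only for the original (non-spawned) monitor instances and is skipped when it is not defined.

```python
# Conceptual model of what happens when a monitor calls stopSubEngine() (not DolphinDB code).
class MonitorInstance:
    def __init__(self, name, log, spawned=False, has_onunload=True):
        self.name = name
        self.log = log
        self.spawned = spawned
        self.has_onunload = has_onunload    # models "onunload() (if defined)"

    def on_destroy(self):
        self.log.append(f"onDestroy:{self.name}")

    def on_unload(self):
        self.log.append(f"onunload:{self.name}")

def stop_sub_engine(instances, log):
    # Step 1: onDestroy() on every spawned monitor instance
    for inst in instances:
        if inst.spawned:
            inst.on_destroy()
    # Step 2: onunload() on the original monitor instances that define it (assumption)
    for inst in instances:
        if not inst.spawned and inst.has_onunload:
            inst.on_unload()
    log.append("stopped")                   # only now does the sub-engine stop

log = []
instances = [
    MonitorInstance("main", log),
    MonitorInstance("child1", log, spawned=True),
    MonitorInstance("child2", log, spawned=True),
]
stop_sub_engine(instances, log)
print(log)
```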
Syntax
stopSubEngine()
Parameters
None
