Routing Events
Events can be appended from outside the CEP engine context or routed within a CEP engine. Within a CEP engine context, the monitor instance is the control center for event processing: it defines how events are handled and where they are routed. A source monitor instance can route an event either to the event input queue, where it is matched against other monitor instances, or directly to the event output queue. Events can also stay at the event matcher stage and continue to be matched against newly defined listeners (see Defining Event Listeners).
To Event Input Queue
DolphinDB provides two functions, routeEvent and sendEvent, to route events to the event input queue and enable communication among monitor instances.
To append events from outside the CEP engine context, use the appendEvent function, which injects events into the CEP engine's event input queue from an external context or application.
routeEvent
A routed event goes to the front of the input queue. The engine processes all routed events before it processes the next non-routed event on the input queue.
Syntax
routeEvent(event)
Parameters
event is an event instance.
sendEvent
An event sent by sendEvent goes to the end of the input queue.
The engine processes events in the order they are sent to the input queue.
Syntax
sendEvent(event)
Parameters
event is an event instance.
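The difference in queue position between routeEvent and sendEvent can be sketched with a single monitor that issues both calls from one handler. This is a minimal sketch rather than an excerpt from the DolphinDB manual: the StartSignal and FollowUp event classes, the QueueOrderMonitor class, and the engine name queueOrderDemo are hypothetical, and writeLog is used here only to make the processing order visible in the server log.

```dolphindb
// Hypothetical event types, used only for this sketch
class StartSignal{
    id :: INT
    def StartSignal(i){
        id = i
    }
}
class FollowUp{
    label :: STRING
    def FollowUp(l){
        label = l
    }
}
// Hypothetical monitor that issues both calls from the same handler
class QueueOrderMonitor:CEPMonitor{
    def QueueOrderMonitor(){
    }
    def onStart(event){
        sendEvent(FollowUp("sent"))      // placed at the end of the input queue
        routeEvent(FollowUp("routed"))   // placed at the front of the input queue
    }
    def onFollowUp(event){
        // record the order in which FollowUp events are processed
        writeLog("FollowUp processed: " + event.label)
    }
    def onload(){
        addEventListener(handler=onStart, eventType="StartSignal", times="all")
        addEventListener(handler=onFollowUp, eventType="FollowUp", times="all")
    }
}
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engine = createCEPEngine(name="queueOrderDemo", monitors=<QueueOrderMonitor()>, dummyTable=dummy, eventSchema=[StartSignal, FollowUp])
appendEvent(engine, StartSignal(1))
```

Based on the descriptions above, the FollowUp event labeled "routed" should be processed before the one labeled "sent", even though sendEvent is called first.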
appendEvent
The appendEvent function appends events directly to the event input queue. Compared with writing data through heterogeneous tables, it avoids serialization and deserialization.
Syntax
appendEvent(engine, events)
Parameters
engine is the engine object returned by createCEPEngine
or streamEventSerializer.
events is an event instance or a dictionary. If it is a dictionary, the event instance is constructed automatically from the key-value pairs provided. The keys of the dictionary must include the event type (specified with eventSchema) and all of its event fields.
Examples
Example 1. Append Orders events
// define an event Orders
class Orders{
    eventTime :: TIMESTAMP
    sym :: STRING
    val0 :: INT
    val1 :: FLOAT
    val2 :: DOUBLE
    def Orders(s,v0,v1,v2){
        sym = s
        val0 = v0
        val1 = v1
        val2 = v2
        eventTime = now()
    }
}
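// create a CEP engine (Monitor1, Change, and dummy are assumed to be defined elsewhere)
engine=createCEPEngine(name="test_CEP",monitors=<Monitor1()>,dummyTable=dummy,eventSchema=[Orders,Change],timeColumn='eventTime')
// if events is a class instance (i is assumed to be an integer defined earlier, e.g. a loop counter)
appendEvent(`test_CEP,Orders("b"+lpad(string(i),3,"0"),i,i*1,i*10))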
// if events is a dictionary
d=dict(['eventType',"sym", "val0","val1", "val2"],["Orders",'a000',5,float(3.6),double(5.3)])
appendEvent(`test_CEP,d)
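The two input forms of appendEvent can also be tried in isolation. The following is a minimal sketch under stated assumptions: the Quote event class, the QuoteLogMonitor class, and the engine name appendDemo are hypothetical, and the monitor does nothing except write each received event to the server log with writeLog.

```dolphindb
// Hypothetical event type, used only for this sketch
class Quote{
    sym :: STRING
    bid :: DOUBLE
    def Quote(s, b){
        sym = s
        bid = b
    }
}
// Hypothetical monitor that only logs the events it receives
class QuoteLogMonitor:CEPMonitor{
    def QuoteLogMonitor(){
    }
    def onQuote(event){
        writeLog("Quote received: " + event.sym)
    }
    def onload(){
        addEventListener(handler=onQuote, eventType="Quote", times="all")
    }
}
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engine = createCEPEngine(name="appendDemo", monitors=<QuoteLogMonitor()>, dummyTable=dummy, eventSchema=[Quote])
// form 1: pass an event instance
appendEvent(engine, Quote("a000", 10.5))
// form 2: pass a dictionary holding the event type and every event field
d = dict(["eventType", "sym", "bid"], ["Quote", "a001", 10.6])
appendEvent(engine, d)
```

Both calls should place one Quote event on the input queue of the same engine; the dictionary form leaves construction of the event instance to the engine.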
Example 2. Specify event time with eventTimeField for events appended with
appendEvent.
class Test{
    key :: STRING
    et :: TIMESTAMP
    def Test(c){
        key = c
        et = now()
    }
}
class MarketData{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    et :: TIMESTAMP
    def MarketData(m,c,p,q){
        market = m
        code = c
        price = p
        qty = q
        et = now()
    }
}
class MainMonitor:CEPMonitor{
    streamMinuteBar_1min :: ANY // 1-min OHLC bar
    tsAggrOHLC :: ANY // time-series engine
    def MainMonitor(){
        colNames = ["time","symbol","open","max","min","close","volume","amount","ret","vwap"]
        colTypes = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE]
        streamMinuteBar_1min = table(10000:0,colNames, colTypes)
    }
    def updateMarketData(event)
    // monitor market data, and calculate 1-min OHLC bars with time-series engine
    def onload(){
        addEventListener(updateMarketData,'MarketData',,'all')
        colNames=["symbol","time","price","type","volume"]
        colTypes=[SYMBOL, TIMESTAMP, DOUBLE, STRING, INT]
        dummy = table(10000:0,colNames,colTypes)
        colNames = ["time","symbol","open","max","min","close","volume","amount","ret","vwap"]
        colTypes = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE]
        output = table(10000:0,colNames, colTypes)
        // ret is the relative price change over the window: (close - open) / open
        tsAggrOHLC = createTimeSeriesEngine(name="tsAggrOHLC", windowSize=60000, step=60000, metrics=<[first(price) as open ,max(price) as max,min(price) as min ,last(price) as close ,sum(volume) as volume ,wsum(volume, price) as amount ,((last(price)-first(price))/first(price)) as ret, (wsum(volume, price)/sum(volume)) as vwap]>, dummyTable=dummy, outputTable=streamMinuteBar_1min, timeColumn='time', useSystemTime=false, keyColumn="symbol", fill=`none)
    }
    def updateMarketData(event){
        tsAggrOHLC.append!(table(event.code as symbol, event.et as time, event.price as price, event.market as type, event.qty as volume))
    }
}
dummy = table(array(TIMESTAMP, 0) as eventTime,array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engine = createCEPEngine(name='cep1', monitors=<MainMonitor()>, dummyTable=dummy, eventSchema=[MarketData,Test],dispatchKey='key',eventTimeField='et',useSystemTime=false,timeColumn="eventTime")
data = Test("hk")
// append events to CEP engine
appendEvent(engine, data)
data = MarketData("hk","e", 1.0, 1)
// append events to CEP engine
appendEvent(engine, data)
To Event Output Queue
emitEvent
An event sent by emitEvent goes asynchronously to the end of the output queue. When multiple event stream serializers are specified in the CEP engine, emitEvent must specify the target serializer via the outputName parameter, which indicates the name of the StreamEventSerializer. The CEP engine matches the serializer accordingly and outputs the event to the corresponding stream table or tables.
Syntax
emitEvent(event, [eventTimeField], [outputName])
Parameters
event is an event instance.
eventTimeField is a STRING scalar indicating the time field of the event. To specify this parameter, event must contain a time field. If useSystemTime is set to true when creating the engine, output events are timestamped with the system time. If it is set to false, they are timestamped with the latest event time.
outputName is a string indicating the name of the target StreamEventSerializer.
- If only one output table is specified in the CEP engine, this parameter is not required.
- If multiple output tables are specified, outputName must be specified so that the engine can route the event to the corresponding serializer.
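The single-serializer case, in which outputName can be omitted, might look like the following. This is a minimal sketch under stated assumptions: the FillReport event class, the ForwardMonitor class, the FillChannel stream table, and the engine name emitDemo are hypothetical, and passing a single serializer handle (rather than a vector of handles) to outputTable is assumed to be accepted by createCEPEngine.

```dolphindb
// Hypothetical event type, used only for this sketch
class FillReport{
    code :: STRING
    qty :: INT
    def FillReport(c, q){
        code = c
        qty = q
    }
}
// One shared stream table and one serializer writing to it
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as FillChannel
fillSerializer = streamEventSerializer(name=`FillChannel, eventSchema=[FillReport], outputTable=FillChannel)
// Hypothetical monitor that forwards every FillReport to the output queue
class ForwardMonitor:CEPMonitor{
    def ForwardMonitor(){
    }
    def forwardFill(event){
        // only one serializer is attached to the engine, so outputName is omitted
        emitEvent(event)
    }
    def onload(){
        addEventListener(handler=forwardFill, eventType="FillReport", times="all")
    }
}
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
// assumption: a single serializer handle is passed to outputTable
engine = createCEPEngine(name="emitDemo", monitors=<ForwardMonitor()>, dummyTable=dummy, eventSchema=[FillReport], outputTable=fillSerializer)
appendEvent(engine, FillReport("c", 100))
```

Because emitEvent is asynchronous, the serialized event may appear in FillChannel shortly after appendEvent returns rather than immediately.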
Examples
This example demonstrates how to specify different event stream serializers via the outputName parameter. The CEP engine matches the specified serializer and outputs events to the corresponding stream tables.
class MarketData{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def MarketData(m,c,p,q){
        market = m
        code = c
        price = p
        qty = q
    }
}
class Orders{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Orders(t, m,c,p,q){
        trader = t
        market = m
        code = c
        price = p
        qty = q
    }
}
class Trades{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Trades(t, m,c,p,q){
        trader = t
        market = m
        code = c
        price = p
        qty = q
    }
}
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as MarketDataChannel
serializer1 = streamEventSerializer(name=`MarketDataChannel, eventSchema=[MarketData], outputTable=MarketDataChannel)
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as OrdersChannel
serializer2 = streamEventSerializer(name=`OrdersChannel, eventSchema=[Orders], outputTable=OrdersChannel)
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as TradesChannel
serializer3 = streamEventSerializer(name=`TradesChannel, eventSchema=[Trades], outputTable=TradesChannel)
class SimpleShareSearch:CEPMonitor {
    // Constructor
    def SimpleShareSearch(){
    }
    // Specify the name of the event stream serializer to which the event should be sent via emitEvent
    def processMarketData(event){
        emitEvent(event,,"MarketDataChannel")
    }
    def processOrders(event){
        emitEvent(event,,"OrdersChannel")
    }
    def processTrades(event){
        emitEvent(event,,"TradesChannel")
    }
    // After the CEP sub-engine is created, the system automatically constructs a SimpleShareSearch object as a Monitor instance and invokes the onload function
    def onload() {
        addEventListener(handler=processMarketData, eventType="MarketData", times="all")
        addEventListener(handler=processOrders, eventType="Orders", times="all")
        addEventListener(handler=processTrades, eventType="Trades", times="all")
    }
}
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
// Create a CEP engine and specify three event stream serializers
engine = createCEPEngine(name='cep1', monitors=<SimpleShareSearch()>, dummyTable=dummy, eventSchema=[MarketData,Orders,Trades], outputTable=[serializer1,serializer2,serializer3])
m= MarketData("m", "c", 10.0, 100)
appendEvent(engine, m)
o = Orders("a","m", "c", 10.0, 100)
t = Trades("a","m", "c", 10.0, 100)
appendEvent(engine, o)
appendEvent(engine, t)
