Routing Events

Events can be appended from outside the CEP engine context or routed within a CEP engine. Within a CEP engine context, the monitor instance serves as the control center for event processing: it defines how events are handled and where they are routed. A source monitor instance can route an event either to the event input queue, where it can be matched by another monitor instance, or directly to the event output queue. Events can also stay at the event matcher stage and continue to be matched against newly defined listeners (see Defining Event Listeners).

To Event Input Queue

DolphinDB provides two functions, routeEvent and sendEvent, that route events to the event input queue and enable communication among monitor instances.

To append events from outside the CEP engine context, use the appendEvent function, which injects events into the CEP engine's event input queue from an external context or application.

routeEvent

A routed event goes to the front of the input queue. The engine processes all routed events before it processes the next non-routed event on the input queue.

Syntax

routeEvent(event)

Parameters

event is an event instance.

sendEvent

An event sent by sendEvent goes to the end of the input queue. The engine processes events in the order they are sent to the input queue.

Syntax

sendEvent(event)

Parameters

event is an event instance.
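The ordering difference between routeEvent and sendEvent can be illustrated with a toy queue model. The Python sketch below is not DolphinDB code: the class InputQueueModel and its methods route_event and send_event are hypothetical names that only mimic the front-of-queue and end-of-queue behavior described above. It routes a single event per handler, because the ordering among several routed events is not specified here.

```python
from collections import deque


class InputQueueModel:
    """Toy stand-in for a CEP engine's event input queue (hypothetical, not DolphinDB)."""

    def __init__(self, handlers):
        self.queue = deque()
        self.handlers = handlers  # event name -> callable taking the model
        self.processed = []

    def send_event(self, event):
        # Models sendEvent: the event joins the end of the input queue.
        self.queue.append(event)

    def route_event(self, event):
        # Models routeEvent: the event goes to the front of the input queue.
        self.queue.appendleft(event)

    def run(self):
        # Process events one by one, always taking the front of the queue.
        while self.queue:
            event = self.queue.popleft()
            self.processed.append(event)
            handler = self.handlers.get(event)
            if handler is not None:
                handler(self)
        return self.processed


def on_a(model):
    # While handling "A", send one event and route another.
    model.send_event("S")
    model.route_event("R")


model = InputQueueModel({"A": on_a})
model.send_event("A")
model.send_event("B")
order = model.run()
print(order)  # ['A', 'R', 'B', 'S']
```

In this model the routed event "R" is handled before "B", which was already waiting, while the sent event "S" is handled last.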

appendEvent

The appendEvent function appends events directly to the event input queue. Compared with writing data through heterogeneous tables, it avoids serialization and deserialization.

Syntax

appendEvent(engine, events)

Parameters

engine is the engine object returned by createCEPEngine or streamEventSerializer.

events is an event instance (a class object) or a dictionary. If it is a dictionary, the event instance is constructed automatically from the key-value pairs provided. The keys of the dictionary must include the event type (specified with eventSchema) and all of its event fields.

Examples

Example 1. Append Orders events

// define an event Orders
class Orders{
    eventTime :: TIMESTAMP 
    sym :: STRING
    val0 :: INT
    val1 :: FLOAT
    val2 :: DOUBLE
    def Orders(s,v0,v1,v2){
        sym = s
        val0 = v0
        val1 = v1
        val2 = v2
        eventTime = now()
    }
}

// create a CEP engine
// (Monitor1, the Change event class, and the dummy table are assumed to be defined elsewhere)
engine=createCEPEngine(name="test_CEP",monitors=<Monitor1()>,dummyTable=dummy,eventSchema=[Orders,Change],timeColumn='eventTime')

// if events is a class instance
i = 1 // sample value; typically a loop index
appendEvent(engine,Orders("b"+lpad(string(i),3,"0"),i,i*1,i*10))
// if events is a dictionary
d=dict(['eventType',"sym", "val0","val1", "val2"],["Orders",'a000',5,float(3.6),double(5.3)])
appendEvent(engine,d)

Example 2. Specify event time with eventTimeField for events appended with appendEvent.

class Test{
    key :: STRING
    et :: TIMESTAMP
    def Test(c){
        key = c
        et = now()
    }
}

class MarketData{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    et :: TIMESTAMP
    def MarketData(m,c,p,q){
        market = m
        code = c
        price = p
        qty = q
        et = now()
    }
}

class MainMonitor:CEPMonitor{
    streamMinuteBar_1min :: ANY // 1-min OHLC bar
    tsAggrOHLC :: ANY // time-series engine
    def MainMonitor(){
        colNames = ["time","symbol","open","max","min","close","volume","amount","ret","vwap"]
        colTypes = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE]
        streamMinuteBar_1min = table(10000:0,colNames, colTypes)
    }

    def updateMarketData(event)
    // monitor market data, and calculate 1-min OHLC bars with time-series engine
    def onload(){
        addEventListener(updateMarketData,'MarketData',,'all')
        colNames=["symbol","time","price","type","volume"]
        colTypes=[SYMBOL, TIMESTAMP, DOUBLE, STRING, INT]
        dummy = table(10000:0,colNames,colTypes) 
        colNames = ["time","symbol","open","max","min","close","volume","amount","ret","vwap"]
        colTypes = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE]
        output = table(10000:0,colNames, colTypes)
        // ret is the bar return: (close - open) / open
        tsAggrOHLC = createTimeSeriesEngine(name="tsAggrOHLC", windowSize=60000, step=60000, metrics=<[first(price) as open ,max(price) as max,min(price) as min ,last(price) as close ,sum(volume) as volume ,wsum(volume, price) as amount ,((last(price)-first(price))/first(price)) as ret, (wsum(volume, price)/sum(volume)) as vwap]>, dummyTable=dummy, outputTable=streamMinuteBar_1min, timeColumn='time', useSystemTime=false, keyColumn="symbol", fill=`none)
    }

    def updateMarketData(event){
        tsAggrOHLC.append!(table(event.code as symbol, event.et as time, event.price as price, event.market as type, event.qty as volume))
    }
}
dummy = table(array(TIMESTAMP, 0) as eventTime,array(STRING, 0) as eventType,  array(BLOB, 0) as blobs)
engine = createCEPEngine(name='cep1', monitors=<MainMonitor()>, dummyTable=dummy, eventSchema=[MarketData,Test],dispatchKey='key',eventTimeField='et',useSystemTime=false,timeColumn="eventTime") 

data = Test("hk")
// append events to CEP engine
appendEvent(engine, data)

data = MarketData("hk","e", 1.0, 1)

// append events to CEP engine
appendEvent(engine, data)

To Event Output Queue

emitEvent

An event sent by emitEvent goes asynchronously to the end of the output queue. When multiple event stream serializers are specified in the CEP engine, emitEvent must specify the target serializer via the outputName parameter, which gives the name of the StreamEventSerializer. The CEP engine matches the serializer accordingly and outputs the event to the corresponding stream table or tables.

Syntax

emitEvent(event, [eventTimeField], [outputName])

Parameters

event is an event instance.

eventTimeField (optional) is a STRING scalar indicating the time field of the event. To specify this parameter, event must contain a time field. If useSystemTime is set to true when creating the engine, output events are timestamped with the system time. If it is set to false, they are timestamped with the latest event time.

outputName (optional) is a STRING scalar specifying the name of the StreamEventSerializer.
  • If only one output table is specified in the CEP engine, this parameter is not required.
  • If multiple output tables are specified, outputName must be specified so that the engine can route the event to the corresponding serializer.
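The two outputName rules above can be pictured as a small lookup. The Python sketch below is a conceptual model only, not DolphinDB code: resolve_serializer and emit_event are hypothetical helpers, plain lists stand in for stream tables, and raising an error when the name is missing is an assumption of the sketch rather than documented engine behavior.

```python
def resolve_serializer(serializers, output_name=None):
    """Pick the target serializer following the outputName rules (hypothetical model)."""
    if output_name is not None:
        if output_name not in serializers:
            raise KeyError(f"no serializer named {output_name!r}")
        return serializers[output_name]
    if len(serializers) == 1:
        # Only one serializer is configured, so the name may be omitted.
        return next(iter(serializers.values()))
    # Assumption of this sketch: an ambiguous call is rejected.
    raise ValueError("outputName is required when several serializers are configured")


def emit_event(serializers, event, output_name=None):
    # Append the event to the list standing in for the serializer's stream table.
    resolve_serializer(serializers, output_name).append(event)


# Several serializers: the name selects the target.
channels = {"MarketDataChannel": [], "OrdersChannel": []}
emit_event(channels, "order-1", output_name="OrdersChannel")

# A single serializer: the name can be left out.
single = {"TradesChannel": []}
emit_event(single, "trade-1")

# Several serializers and no name: the sketch refuses to guess.
try:
    emit_event(channels, "order-2")
    ambiguous_rejected = False
except ValueError:
    ambiguous_rejected = True
```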

Examples

This example demonstrates how to specify different event stream serializers via the outputName parameter. The CEP engine matches the specified serializer and outputs events to the corresponding stream tables.

class MarketData{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def MarketData(m,c,p,q){
        market = m
        code = c
        price = p
        qty = q
    }
}

class Orders{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Orders(t, m,c,p,q){
        trader = t
        market = m
        code = c
        price = p
        qty = q
    }
}

class Trades{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Trades(t, m,c,p,q){
        trader = t
        market = m
        code = c
        price = p
        qty = q
    }
}

share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as MarketDataChannel
serializer1 = streamEventSerializer(name=`MarketDataChannel, eventSchema=[MarketData], outputTable=MarketDataChannel)

share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as OrdersChannel
serializer2 = streamEventSerializer(name=`OrdersChannel, eventSchema=[Orders], outputTable=OrdersChannel)

share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as TradesChannel
serializer3 = streamEventSerializer(name=`TradesChannel, eventSchema=[Trades], outputTable=TradesChannel)

class SimpleShareSearch:CEPMonitor {
    // Constructor
    def SimpleShareSearch(){
    }
    // Specify the name of the event stream serializer to which the event should be sent via emitEvent
    def processMarketData(event){
        emitEvent(event,,"MarketDataChannel")
    }
    def processOrders(event){
        emitEvent(event,,"OrdersChannel")
    }
    def processTrades(event){
        emitEvent(event,,"TradesChannel")
    }
    // After the CEP sub-engine is created, the system automatically constructs a SimpleShareSearch object as a Monitor instance and invokes the onload function
    def onload() {
        addEventListener(handler=processMarketData, eventType="MarketData", times="all")
        addEventListener(handler=processOrders, eventType="Orders", times="all")
        addEventListener(handler=processTrades, eventType="Trades", times="all")
    }
}

dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)

// Create a CEP engine and specify three event stream serializers
engine = createCEPEngine(name='cep1', monitors=<SimpleShareSearch()>, dummyTable=dummy, eventSchema=[MarketData,Orders,Trades], outputTable=[serializer1,serializer2,serializer3])

m = MarketData("m", "c", 10.0, 100)

appendEvent(engine, m)

o = Orders("a","m", "c", 10.0, 100)
t = Trades("a","m", "c", 10.0, 100)

appendEvent(engine, o)
appendEvent(engine, t)