emitEvent

语法

emitEvent(event, [eventTimeField], [outputName])

详情

emitEvent 是 CEP monitor 内部的事件控制接口,用于将事件插入到 CEP 引擎的事件输出队列的队尾。当 CEP 引擎指定了多个事件流序列化器时,emitEvent 必须通过参数 outputName 指定目标序列化器(StreamEventSerializer)的名称,CEP 引擎据此进行匹配,并将事件输出到对应的一个或多个流表中。

参数

event 事件实例。

eventTimeField 字符串标量,表示事件中的时间字段名。若要指定此参数,event 必须包含时间字段。当 useSystemTime=true 时,输出事件中的时间为系统时间;否则,输出最新事件时间。

outputName 可选,字符串标量,指定事件流序列化器的名称。

  • 若 CEP 引擎中仅指定了一个输出表,则无需指定该参数。
  • 若 CEP 引擎指定了多个输出表,必须指定 outputName,引擎将根据名称将事件输出至对应的序列化器。

返回值

无。

例子

下例展示如何通过 outputName 参数,指定不同事件流序列化器,由 CEP 引擎进行匹配,并将事件输出到对应的流数据表中。

class MarketData{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def MarketData(m,c,p,q){
        market = m
        code = c
        price = p
        qty = q
    }
}

class Orders{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Orders(t, m,c,p,q){
        trader = t
        market = m
        code = c
        price = p
        qty = q
    }
}

class Trades{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Trades(t, m,c,p,q){
        trader = t
        market = m
        code = c
        price = p
        qty = q
    }
}

share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as MarketDataChannel
serializer1 = streamEventSerializer(name=`MarketDataChannel, eventSchema=[MarketData], outputTable=MarketDataChannel)

share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as OrdersChannel
serializer2 = streamEventSerializer(name=`OrdersChannel, eventSchema=[Orders], outputTable=OrdersChannel)

share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as TradesChannel
serializer3 = streamEventSerializer(name=`TradesChannel, eventSchema=[Trades], outputTable=TradesChannel)

class SimpleShareSearch:CEPMonitor {
	//构造函数
	def SimpleShareSearch(){
	}
    // 通过 emitEvent 指定事件流需要发送的序列化器的名称
	def processMarketData(event){
        emitEvent(event,,"MarketDataChannel")
    }
    def processOrders(event){
        emitEvent(event,,"OrdersChannel")
    }
    def processTrades(event){
        emitEvent(event,,"TradesChannel")
    }
	// 创建 CEP 子引擎之后,系统会自动构造 SimpleShareSearch 类对象为 Monitor 实例并调用 onload 函数
	def onload() {
		//监听StockTick事件
		addEventListener(handler=processMarketData, eventType="MarketData", times="all")
		addEventListener(handler=processOrders, eventType="Orders", times="all")
		addEventListener(handler=processTrades, eventType="Trades", times="all")
	}
}

dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)

// 创建 CEP 引擎,指定3个事件流序列化器
engine = createCEPEngine(name='cep1', monitors=<SimpleShareSearch()>, dummyTable=dummy, eventSchema=[MarketData,Orders,Trades], outputTable=[serializer1,serializer2,serializer3])

m= MarketData("m", "c", 10.0, 100)

appendEvent(engine, m)

o = Orders("a","m", "c", 10.0, 100)
t = Trades("a","m", "c", 10.0, 100)

appendEvent(engine, o)
appendEvent(engine, t)

相关函数addEventListener, createCEPEngine, appendEvent, routeEvent, sendEvent