// Define stock tick events
class StockTick {
	// Input event describing a single market tick for one security.
	// Fields: security identifier, tick timestamp, tick price, tick volume.
	securityid :: STRING 
	time :: TIMESTAMP
	price ::  DOUBLE
	volume :: INT
	// Constructor: copies each argument into the field of the same name
	// (arguments carry a trailing underscore to avoid shadowing the fields).
	def StockTick(securityid_, time_, price_, volume_) {
		securityid = securityid_
		time = time_
		price = price_
		volume = volume_
	}
}
// Define execution report events
class ExecutionReport { 
	// Input event reporting an execution against a previously sent order.
	// Fields: id of the order, security identifier, execution price, execution volume.
	orderid :: STRING 
	securityid :: STRING 
	price :: DOUBLE 
	volume :: INT
	// Constructor: copies each argument into the field of the same name.
	def ExecutionReport(orderid_, securityid_, price_, volume_) {
		orderid = orderid_
		securityid = securityid_
		price = price_
		volume = volume_
	}
}
// Define new order events
class NewOrder { 
	// Output event requesting that a new order be placed.
	// Fields: order id, security identifier, order price, order volume,
	// side and order type (both stored as INT codes).
	// NOTE(review): the visible caller passes the CHAR literal 'B' for side while
	// the field is declared INT; the meaning of the type codes is not shown here — confirm.
	orderid :: STRING 
	securityid :: STRING 
	price :: DOUBLE 
	volume :: INT
	side :: INT
	type :: INT
	// Constructor: copies each argument into the field of the same name.
	def NewOrder(orderid_, securityid_, price_, volume_, side_, type_) { 
		orderid = orderid_
		securityid = securityid_
		price = price_
		volume = volume_
		side = side_
		type = type_
	}
}
// Define cancel order events
class CancelOrder { 
	// Output event requesting cancellation of the order with the given id.
	orderid :: STRING 
	// Constructor: stores the id of the order to cancel.
	def CancelOrder(orderid_) {
		orderid = orderid_
	}
}

class StrategyMonitor : CEPMonitor { 
	// CEP monitor implementing a simple momentum strategy:
	//   1. StockTick events for the configured securities are fed into a reactive
	//      state engine that computes per-security factors (ROC, 1-minute volume, last price).
	//   2. When both factors exceed the per-security thresholds, a NewOrder event is emitted.
	//   3. Each order is tracked: an ExecutionReport updates the execution statistics,
	//      while the absence of a report within 60s emits a CancelOrder.
	// Every step also updates a data view so the strategy state can be observed.
	strategyid :: INT // Strategy ID
	strategyParams :: ANY // Per-security parameters: securityid -> dict holding ROCThreshold and volumeThreshold
	dataview :: ANY // Data view engine handle, assigned in initDataView
	
	// Constructor: stores the strategy id and the per-security parameter dictionary.
	def StrategyMonitor(strategyid_, strategyParams_) {
		strategyid = strategyid_
		strategyParams = strategyParams_
	}

	// Timeout handler: invoked when no ExecutionReport arrived for `orderid` in time.
	// Emits a CancelOrder and increments the security's timeoutOrderNum in the data view.
	def execReportExceedTimeHandler(orderid, exceedTimeSecurityid){
		emitEvent(CancelOrder(orderid)) // Send cancel order event externally
		timeoutOrderNum = (exec timeoutOrderNum from self.dataview where securityid=exceedTimeSecurityid)[0] + 1
		updateDataViewItems(engine=self.dataview, keys=exceedTimeSecurityid, valueNames=`timeoutOrderNum, newValues=timeoutOrderNum) // Update data view
	}

	// Execution handler: adds the executed amount (price*volume) and bumps the
	// executed order count for the reported security in the data view.
	def execReportHandler(execReportEvent) {
		executionAmount = (exec executionAmount from self.dataview where securityid=execReportEvent.securityid)[0] + execReportEvent.price*execReportEvent.volume
		executionOrderNum = (exec executionOrderNum from self.dataview where securityid=execReportEvent.securityid)[0] + 1
		updateDataViewItems(engine=self.dataview, keys=execReportEvent.securityid, valueNames=["executionAmount","executionOrderNum"], newValues=(executionAmount,executionOrderNum))  // Update data view
	}
	
	// Output handler of the factor engine. `factorResult` is a table
	// (msgAsTable=true); only its first row is read.
	def handleFactorCalOutput(factorResult){
		factorSecurityid = factorResult.securityid[0]
		ROC = factorResult.ROC[0]
		volume = factorResult.volume[0]
		lastPrice = factorResult.lastPrice[0] 
		updateDataViewItems(engine=self.dataview, keys=factorSecurityid, valueNames=["ROC","volume"], newValues=(ROC,volume))
		if (ROC>strategyParams[factorSecurityid][`ROCThreshold] && volume>strategyParams[factorSecurityid][`volumeThreshold]) {
			// Order id format: <strategyid>_<securityid>_<current time as long>
			orderid = self.strategyid+"_"+factorSecurityid+"_"+long(now())
			// Price and volume are named once so the emitted order and the
			// amount recorded in the data view cannot drift apart.
			orderPrice = lastPrice*0.98 // Buy at 2% below the latest price
			orderVolume = 100
			// NOTE(review): 'B' is a CHAR literal while NewOrder.side is declared INT — confirm intended encoding.
			newOrder = NewOrder(orderid , factorSecurityid, orderPrice, orderVolume, 'B', 0) // Construct new order event
			emitEvent(newOrder) // Send new order event externally
			newOrderNum = (exec newOrderNum from self.dataview where securityid=factorSecurityid)[0] + 1
			newOrderAmount = (exec newOrderAmount from self.dataview where securityid=factorSecurityid)[0] + orderPrice*orderVolume
			updateDataViewItems(engine=self.dataview, keys=factorSecurityid, valueNames= ["newOrderNum", "newOrderAmount"], newValues=(newOrderNum, newOrderAmount)) // Update data view			
			addEventListener(handler=self.execReportExceedTimeHandler{orderid, factorSecurityid}, eventType="ExecutionReport", condition=<ExecutionReport.orderid=orderid>, times=1, exceedTime=60s) // Start execution report timer
			addEventListener(handler=execReportHandler, eventType="ExecutionReport", condition=<ExecutionReport.orderid=orderid>, times="all") // Start execution report listener
		}
	}

	// Tick handler: forwards the tick's fields to the "factorCal" stream engine.
	def tickHandler(tickEvent){
		factorCalEngine = getStreamEngine(`factorCal)
		insert into factorCalEngine values([tickEvent.securityid, tickEvent.time, tickEvent.price, tickEvent.volume])
	}

	// Creates the shared output table and the data view engine, then seeds one
	// row per configured security with its thresholds and zeroed counters.
	def initDataView(){
		// Create data view to monitor strategy execution status
		share(streamTable(1:0, `securityid`strategyid`ROCThreshold`volumeThreshold`ROC`volume`newOrderNum`newOrderAmount`executionOrderNum`executionAmount`timeoutOrderNum`updateTime, `STRING`INT`INT`INT`INT`INT`INT`DOUBLE`INT`DOUBLE`INT`TIMESTAMP), "strategyDV")
		// Key column spelled exactly like the table column and the queries above.
		dataview = createDataViewEngine(name="Strategy_"+strategyid, outputTable=objByName(`strategyDV), keyColumns=`securityid, timeColumn=`updateTime) 
		num = strategyParams.size()
		securityids = strategyParams.keys()
		ROCThresholds = each(find{,"ROCThreshold"}, strategyParams.values())
		volumeThresholds = each(find{,"volumeThreshold"}, strategyParams.values()) 
		// updateTime is not supplied here; the inserted table has every column except it.
		dataview.tableInsert(table(securityids, take(self.strategyid, num) as strategyid, ROCThresholds, volumeThresholds, take(int(NULL), num) as ROC, take(int(NULL), num) as volume, take(0, num) as newOrderNum, take(0, num) as newOrderAmount, take(0, num) as executionOrderNum, take(0, num) as executionAmount, take(0, num) as timeoutOrderNum))
	}

	// Creates the reactive state engine "factorCal", keyed by securityid, whose
	// results are delivered to handleFactorCalOutput.
	def createFactorCalEngine(){
		dummyTable = table(1:0, `securityid`time`price`volume, `STRING`TIMESTAMP`DOUBLE`INT)
		metrics = [<(price\tmmin(time, price, 15s)-1)*100>, <tmsum(time, volume, 60s)>, <price> ] // Latest price relative to the lowest price in the past 15 seconds, cumulative volume in 1 minute, latest price
		factorResult = table(1:0, `securityid`ROC`volume`lastPrice, `STRING`INT`LONG`DOUBLE) 
		createReactiveStateEngine(name="factorCal", metrics=metrics , dummyTable=dummyTable, outputTable=factorResult, keyColumn=`securityid, outputHandler=handleFactorCalOutput, msgAsTable=true)		
	}
	
	// Monitor entry point: sets up the data view and factor engine, then
	// subscribes to StockTick events for the configured securities only.
	def onload() {
		// Initialize data view
		initDataView()
		// Create factor calculation engine
		createFactorCalEngine()
		// Start listening to stock tick events
		securityids = strategyParams.keys()
		addEventListener(handler=tickHandler, eventType="StockTick", condition=<StockTick.securityid in securityids>, times="all")
	}
}

// ---- Demo driver: build the CEP engine, feed simulated ticks, then simulated execution reports ----

// Schema-only table (event type + serialized body) passed to the CEP engine as its dummyTable.
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as eventBody)
// Shared stream table receiving serialized output events; orderid is kept as a separate column.
share(streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as eventBody, array(STRING, 0) as orderid), "output")
// Serializer for the NewOrder / CancelOrder events, writing into "output" with orderid as the common field.
outputSerializer = streamEventSerializer(name=`serOutput, eventSchema=[NewOrder,CancelOrder], outputTable=objByName("output"), commonField="orderid")
strategyid = 1
// Per-security thresholds: securityid -> {ROCThreshold, volumeThreshold}.
strategyParams = dict(`300001`300002`300003, [dict(`ROCThreshold`volumeThreshold, [1,1000]), dict(`ROCThreshold`volumeThreshold, [1,2000]), dict(`ROCThreshold`volumeThreshold, [2, 5000])])
// CEP engine accepting StockTick and ExecutionReport events, with output routed through the serializer.
engine = createCEPEngine(name='strategyDemo', monitors=<StrategyMonitor(strategyid, strategyParams)>, dummyTable=dummy, eventSchema=[StockTick,ExecutionReport], outputTable=outputSerializer)

// Candidate securities for simulated ticks; the last two have no entry in strategyParams.
ids = `300001`300002`300003`600100`600800
// Append 120 random ticks, one every 500 ms: random security, timestamp now()+1000*i,
// price 10.0 plus a random value below 1.0, volume a random multiple of 100 (100..1000).
for (i in 1..120) {
	sleep(500)
	tick = StockTick(rand(ids, 1)[0], now()+1000*i, 10.0+rand(1.0,1)[0], 100*rand(1..10, 1)[0])
	getStreamEngine(`strategyDemo).appendEvent(tick)	
}
// Pause 20 seconds before sending execution reports.
sleep(1000*20)
print("begin to append ExecutionReport")
// For every NewOrder written to "output" that has no CancelOrder yet, append an
// ExecutionReport (price 10, volume 100). The securityid is the second "_"-separated
// segment of the orderid.
for (orderid in (exec orderid from output where eventType="NewOrder")){
	sleep(250)
	if(not orderid in (exec orderid from output where eventType="CancelOrder")) {
		execRep = ExecutionReport(orderid, split(orderid,"_")[1], 10, 100)
		getStreamEngine(`strategyDemo).appendEvent(execRep)	
	}	
}

// `go` ends this script segment so the statements above are parsed and run before anything that follows.
go



