/**
calTradeCost_lookUpJoin.txt
Script that uses the lookup join streaming engine to calculate trade cost
DolphinDB Inc.
DolphinDB server version: 2.00.6 2022.05.31
Storage engine: TSDB
Last modification time: 2022.07.07
*/

//login account
//NOTE(review): the credentials are hard-coded in the script; replace them with your own account before running outside a test environment
login("admin", "123456")

//Create, share and persist the two stream tables used by this script:
//  messageStream    - target of the history replay (msgTime, msgType, msgBody)
//  prevailingQuotes - output table of the lookup join engine
def createStreamTableFunc(){
	//messageStream: one row per message, with the message body stored in a BLOB column
	msgCols = `msgTime`msgType`msgBody
	msgTypes = [TIMESTAMP,SYMBOL, BLOB]
	msgTable = streamTable(5000000:0, msgCols, msgTypes)
	enableTableShareAndPersistence(table=msgTable, tableName="messageStream", asynWrite=true, compress=true, cacheSize=5000000, retentionMinutes=1440, flushMode=0, preCache=10000)
	//reset the local variable; the rest of the script refers to the table by its shared name
	msgTable = NULL
	//prevailingQuotes: each trade together with the bid/offer quote it was matched to
	quoteCols = `SecurityID`tradeTime`Price`TradeQty`BidPX1`OfferPX1`Spread`snapshotTime
	quoteTypes = [SYMBOL, TIME, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE, TIME]
	quoteTable = streamTable(100000:0, quoteCols, quoteTypes)
	enableTableShareAndPersistence(table=quoteTable, tableName="prevailingQuotes", asynWrite=true, compress=true, cacheSize=100000, retentionMinutes=1440, flushMode=0, preCache=10000)
	quoteTable = NULL
}
//create the shared stream tables now; "go" makes the script up to this point run before the rest is parsed,
//so that the shared tables messageStream and prevailingQuotes already exist when the code below references them
createStreamTableFunc()
go

//build an empty table whose column names and types mirror those of table tableName in database dbName
def createSchemaTable(dbName, tableName){
	colDefs = schema(loadTable(dbName, tableName)).colDefs
	return table(1:0, colDefs.name, colDefs.typeString)
}
//empty tables carrying the column definitions of the trade and snapshot DFS tables;
//used below as the left/right input schemas of the join engine and as the message schemas of the stream filter
tradeSchema = createSchemaTable("dfs://trade", "trade") 
snapshotSchema = createSchemaTable("dfs://snapshot", "snapshot") 

// register the lookup join stream computing engine
// Left input: trade records; right input: snapshot records; rows are matched on SecurityID.
// The seven metrics are expected to fill, in order, the tradeTime..snapshotTime columns of prevailingQuotes,
// following the SecurityID matching column (NOTE(review): confirm the output column order against the engine documentation).
// The sixth metric, abs(Price-(BidPX1+OfferPX1)/2), lands in the Spread column: the absolute distance between
// the trade price and the midpoint of BidPX1 and OfferPX1.
joinEngine = createLookupJoinEngine(name="tradeJoinSnapshot", leftTable=tradeSchema, rightTable=snapshotSchema, outputTable=prevailingQuotes, metrics=<[tradeSchema.Time, Price, TradeQty, BidPX1, OfferPX1, abs(Price-(BidPX1+OfferPX1)/2), snapshotSchema.Time]>, matchingColumn=`SecurityID)

//handler for trade messages: keep only trades with a positive price at or after 09:30:00.000
//and insert them into the left input of the tradeJoinSnapshot engine
def appendLeftStream(msg){
	validTrades = select * from msg where Time >= 09:30:00.000 and Price > 0
	leftInput = getLeftStream(getStreamEngine(`tradeJoinSnapshot))
	tableInsert(leftInput, validTrades)
}

//create the stream filter engine that dispatches messageStream records by message type,
//then subscribe that engine to messageStream
def filterAndParseStreamFunc(tradeSchema, snapshotSchema){
	joinEngineHandle = getStreamEngine(`tradeJoinSnapshot)
	//"trade" messages go through appendLeftStream, which filters them before feeding the left input
	tradeRule = dict(STRING,ANY)
	tradeRule["condition"] = "trade"
	tradeRule["handler"] = appendLeftStream
	//"snapshot" messages are written straight to the right input of the join engine
	snapshotRule = dict(STRING,ANY)
	snapshotRule["condition"] = "snapshot"
	snapshotRule["handler"] = getRightStream(joinEngineHandle)
	//table schema of each message type
	schemaByType = dict(["trade", "snapshot"], [tradeSchema, snapshotSchema])
	dispatcher = streamFilter(name="streamFilter", dummyTable=messageStream, filter=[tradeRule, snapshotRule], msgSchema=schemaByType)

	subscribeTable(tableName="messageStream", actionName="tradeJoinSnapshot", offset=-1, handler=dispatcher, msgAsTable=true, reconnect=true)
}
//build the stream filter and start the subscription to messageStream (done before the replay job is submitted below)
filterAndParseStreamFunc(tradeSchema, snapshotSchema)

//replay the 2020.12.31 trade and snapshot history into messageStream as a background job
def replayStockMarketData(){
	//cut the 09:15:00.000-15:00:00.000 range into 100 buckets; both data sources share this time repartition schema
	timeBuckets = cutPoints(09:15:00.000..15:00:00.000, 100)
	snapshotSources = replayDS(sqlObj=<select * from loadTable("dfs://snapshot", "snapshot") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeBuckets)
	tradeSources = replayDS(sqlObj=<select * from loadTable("dfs://trade", "trade") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeBuckets)
	//the dictionary keys are the same message type names used by the stream filter conditions
	sourcesByType = dict(["trade", "snapshot"], [tradeSources, snapshotSources])

	//trailing arguments of replay: 100000, true, 2 (presumably replayRate, absoluteRate, parallelLevel - confirm against the replay signature)
	submitJob("replay", "replay for factor calculation", replay, sourcesByType, messageStream, `Date, `Time, 100000, true, 2)
}
//submit the background replay job
replayStockMarketData()

//getRecentJobs()
//cancelJob("your jobId")
//select * from prevailingQuotes limit 100