/**
createStreamTB.txt
Script to create stream tables for publishing
DolphinDB Inc.
DolphinDB server version: 1.30.18 2022.05.09/2.00.6 2022.05.09
Last modification time: 2022.05.31
*/

/*
 * cleanEnvironment: best-effort teardown of the streaming objects used by this pipeline.
 *
 * parallel - number of parallel pipelines; objects with the suffixes 1..parallel are removed.
 *
 * Steps, in order:
 *   1. For every suffix: cancel the "processBuyOrder<N>" subscription on tradeOriginalStream,
 *      then drop the engines processBuyOrder<N>, processSellOrder<N> and processCapitalFlow<N>.
 *   2. Cancel the "processCapitalFlow60min" subscription on capitalFlowStream and drop the
 *      engine of the same name.
 *   3. Drop the stream tables tradeOriginalStream, capitalFlowStream and capitalFlowStream60min.
 *   4. Run `undef all`.
 *
 * Every call is wrapped in its own try/catch so that a missing object only prints the
 * error and the remaining cleanup still runs.
 */
def cleanEnvironment(parallel){
	for(idx in 1..parallel){
		suffix = string(idx)
		// cancel the subscription first, then remove the engines of this pipeline
		try {
			unsubscribeTable(tableName="tradeOriginalStream", actionName="processBuyOrder" + suffix)
		} catch(err) {
			print(err)
		}
		for(engineName in ["processBuyOrder", "processSellOrder", "processCapitalFlow"]){
			try {
				dropStreamEngine(engineName + suffix)
			} catch(err) {
				print(err)
			}
		}
	}
	// 60-minute aggregation: subscription first, then its engine
	try {
		unsubscribeTable(tableName="capitalFlowStream", actionName="processCapitalFlow60min")
	} catch(err) {
		print(err)
	}
	try {
		dropStreamEngine("processCapitalFlow60min")
	} catch(err) {
		print(err)
	}
	// finally remove the stream tables themselves
	for(tbName in ["tradeOriginalStream", "capitalFlowStream", "capitalFlowStream60min"]){
		try {
			dropStreamTable(tbName)
		} catch(err) {
			print(err)
		}
	}
	undef all
}
// Degree of parallelism of the calculation. cleanEnvironment uses it to remove the
// subscriptions and engines suffixed 1..parallel. Adjust this value to match your environment.
parallel = 3
cleanEnvironment(parallel)

/*
 * createStreamTableFunc: create the three shared, persisted stream tables of the pipeline.
 *
 *   tradeOriginalStream    - trade records (8 columns), initial capacity 20,000,000 rows
 *   capitalFlowStream      - capital-flow records (15 columns), initial capacity 20,000,000 rows
 *   capitalFlowStream60min - capital-flow records with TradeTime as the first column
 *                            (15 columns), initial capacity 1,000,000 rows
 *
 * Each table is built as a local streamTable and then published with
 * enableTableShareAndPersistence (asynchronous, compressed persistence; retentionMinutes=1440,
 * i.e. 24 hours). A failure while publishing is printed and does not abort the creation of
 * the remaining tables. The local temporary variable is undefined after each table is published.
 *
 * Takes no parameters and returns nothing.
 */
def createStreamTableFunc(){
	//create stream table: tradeOriginalStream
	colName = `SecurityID`Market`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum
	colType = [SYMBOL, SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT]
	tradeOriginalStreamTemp = streamTable(20000000:0, colName, colType)
	try{ enableTableShareAndPersistence(table=tradeOriginalStreamTemp, tableName="tradeOriginalStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) } catch(ex){ print(ex) }
	undef("tradeOriginalStreamTemp")
	//create stream table: capitalFlowStream
	colName = `SecurityID`TradeTime`TotalAmount`SellSmallAmount`SellMediumAmount`SellBigAmount`SellSmallCount`SellMediumCount`SellBigCount`BuySmallAmount`BuyMediumAmount`BuyBigAmount`BuySmallCount`BuyMediumCount`BuyBigCount
	colType =  [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT]
	capitalFlowStreamTemp = streamTable(20000000:0, colName, colType)
	try{ enableTableShareAndPersistence(table=capitalFlowStreamTemp, tableName="capitalFlowStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) } catch(ex){ print(ex) }
	undef("capitalFlowStreamTemp")
	//create stream table: capitalFlowStream60min
	// NOTE(review): TradeTime precedes SecurityID here, unlike capitalFlowStream - presumably
	// to match the output schema of the engine that writes this table; confirm.
	colName = `TradeTime`SecurityID`TotalAmount`SellSmallAmount`SellMediumAmount`SellBigAmount`SellSmallCount`SellMediumCount`SellBigCount`BuySmallAmount`BuyMediumAmount`BuyBigAmount`BuySmallCount`BuyMediumCount`BuyBigCount
	colType =  [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT]
	capitalFlowStream60minTemp = streamTable(1000000:0, colName, colType)
	try{ enableTableShareAndPersistence(table=capitalFlowStream60minTemp, tableName="capitalFlowStream60min", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000) } catch(ex){ print(ex) }
	// release the 60min temporary (previously this undefined capitalFlowStreamTemp a second time)
	undef("capitalFlowStream60minTemp")
}

// Create and publish the three stream tables.
createStreamTableFunc()
// `go` ends the current script segment so that the statements above are executed before the
// statement below is parsed. NOTE(review): presumably required because tradeOriginalStream
// only exists as a shared table after createStreamTableFunc() has run - confirm.
go
// Set SecurityID as the filter column of tradeOriginalStream.
setStreamTableFilterColumn(tradeOriginalStream, `SecurityID)