/*
 * This script creates databases and tables for both standard snapshot and array vector snapshot data.  
 * If you have Snapshot data, it can be directly imported. Otherwise, simulated data can be generated and inserted into the tables.  
 * The dataset spans 5 trading days and covers 4,000 stocks.  
 * It contains a total of 94 fields, with raw uncompressed data size approximately 50 GB over 5 days. Data is imported on a per-day basis.  
 * This script creates a database partitioned by VALUE (daily) and hashed by stock (HASH 20).  
 * Each partition is approximately 50 MB before compression.  
 */
 
// Create the TSDB database `dbName` and, inside it, the wide snapshot table `tbName`
// (one scalar column per price level: BidPrice0..BidPrice9, etc.).
// WARNING: if a database named `dbName` already exists it is dropped first, together with its data.
//   dbName - DFS database path, e.g. "dfs://snapshot_SH_L2_TSDB"
//   tbName - name of the partitioned table to create
def createSnapshotDbTable(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	// First level: one VALUE partition per day. The range covers 2020 because the simulated
	// data in this script starts at 2020.01.01 (same range as the array-vector database).
	dbTime = database(, VALUE, 2020.01.01..2020.12.31)
	// Second level: 20 HASH buckets on the SYMBOL column.
	dbSymbol = database(, HASH, [SYMBOL, 20])
	db = database(dbName, COMPO, [dbTime, dbSymbol], , 'TSDB')
	name = `SecurityID`TradeTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`InstrumentStatus`BidPrice0`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidOrderQty0`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`BidOrderQty6`BidOrderQty7`BidOrderQty8`BidOrderQty9`BidOrders0`BidOrders1`BidOrders2`BidOrders3`BidOrders4`BidOrders5`BidOrders6`BidOrders7`BidOrders8`BidOrders9`OfferPrice0`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferPrice6`OfferPrice7`OfferPrice8`OfferPrice9`OfferOrderQty0`OfferOrderQty1`OfferOrderQty2`OfferOrderQty3`OfferOrderQty4`OfferOrderQty5`OfferOrderQty6`OfferOrderQty7`OfferOrderQty8`OfferOrderQty9`OfferOrders0`OfferOrders1`OfferOrders2`OfferOrders3`OfferOrders4`OfferOrders5`OfferOrders6`OfferOrders7`OfferOrders8`OfferOrders9`NumTrades`IOPV`TotalBidQty`TotalOfferQty`WeightedAvgBidPx`WeightedAvgOfferPx`TotalBidNumber`TotalOfferNumber`BidTradeMaxDuration`OfferTradeMaxDuration`NumBidOrders`NumOfferOrders`WithdrawBuyNumber`WithdrawBuyAmount`WithdrawBuyMoney`WithdrawSellNumber`WithdrawSellAmount`WithdrawSellMoney`ETFBuyNumber`ETFBuyAmount`ETFBuyMoney`ETFSellNumber`ETFSellAmount`ETFSellMoney
	type =`SYMBOL`TIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`INT`INT`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`INT`INT`DOUBLE`INT`INT`DOUBLE`INT`INT`DOUBLE
	// Empty in-memory table used only as the schema template.
	tbTemp = table(1:0, name, type)
	// The handle returned by database(...) above is used directly; no need to reload it.
	createPartitionedTable(dbHandle=db, table=tbTemp, tableName=tbName, partitionColumns=`TradeTime`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns = `SecurityID`TradeTime, keepDuplicates=LAST)
}

// Create the TSDB database `dbName` and, inside it, the array-vector snapshot table `tbName`,
// where the per-level columns are stored as six array-vector columns
// (BidPrice, BidOrderQty, BidOrders, OfferPrice, OfferOrderQty, OfferOrders).
// WARNING: if a database named `dbName` already exists it is dropped first, together with its data.
//   dbName - DFS database path, e.g. "dfs://LEVEL2_Snapshot_ArrayVector"
//   tbName - name of the partitioned table to create
def createSnapshotArrayVectorDbTable(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	// Composite partitioning: VALUE by day, then 20 HASH buckets on the SYMBOL column.
	db1 = database("", VALUE, 2020.01.01..2020.12.31)
	db2 = database("", HASH, [SYMBOL, 20])
	db = database(dbName,  COMPO, [db1,db2], , 'TSDB')
	
	name = `SecurityID`TradeTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`InstrumentStatus`BidPrice`BidOrderQty`BidOrders`OfferPrice`OfferOrderQty`OfferOrders`NumTrades`IOPV`TotalBidQty`TotalOfferQty`WeightedAvgBidPx`WeightedAvgOfferPx`TotalBidNumber`TotalOfferNumber`BidTradeMaxDuration`OfferTradeMaxDuration`NumBidOrders`NumOfferOrders`WithdrawBuyNumber`WithdrawBuyAmount`WithdrawBuyMoney`WithdrawSellNumber`WithdrawSellAmount`WithdrawSellMoney`ETFBuyNumber`ETFBuyAmount`ETFBuyMoney`ETFSellNumber`ETFSellAmount`ETFSellMoney
	type =`SYMBOL`TIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`SYMBOL`DOUBLE`INT`INT`DOUBLE`INT`INT`INT`DOUBLE`INT`INT`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`INT`INT`DOUBLE`INT`INT`DOUBLE`INT`INT`DOUBLE
	tbTemp = table(1:0, name, type)
	
	// The template is first built with scalar types; the six per-level columns are then
	// dropped and re-added with array-vector types. Re-added columns land at the end of the
	// table, so the original column order is restored afterwards.
	arrayCols = `BidPrice`BidOrderQty`BidOrders`OfferPrice`OfferOrderQty`OfferOrders
	tbTemp.dropColumns!(arrayCols)
	tbTemp.addColumn(arrayCols, [DOUBLE[],INT[],INT[],DOUBLE[],INT[],INT[]])
	tbTemp.reorderColumns!(name)
	
	// Column names are spelled exactly as in the schema (TradeTime) so that the partition,
	// sort and compression settings all refer to the column with the same casing.
	db.createPartitionedTable(tbTemp, tbName, `TradeTime`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns = `SecurityID`TradeTime, keepDuplicates=LAST)
}

// Define simulated data  
// Generate one day of random snapshot rows for 4,000 securities and append them to
// table "snapshot_SH_L2_TSDB" of database "dfs://snapshot_SH_L2_TSDB".
//   n - offset of the day to generate, counted in "B" units of temporalAdd from 2020.01.01
// Columns are matched to the target table by position on append, so the column order of
// the select below must follow the target schema.
def genSnapshotOrigin(n){
	// One-row table holding the date of the day being generated.
	dayTb = table(temporalAdd(2020.01.01, n, "B") as tradeDate)
	// Millisecond offsets applied to both session start times (09:30 and 13:00).
	tickOffsets = 0..2400*3*1000
	tickTb = table((09:30:00.000+tickOffsets) join (13:00:00.000+tickOffsets) as tradeMin)
	// Every date x time-of-day combination, merged into a single timestamp column.
	stampTb = select concatDateTime(tradeDate,tradeMin) as tradetime from cj(dayTb,tickTb)
	ids = "sz"+lpad(string(000001..004000), 6, `0)
	// Every security x timestamp combination: the key columns of the generated rows.
	baseTb = cj(table(ids as securityid), stampTb)
	rowCount = size(baseTb)
	// rand(100.0, k) yields DOUBLE values and rand(100, k) yields INT values.
	resTable = select *,
		rand(100.0,rowCount) as PreClosePx, rand(100.0,rowCount) as OpenPx, rand(100.0,rowCount) as HighPx, rand(100.0,rowCount) as LowPx, rand(100.0,rowCount) as LastPx,
		rand(100,rowCount) as TotalVolumeTrade, rand(100.0,rowCount) as TotalValueTrade, rand(`a`b`c,rowCount) as InstrumentStatus,
		rand(100.0,rowCount) as BidPrice0, rand(100.0,rowCount) as BidPrice1, rand(100.0,rowCount) as BidPrice2, rand(100.0,rowCount) as BidPrice3, rand(100.0,rowCount) as BidPrice4,
		rand(100.0,rowCount) as BidPrice5, rand(100.0,rowCount) as BidPrice6, rand(100.0,rowCount) as BidPrice7, rand(100.0,rowCount) as BidPrice8, rand(100.0,rowCount) as BidPrice9,
		rand(100,rowCount) as BidOrderQty0, rand(100,rowCount) as BidOrderQty1, rand(100,rowCount) as BidOrderQty2, rand(100,rowCount) as BidOrderQty3, rand(100,rowCount) as BidOrderQty4,
		rand(100,rowCount) as BidOrderQty5, rand(100,rowCount) as BidOrderQty6, rand(100,rowCount) as BidOrderQty7, rand(100,rowCount) as BidOrderQty8, rand(100,rowCount) as BidOrderQty9,
		rand(100,rowCount) as BidOrders0, rand(100,rowCount) as BidOrders1, rand(100,rowCount) as BidOrders2, rand(100,rowCount) as BidOrders3, rand(100,rowCount) as BidOrders4,
		rand(100,rowCount) as BidOrders5, rand(100,rowCount) as BidOrders6, rand(100,rowCount) as BidOrders7, rand(100,rowCount) as BidOrders8, rand(100,rowCount) as BidOrders9,
		rand(100.0,rowCount) as OfferPrice0, rand(100.0,rowCount) as OfferPrice1, rand(100.0,rowCount) as OfferPrice2, rand(100.0,rowCount) as OfferPrice3, rand(100.0,rowCount) as OfferPrice4,
		rand(100.0,rowCount) as OfferPrice5, rand(100.0,rowCount) as OfferPrice6, rand(100.0,rowCount) as OfferPrice7, rand(100.0,rowCount) as OfferPrice8, rand(100.0,rowCount) as OfferPrice9,
		rand(100,rowCount) as OfferOrderQty0, rand(100,rowCount) as OfferOrderQty1, rand(100,rowCount) as OfferOrderQty2, rand(100,rowCount) as OfferOrderQty3, rand(100,rowCount) as OfferOrderQty4,
		rand(100,rowCount) as OfferOrderQty5, rand(100,rowCount) as OfferOrderQty6, rand(100,rowCount) as OfferOrderQty7, rand(100,rowCount) as OfferOrderQty8, rand(100,rowCount) as OfferOrderQty9,
		rand(100,rowCount) as OfferOrders0, rand(100,rowCount) as OfferOrders1, rand(100,rowCount) as OfferOrders2, rand(100,rowCount) as OfferOrders3, rand(100,rowCount) as OfferOrders4,
		rand(100,rowCount) as OfferOrders5, rand(100,rowCount) as OfferOrders6, rand(100,rowCount) as OfferOrders7, rand(100,rowCount) as OfferOrders8, rand(100,rowCount) as OfferOrders9,
		rand(100,rowCount) as NumTrades, rand(100.0,rowCount) as IOPV, rand(100,rowCount) as TotalBidQty, rand(100,rowCount) as TotalOfferQty,
		rand(100.0,rowCount) as WeightedAvgBidPx, rand(100.0,rowCount) as WeightedAvgOfferPx, rand(100,rowCount) as TotalBidNumber, rand(100,rowCount) as TotalOfferNumber,
		rand(100,rowCount) as BidTradeMaxDuration, rand(100,rowCount) as OfferTradeMaxDuration, rand(100,rowCount) as NumBidOrders, rand(100,rowCount) as NumOfferOrders,
		rand(100,rowCount) as WithdrawBuyNumber, rand(100,rowCount) as WithdrawBuyAmount, rand(100.0,rowCount) as WithdrawBuyMoney,
		rand(100,rowCount) as WithdrawSellNumber, rand(100,rowCount) as WithdrawSellAmount, rand(100.0,rowCount) as WithdrawSellMoney,
		rand(100,rowCount) as ETFBuyNumber, rand(100,rowCount) as ETFBuyAmount, rand(100.0,rowCount) as ETFBuyMoney,
		rand(100,rowCount) as ETFSellNumber, rand(100,rowCount) as ETFSellAmount, rand(100.0,rowCount) as ETFSellMoney
	from baseTb
	
	snapshotTb = loadTable("dfs://snapshot_SH_L2_TSDB", "snapshot_SH_L2_TSDB")
	snapshotTb.append!(resTable)
}

// Each day is roughly 10 GB, so to keep memory usage under control the days are generated and written one after another rather than in parallel via submitJob.
// Generate and store simulated snapshot data for `numOfdays` consecutive day offsets
// (0 .. numOfdays-1), one day after another.
//   numOfdays - number of days to generate; values <= 0 generate nothing
def writeInSnapshotByDay(numOfdays){
	// Guard: 0..(numOfdays-1) is a descending sequence when numOfdays <= 0
	// (e.g. 0..-1 is [0, -1]), which would otherwise still generate data.
	if(numOfdays > 0){
		for (n in 0..(numOfdays-1)){
			genSnapshotOrigin(n)
		}
	}
}

// Convert data from the snapshot table into array vectors. 
// Copy one day of rows for the given securities from the wide snapshot table
// ("dfs://snapshot_SH_L2_TSDB"/"snapshot_SH_L2_TSDB") into the array-vector table
// ("dfs://LEVEL2_Snapshot_ArrayVector"/"Snap"), packing each group of ten per-level
// columns (e.g. BidPrice0..BidPrice9) into one array-vector column.
//   d    - the date to convert; rows are selected with date(TradeTime) = d
//   syms - the SecurityID values to convert; they are processed one at a time
def importDataDaily(d, syms){
	snapshot = loadTable("dfs://snapshot_SH_L2_TSDB", "snapshot_SH_L2_TSDB") 
	// The destination handle does not depend on the symbol, so it is loaded once up front
	// instead of once per symbol.
	arrayTb = loadTable("dfs://LEVEL2_Snapshot_ArrayVector", "Snap")
	for(sym in syms){
		tmp = select SecurityID,TradeTime,PreClosePx,OpenPx,HighPx,LowPx,LastPx,TotalVolumeTrade,TotalValueTrade,InstrumentStatus,
			fixedLengthArrayVector(BidPrice0,BidPrice1,BidPrice2,BidPrice3,BidPrice4,BidPrice5,BidPrice6,BidPrice7,BidPrice8,BidPrice9) as BidPrice,
			fixedLengthArrayVector(BidOrderQty0,BidOrderQty1,BidOrderQty2,BidOrderQty3,BidOrderQty4,BidOrderQty5,BidOrderQty6,BidOrderQty7,BidOrderQty8,BidOrderQty9) as BidOrderQty,
			fixedLengthArrayVector(BidOrders0,BidOrders1,BidOrders2,BidOrders3,BidOrders4,BidOrders5,BidOrders6,BidOrders7,BidOrders8,BidOrders9) as BidOrders,
			fixedLengthArrayVector(OfferPrice0,OfferPrice1,OfferPrice2,OfferPrice3,OfferPrice4,OfferPrice5,OfferPrice6,OfferPrice7,OfferPrice8,OfferPrice9) as OfferPrice,
			fixedLengthArrayVector(OfferOrderQty0,OfferOrderQty1,OfferOrderQty2,OfferOrderQty3,OfferOrderQty4,OfferOrderQty5,OfferOrderQty6,OfferOrderQty7,OfferOrderQty8,OfferOrderQty9) as OfferOrderQty,
			fixedLengthArrayVector(OfferOrders0,OfferOrders1,OfferOrders2,OfferOrders3,OfferOrders4,OfferOrders5,OfferOrders6,OfferOrders7,OfferOrders8,OfferOrders9) as OfferOrders,
			NumTrades,IOPV,TotalBidQty,TotalOfferQty,WeightedAvgBidPx,WeightedAvgOfferPx,TotalBidNumber,TotalOfferNumber,BidTradeMaxDuration,OfferTradeMaxDuration,NumBidOrders,NumOfferOrders,
			WithdrawBuyNumber,WithdrawBuyAmount,WithdrawBuyMoney,WithdrawSellNumber,WithdrawSellAmount,WithdrawSellMoney,
			ETFBuyNumber,ETFBuyAmount,ETFBuyMoney,ETFSellNumber,ETFSellAmount,ETFSellMoney
		from snapshot where SecurityID=sym, date(TradeTime) = d
		// Skip symbols that have no rows on this date.
		if(tmp.size()>0){
			arrayTb.append!(tmp)
		}
	}
}

// Submit one batch job per (hash bucket, day) that converts the stored wide snapshot rows
// into the array-vector table via importDataDaily.
//   numOfdays - number of days to convert, counted from 2020.01.01 in "B" units of
//               temporalAdd; values <= 0 submit nothing
// The dates are derived with the same temporalAdd(2020.01.01, n, "B") rule that
// genSnapshotOrigin uses when generating the data, so the converted days are exactly the
// generated ones (plain calendar-day arithmetic would hit days with no data and miss
// generated days that fall after a weekend).
def writeInSnapshotArrayVectorByDay(numOfdays){
	// Guard: 0..(numOfdays-1) is a descending sequence when numOfdays <= 0.
	if(numOfdays > 0){
		// Must equal the HASH bucket count used when the databases were created.
		bucketCount = 20
		// The symbol universe and its bucket assignment do not depend on the loop
		// variables, so they are computed once.
		securityid ="sz"+lpad(string(000001..004000), 6, `0)
		hash_bucket_table = table(securityid,hashBucket(securityid,bucketCount) as bucket)
		for (i in 0..(bucketCount-1)){
			syms = exec securityid from hash_bucket_table where bucket = i
			for (n in 0..(numOfdays-1)){
				d = temporalAdd(2020.01.01, n, "B")
				submitJob("array_bucket"+string(i),"importDataDaily_array"+string(d)+"bucket"+string(i),importDataDaily,d,syms)
			}
		}
	}
}

// Create the database and table for the wide snapshot data (COMPO partitioning: VALUE by day, then HASH 20 on the security ID).
// Note: an existing database with this path is dropped first.
createSnapshotDbTable("dfs://snapshot_SH_L2_TSDB", "snapshot_SH_L2_TSDB")

// Create the database and table for the array-vector form of the snapshot data.
// Note: an existing database with this path is dropped first.
createSnapshotArrayVectorDbTable("dfs://LEVEL2_Snapshot_ArrayVector", "Snap")

// Generate simulated data for 5 days and append it to the wide snapshot table, one day at a time.
writeInSnapshotByDay(5)
// Convert the stored snapshot rows into the array-vector table; the work is handed to submitJob, one job per hash bucket and day.
writeInSnapshotArrayVectorByDay(5)