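// Script layout: function definitions come first; login and cache clearing run at the start of the main script section further below.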


// Define all functions:

// Drop every stream engine named in engineNames on a best-effort basis.
// Parameters:
//   engineNames - collection of stream engine names to drop
// A failure to drop one engine is deliberately ignored so that the remaining
// names are still processed.
def cleanStreamEngines(engineNames){
	for(engineName in engineNames){
		try{ dropStreamEngine(engineName) }
		catch(ex){
			// Intentionally ignored: dropping is best-effort.
		}
	}
}

// Create the tick database at dfsPath if it does not exist yet.
// The database is a TSDB-engine COMPO database combining a VALUE partition over
// the dates 2020.01.01..2021.01.05 with a 20-bucket HASH partition on SYMBOL.
// Parameters:
//   dfsPath - DFS path of the database
// Returns:
//   a handle to the database at dfsPath, whether it already existed or was just created
def tickCreateDb(dfsPath){
	if(existsDatabase(dfsPath)){
		// Return a handle here too, so callers get the same kind of value on both paths.
		return database(dfsPath)
	}
	dbTime = database('', VALUE, 2020.01.01..2021.01.05);
	dbSymbol = database('', HASH, [SYMBOL, 20])
	dbHandle = database(dfsPath, COMPO, [dbTime, dbSymbol],engine='TSDB')
	return dbHandle
}
// Create the partitioned tick table tableName inside the database at dfsPath.
// Does nothing when the table already exists (the existing table is kept, not dropped).
// Parameters:
//   dfsPath   - DFS path of an existing database
//   tableName - name of the tick table to create
def tickCreateTable(dfsPath,tableName){
	if (!existsTable(dfsPath,tableName)){
		// Empty in-memory table used only as the schema template.
		tickColNames = `SecurityID`tradingDate`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNo`SellNo
		tickColTypes = [SYMBOL, DATE, DATETIME, DOUBLE, DOUBLE, DOUBLE, INT, INT]
		tickSchema = table(1:0, tickColNames, tickColTypes)
		tickDb = database(dfsPath)
		// Partition on TradeTime and SecurityID; sort on SecurityID then TradeTime; keep all duplicates.
		createdTable = createPartitionedTable(dbHandle=tickDb, table=tickSchema, tableName=tableName, partitionColumns=`TradeTime`SecurityID, sortColumns=`SecurityID`TradeTime, keepDuplicates=ALL)
	}
}
// Build an empty in-memory table whose column names and types mirror targetTable.
// Parameters:
//   targetTable - table whose schema is copied
// Returns:
//   an empty in-memory table (initial capacity 10000) with the same column names and type strings
def createRamTableAsTargetTable(targetTable){
	columnDefs = schema(targetTable).colDefs
	return table(10000:0, columnDefs.name, columnDefs.typeString)
}
// Simulate intraday tick data for a single stock on a single trading day.
// One tick is generated per second from 09:30:00 (inclusive) to 15:00:00 (exclusive).
// Parameters:
//   SecurityID     - stock code written to every row
//   tradingDate    - trading date written to every row and combined with the intraday times
//   yesterdayClose - reference price; each tick price is this value scaled by a random factor
// Returns:
//   an in-memory table with columns SecurityID, tradingDate, TradeTime, TradePrice,
//   TradeQty, TradeAmount, BuyNo, SellNo
def formOneStockOneDay(SecurityID,tradingDate,yesterdayClose){
	openMoment=09:30:00.000
	closeMoment=15:00:00.000
	// Session length in milliseconds divided by 1000: one tick per second.
	oneDayTickCount=(closeMoment-openMoment)/1000

	// TradeTime: one value per second starting at the open, combined with the trading date.
	timePartVec=(0..(oneDayTickCount-1))*1000+openMoment
	tradetime=concatDateTime(tradingDate,timePartVec)

	// TradePrice: yesterdayClose scaled by a factor of 1 plus a random offset within +/-0.1.
	rands=rand(2.0,oneDayTickCount)-1
	priceSeries=yesterdayClose*(1+rands*0.1)

	// TradeQty: lot sizes resampled from the strictly positive part of a centred
	// Binomial(30, 0.5) sample, then converted to shares (1 lot = 100 shares).
	volumeRand=randBinomial(30,0.5,oneDayTickCount)-15
	volumePool=volumeRand[volumeRand>0]
	volumes=rand(volumePool,oneDayTickCount)
	stockVolumes=volumes*100

	// Order numbers: draw a pool of random order IDs, then use the odd IDs as the
	// buy-order pool and the even IDs as the sell-order pool.
	allOrderPool=rand(1..oneDayTickCount,oneDayTickCount)
	modes=allOrderPool%2
	buyOrderPool=allOrderPool[bool(modes)]
	sellOrderPool=allOrderPool[bool(modes==0)]
	randBuyNo=rand(buyOrderPool,oneDayTickCount)
	randSellNo=rand(sellOrderPool,oneDayTickCount)

	// Constant columns: the same security ID and trading date on every row.
	secVec=array(SYMBOL,oneDayTickCount)
	secVec[:]=SecurityID
	tradingDateVec=array(DATE,oneDayTickCount)
	tradingDateVec[:]=tradingDate

	onedayTable=table(
	secVec as SecurityID,
	tradingDateVec as tradingDate,
	tradetime as TradeTime,
	priceSeries as TradePrice,
	stockVolumes as TradeQty,
	priceSeries*stockVolumes as TradeAmount,
	randBuyNo as BuyNo,
	randSellNo as SellNo)
	return onedayTable
}
// Simulate tick data for one stock over 2020.01.01..2020.01.20 and append it to
// the DFS table "tick_SH_L2_TSDB" in database "dfs://tick_SH_L2_TSDB".
// Parameters:
//   SecurityID - stock code to simulate
// All days are first collected in an in-memory buffer and written to the DFS
// table with a single append.
def makeFakeTickPerStock(SecurityID){
	targetTable=loadTable("dfs://tick_SH_L2_TSDB","tick_SH_L2_TSDB")
	// In-memory buffer with the same schema as the target table.
	buffer=createRamTableAsTargetTable(targetTable)
	// Random starting reference price for this stock.
	refPrice=rand(150.0,1)[0]
	for(tradingDate in 2020.01.01..2020.01.20){
		dayTicks=formOneStockOneDay(SecurityID,tradingDate,refPrice)
		append!(buffer,dayTicks)
		// The day's last trade price becomes the next day's reference price.
		refPrice=last(dayTicks.TradePrice)
	}
	append!(targetTable,buffer)
}

// Simulate tick data for 4000 stocks (codes "000001" .. "004000").
// One background job per stock is submitted, each running makeFakeTickPerStock,
// and the function then waits for every submitted job to finish before returning.
// Takes no parameters and returns nothing.
//
// Each stock is generated exactly once. Running makeFakeTickPerStock directly in
// addition to submitting a job for it would write every tick twice, because the
// target table is created with keepDuplicates=ALL.
def simulateTickData(){
	codeNum=string(1..4000)
	testFill=lpad(codeNum,6,"0")
	jobIds=array(STRING,0)
	for(SecurityID in testFill){
		jobName="fill_tick_"+SecurityID
		actionName=SecurityID
		// Keep the ID returned by submitJob rather than jobName, so the wait below
		// uses the ID the system actually assigned to this job.
		jobId=submitJob(jobName,actionName,makeFakeTickPerStock,SecurityID)
		jobIds.append!(jobId)
		print(jobName)
		// Brief pause between submissions to throttle job creation.
		sleep(300)
	}
	// Block until every generation job has completed, so the data is in place
	// when this function returns.
	for(jobId in jobIds){
		getJobReturn(jobId,true)
	}
}

// Factor definition: running share of traded quantity attributed to buy-initiated trades.
// A trade's quantity counts towards the numerator when BuyNo > SellNo.
// Parameters:
//   BuyNo    - buy order numbers
//   SellNo   - sell order numbers
//   TradeQty - traded quantities
// Returns:
//   cumulative sum of the selected quantities divided by the cumulative sum of all quantities
@state
def buyTradeRatio(BuyNo,SellNo,TradeQty){
	buyInitiatedQty=iif(BuyNo>SellNo,TradeQty,0)
	return cumsum(buyInitiatedQty)\cumsum(TradeQty)
}

// Set up the buyTradeRatio streaming computation: create the result table and the
// reactive state engine "reactiveDemo", then submit a background job that replays
// historical ticks into the engine.
// Parameters:
//   globalTickPath      - DFS path of the tick database
//   globalTickTableName - name of the tick table
// Returns:
//   the in-memory result table (SecurityID, tradingDate, TradeTime, factor) that the engine writes to
def createAllStreams(globalTickPath,globalTickTableName){
	// Output table of the engine.
	buyTradeRatioResultTable = table(1:0, `SecurityID`tradingDate`TradeTime`factor, [SYMBOL, DATE, TIMESTAMP, DOUBLE])

	tickTableHandle=loadTable(globalTickPath,globalTickTableName)
	// A small sample of the tick table serves as the engine's schema template (dummyTable).
	schemaSample=select top 50 * from tickTableHandle

	// Replay data sources: ticks with tradingDate before 2020.03.01.
	replaySources = replayDS(<select * from tickTableHandle where tradingDate<2020.03.01>, `TradeTime, `TradeTime)

	// Reactive state engine computing the factor per (SecurityID, tradingDate) group.
	factorEngine = createReactiveStateEngine(name="reactiveDemo", metrics=<[TradeTime,buyTradeRatio(BuyNo,SellNo,TradeQty)]>, dummyTable=schemaSample, outputTable=buyTradeRatioResultTable, keyColumn=["SecurityID","tradingDate"],keepOrder=true)

	// Run the replay as a background job so this function returns immediately.
	submitJob("streamingbuyTradeRatioDemoJob","streamComputationOfBuyTradeRatio",replay,replaySources,factorEngine, `TradeTime, `TradeTime, -1,false, 4)

	return buyTradeRatioResultTable
}

// Log in and clear cached data before running the demo.
// NOTE(review): the credentials are hard-coded; presumably demo defaults — confirm before using outside a test environment.
login("admin","123456");
clearAllCache();
go;

// Program configuration
// NOTE(review): makeFakeTickPerStock writes to a hard-coded database path and table name that match these values; keep them in sync.
globalTickPath="dfs://tick_SH_L2_TSDB"
globalTickTableName="tick_SH_L2_TSDB"


// Data source
// Create the database and table
// Create the tick database (does nothing if it already exists)
tickCreateDb(globalTickPath)
// Create the tick table (does nothing if it already exists)
tickCreateTable(globalTickPath,globalTickTableName)

/* 
Users with their own data source can write tick data directly into the tick table, following the schema defined in tickCreateTable, and skip the simulation step below.

For A-share tick data, the uncompressed size is approximately 3 GB.
- Data is hash-partitioned (20 partitions), so each partition is around 150 MB.

This example demonstrates the computation of the buyTradeRatio factor, which calculates the proportion of aggressive buy trades relative to the total trading volume within a day.
- The computed results are stored in an in-memory table.
- Users who wish to persist the results can create a partitioned table with a schema similar to buyTradeRatioResultTable.
*/
simulateTickData()

// Drop the stream engine left over from a previous run, if any
cleanStreamEngines(["reactiveDemo"])

// Define the factor stream and obtain the result table
buyTradeRatioResultTable=createAllStreams(globalTickPath,globalTickTableName)

// View the result table
// createAllStreams submits the replay as a background job, so this query may return few or no rows if it runs before the job has made progress.
select top 50 * from buyTradeRatioResultTable
