/*
 * Create a composite-partitioned database and an empty partitioned table in it.
 *
 * dbName     : path of the database to create (passed to database()).
 * tableName  : name of the partitioned table to create.
 * ps1        : partition scheme of the first level (VALUE partition, applied to column `datetime`).
 * ps2        : partition scheme of the second level (RANGE partition, applied to column `id`).
 * numMetrics : number of metric columns; they are named tag1 .. tag<numMetrics> and typed FLOAT.
 */
def createDatabase(dbName,tableName, ps1, ps2, numMetrics){
	m = "tag" + string(1..numMetrics)
	// The type list must have exactly one FLOAT per metric name, so it is sized
	// from numMetrics rather than a fixed count.
	schema = table(1:0,`id`datetime join m, [INT,DATETIME] join take(FLOAT,numMetrics) )
	db1 = database("",VALUE,ps1)
	db2 = database("",RANGE,ps2)
	// COMPO combines the two levels: db1 partitions `datetime`, db2 partitions `id`.
	db = database(dbName,COMPO,[db1,db2])
	db.createPartitionedTable(schema,tableName,`datetime`id)
}

/*
 * Build an in-memory table holding one day of random metric data.
 *
 * day        : the day to generate; converted with datetime() to get the first timestamp.
 * id         : vector of ids; each id gets freqPerDay consecutive rows.
 * freqPerDay : number of rows per id; timestamps are the first timestamp + 0 .. freqPerDay-1.
 * numMetrics : number of random columns, named tag1 .. tag<numMetrics>.
 *
 * Returns a table with freqPerDay * size(id) rows: the id column, the
 * timestamp column `ts`, then the tag columns filled by rand(1.0, ...).
 */
def generate1DayData(day,id,freqPerDay,numMetrics){
	machineCount = size(id)
	totalRows = freqPerDay * machineCount
	dayStart = day.datetime()

	// The local keeps the name idVec because table() below receives it unnamed.
	idVec = array(INT, totalRows)
	for(k in 0:machineCount){
		blockStart = k * freqPerDay
		blockEnd = (k + 1) * freqPerDay
		idVec[blockStart : blockEnd] = id[k]
	}

	// The same run of timestamps is repeated once per id.
	offsets = 0..(freqPerDay-1)
	result = table(idVec, take(dayStart + offsets, totalRows) as ts)

	metricNames = "tag" + string(1..numMetrics)
	for(k in 0 : numMetrics){
		result[metricNames[k]] = rand(1.0, totalRows)
	}
	return result
}

/*
 * Generate and append data for the given ids, one day at a time, in batches of
 * at most numMachinesPerPartition ids per append.
 *
 * id                      : vector of ids to write.
 * startDay                : first day to generate.
 * days                    : number of consecutive days to generate (startDay + 0 .. days-1).
 * freqPerDay              : rows per id per day (forwarded to generate1DayData).
 * numMachinesPerPartition : maximum number of ids appended in one batch.
 * numMetrics              : number of metric columns (forwarded to generate1DayData).
 * dbName, tableName       : target table, opened with loadTable.
 */
def singleThreadWriting(id, startDay, days, freqPerDay, numMachinesPerPartition,numMetrics,dbName,tableName){
	t = loadTable(dbName,tableName)
	idSize=size(id)
	// Nothing to write; the do-while below would otherwise run once with an
	// invalid 0..-1 index.
	if(idSize == 0) return

	for(d in 0:days){
		index=0
		do{
			idMax= numMachinesPerPartition - 1
			// Last batch may be shorter than a full batch: clamp to the ids that remain.
			if(idSize - index < numMachinesPerPartition) idMax = idSize - index - 1
			// Parenthesised so the offset is applied to the whole 0..idMax sequence.
			t.append!(generate1DayData(startDay + d,id[index + (0..idMax)],freqPerDay,numMetrics))
			index +=numMachinesPerPartition
		}while (index < idSize)
	}

}
/*
 * Split the id vector into chunks and write each chunk in parallel with
 * singleThreadWriting via ploop.
 *
 * id                      : vector of ids to write.
 * threads                 : requested number of parallel writers; the actual number of
 *                           chunks can be smaller because the chunk size is rounded up.
 * numMachinesPerPartition : batch size used by each writer; also the rounding unit
 *                           for the chunk size.
 * Remaining parameters are forwarded unchanged to singleThreadWriting.
 */
def multipleThreadWriting(id, startDay, days,freqPerDay, numMachinesPerPartition,numMetrics, threads,dbName,tableName) {
	// Split id into multiple parts for parallel writing. The chunk size is rounded
	// up to a multiple of numMachinesPerPartition so that chunk boundaries fall on
	// batch boundaries instead of a fixed width of 10.
	idCountPerThread = ceil(id.size() \ threads/numMachinesPerPartition)*numMachinesPerPartition
	ploop(singleThreadWriting{,startDay, days,freqPerDay, numMachinesPerPartition,numMetrics,dbName,tableName}, id.cut(idCountPerThread))
}

/*
 * Recreate the demo database and submit the data-writing work as a batch job.
 *
 * Drops "dfs://mvmDemo" if it already exists, creates it again with table
 * "machines" and 50 metric columns, then submits either the single-threaded
 * or the multi-threaded writer through submitJob, depending on `threads`.
 *
 * id, startDay, days, freqPerDay, numMachinesPerPartition : forwarded to the writer.
 * ps1, ps2 : partition schemes forwarded to createDatabase.
 * threads  : 1 selects singleThreadWriting; any other value selects multipleThreadWriting.
 */
def mainJob(id,startDay,days, ps1, ps2,freqPerDay, numMachinesPerPartition,threads) {
	numMetrics = 50
	tblName = "machines"
	dbPath = "dfs://mvmDemo"

	// Always start from an empty database.
	if(existsDatabase(dbPath)){
		dropDatabase(dbPath)
	}
	createDatabase(dbPath, tblName, ps1, ps2, numMetrics)

	if(threads == 1){
		submitJob("submit_singleThreadWriting", "write data", singleThreadWriting{id, startDay, days, freqPerDay, numMachinesPerPartition, numMetrics, dbPath, tblName})
	}
	else{
		submitJob("submit_multiThreadWriting", "write data", multipleThreadWriting{id, startDay, days, freqPerDay, numMachinesPerPartition, numMetrics, threads, dbPath, tblName})
	}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Driver: log in, define the workload parameters and launch the write job.
login("admin","123456")

// Rows generated per machine per day (used as an offset range on a DATETIME value).
freqPerDay=86400
numMachines=100
id=1..numMachines // device IDs
startDay=2020.09.01// start date
days=5 // number of days of data to write
threads = 20 // number of threads writing concurrently

// Batch size per append, and the width of each id range partition below.
numMachinesPerPartition=10
// First-level partition scheme (createDatabase uses it as a VALUE partition).
ps1=2020.09.01..2020.12.31
// Second-level partition scheme (createDatabase uses it as a RANGE partition):
// boundaries spaced numMachinesPerPartition apart, starting at 1.
ps2=numMachinesPerPartition*(0..(numMachines/numMachinesPerPartition))+1

mainJob(id, startDay, days, ps1, ps2, freqPerDay, numMachinesPerPartition, threads)