// Creates a two-level (COMPO) DFS database `dbName` holding one empty
// partitioned table `tableName` with columns id INT, datetime DATETIME, value FLOAT.
//   ps1 - partition scheme of the first level (VALUE), applied to column `datetime`
//   ps2 - range boundaries of the second level (RANGE), applied to column `id`
def createDatabase(dbName,tableName, ps1, ps2){
	schema = table(1:0, `id`datetime`value, [INT, DATETIME, FLOAT])
	valueLevel = database("", VALUE, ps1)
	rangeLevel = database("", RANGE, ps2)
	compoDb = database(dbName, COMPO, [valueLevel, rangeLevel])
	// Partition columns are listed in level order: datetime -> ps1, id -> ps2.
	compoDb.createPartitionedTable(schema, tableName, `datetime`id)
}

// Builds one day of simulated readings for every id in `id`.
// Rows are grouped by id: id[0] occupies the first freqPerDay rows, then id[1], etc.
// Within each group the timestamps start at midnight of `day` and advance one
// second per row, so freqPerDay values above 86400 run past the end of `day`.
// `value` holds random DOUBLEs from rand(1.0, ...).
//   day        - DATE of the day to generate
//   id         - vector of ids (stored as INT)
//   freqPerDay - number of rows generated per id
// Returns an in-memory table with columns id, datetime, value, matching the
// column order and names of the sensors schema.
def generate1DayData(day, id, freqPerDay){
	startTime = day.datetime()
	idSize = size(id)
	numRecords = freqPerDay * idSize
	idVec = array(INT, numRecords)
	for(i in 0:idSize) idVec[(i*freqPerDay) : ((i+1)*freqPerDay)] = id[i]
	// One day of second offsets, repeated once per id.
	times = take(startTime + (0..(freqPerDay-1)), numRecords)
	// Name the id column `id` (it was `idVec`) so it matches the table schema.
	return table(idVec as id, times as datetime, rand(1.0, numRecords) as value)
}

// Appends `days` consecutive days of generated data for the ids in `id` to
// table "sensors" of database "dfs://svmDemo".
// For each day, ids are written in consecutive groups of at most
// numIdPerPartition ids, with one append! call per group.
//   id                - vector of ids to write
//   startDay          - DATE of the first day
//   days              - number of days to write
//   freqPerDay        - rows per id per day (passed to generate1DayData)
//   numIdPerPartition - maximum number of ids per append! call
def singleThreadWriting(id, startDay, days, freqPerDay, numIdPerPartition){
	t = loadTable("dfs://svmDemo","sensors")
	idSize = size(id)
	// Round up so a trailing partial group is still written. Computing the
	// count up front means an empty `id` gives zero iterations, whereas the
	// old do-while always ran once and appended NULL ids.
	numGroups = (idSize + numIdPerPartition - 1) / numIdPerPartition
	for(d in 0:days){
		for(g in 0:numGroups){
			lo = g * numIdPerPartition
			// Clamp to the end of `id`: indexing past it yields NULL ids,
			// which would otherwise be written to the table.
			hi = min(lo + numIdPerPartition, idSize)
			t.append!(generate1DayData(startDay + d, id[lo:hi], freqPerDay))
		}
	}
}
// Splits `id` into chunks and writes them in parallel with ploop, calling
// singleThreadWriting once per chunk.
// Each chunk holds roughly size(id)/threads ids, rounded up to a whole multiple
// of numIdPerPartition. The last chunk produced by cut may be smaller.
def multipleThreadWriting(id, startDay, days, freqPerDay, numIdPerPartition, threads) {
	groupsPerChunk = ceil(id.size() \ threads / numIdPerPartition)
	chunkSize = groupsPerChunk * numIdPerPartition
	idChunks = id.cut(chunkSize)
	// Bind every argument except the id vector, which ploop supplies per chunk.
	writeChunk = singleThreadWriting{, startDay, days, freqPerDay, numIdPerPartition}
	ploop(writeChunk, idChunks)
}

// Drops and recreates "dfs://svmDemo" (table "sensors"), then submits the data
// load as a background job via submitJob. This function therefore returns
// before the data has been written.
// threads == 1 runs singleThreadWriting; any other value runs
// multipleThreadWriting with `threads` as the split factor.
def mainJob(id, startDay, days, ps1, ps2, freqPerDay, numIdPerPartition, threads){
	dbUrl = "dfs://svmDemo"
	if(existsDatabase(dbUrl)) dropDatabase(dbUrl)
	createDatabase(dbUrl, "sensors", ps1, ps2)

	if(threads == 1){
		serialWriter = singleThreadWriting{id, startDay, days, freqPerDay, numIdPerPartition}
		submitJob("submit_singleThreadWriting", "write data", serialWriter)
	}else{
		parallelWriter = multipleThreadWriting{id, startDay, days, freqPerDay, numIdPerPartition, threads}
		submitJob("submit_multipleThreadWriting", "write data", parallelWriter)
	}
}


///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// Demo driver: configure the simulated sensor workload, then rebuild the
// database and submit the write job via mainJob.

// NOTE(review): hard-coded admin credentials; confirm this is acceptable outside a demo.
login("admin","123456")

// Rows generated per id per day (one per second, i.e. a full day at 86400).
freqPerDay=86400
numMachines=100
numMetrics=50
numMachinesPerPartition=2
// Ids per RANGE partition: 2 machines * 50 metrics = 100.
numIdPerPartition=numMachinesPerPartition*numMetrics
// First partition level (VALUE): one value per date from 2020.09.01 to 2020.12.31.
// Presumably startDay..startDay+days-1 must stay inside this range; confirm.
ps1=2020.09.01..2020.12.31
// Second partition level (RANGE) boundaries: 100*(0..50)+1 = 1, 101, ..., 5001,
// i.e. 50 ranges of 100 ids each, covering ids 1..5000.
ps2=(numMetrics*numMachinesPerPartition)*(0..(numMachines/numMachinesPerPartition))+1
// One id per (machine, metric) pair: 1..5000.
id =1..(numMachines*numMetrics)
startDay=2020.09.01
// Number of days of data to write
days = 5
// Number of threads writing in parallel (1 selects the single-threaded writer)
threads = 20

mainJob(id, startDay, days, ps1, ps2, freqPerDay, numIdPerPartition, threads)