login(`admin,`123456)
try{loadPlugin(getHomeDir()+"/plugins/mqtt/PluginMQTTClient.txt")} catch(ex) {print(ex)}
go
use mqtt;

def  clearEnv(){
	try{
		unsubscribeTable( tableName=`inputSt, actionName="monitor")
		if(objs(true).name.find('inputSt')!=-1) 
			dropStreamTable(`inputSt)
		if(objs(true).name.find('outputSt1')!=-1) 
			dropStreamTable(`outputSt1)
		if(objs(true).name.find('outputSt2')!=-1) 
			dropStreamTable(`outputSt2)
		if (getAggregatorStat().ReactiveStreamEngine[`name].find(`reactivEngine)!=-1)
			dropAggregator(`reactivEngine)
		if (getAggregatorStat().SessionWindowEngine[`name].find(`swEngine)!=-1)
			dropAggregator(`swEngine)
		for (id in mqtt::getSubscriberStat()[`subscriptionId]) 
			mqtt::unsubscribe(id)
	}catch(ex){
		print(ex)
	}
	
}


def createInOutTable(){
	stream01=streamTable(100000:0,`tag`ts`value,[SYMBOL,TIMESTAMP, INT])
	enableTableShareAndPersistence(table=stream01,tableName=`inputSt,asynWrite=false,compress=true, cacheSize=100000)
	
	out1 =streamTable(10000:0,`tag`ts`value,[SYMBOL,TIMESTAMP, INT])
	enableTableShareAndPersistence(table=out1,tableName=`outputSt1,asynWrite=false,compress=true, cacheSize=100000)
	
	out2 =streamTable(10000:0,`ts`tag`lastValue,[TIMESTAMP,SYMBOL, INT])
	enableTableShareAndPersistence(table=out2,tableName=`outputSt2,asynWrite=false,compress=true, cacheSize=100000)
}


def process(mutable  engine1,mutable  engine2,msg){
	engine1.append!(msg) 
	engine2.append!(msg)
}
def consume(){
	reactivEngine = createReactiveStateEngine(name=`reactivEngine, metrics=<[ts, value]>, dummyTable=objByName(`inputSt),outputTable= objByName(`outputSt1),keyColumn= "tag",filter=<value!=prev(value) && prev(value)!=NULL>)
	
	swEngine = createSessionWindowEngine(name = "swEngine", sessionGap = 30000, metrics = < last(value)>, dummyTable = objByName(`inputSt), outputTable = objByName(`outputSt2), timeColumn = `ts, keyColumn=`tag,useSessionStartTime=false)
	
	subscribeTable(tableName="inputSt", actionName="monitor", offset=0, 
			handler=process{swEngine,reactivEngine}, msgAsTable=true)
}


def writeStreamTable(host, port, topic){
	//sp = mqtt::createCsvParser([SYMBOL,TIMESTAMP,INT])
	sp = createJsonParser([SYMBOL,TIMESTAMP, INT], `tag`ts`value)
	mqtt::subscribe(host, port, topic, sp, objByName(`inputSt))
}

def publishTableData(server,topic,f, batchsize,t){
    	conn=connect(server,1883,0,f,batchsize)
   	publish(conn,topic,t)
    	close(conn)
}


def main(){
	clearEnv()
	
	createInOutTable()
	consume()
	
	host="127.0.0.1"
	port=1883
	topic="sensor/test"
	writeStreamTable(host, port, topic)

	t=loadText(getHomeDir()+"/deviceState.csv")
	//myFormat=take("", 3)   f= mqtt::createCsvFormatter(myFormat, ',', ';')
	f = createJsonFormatter()
	batchsize=100
	submitJob("submit_pub1", "submit_p1", publishTableData{host,topic,f, batchsize,t})
}

main()


getAggregatorStat().ReactiveStreamEngine
getAggregatorStat().SessionWindowEngine
getStreamingStat().pubTables
getStreamingStat().pubConns
getStreamingStat().subWorkers
mqtt::getSubscriberStat()  
getRecentJobs()

select * from outputSt1 where tag=`motor.C17156B.m1
select * from outputSt2 where tag=`motor.C17156B.m1