/*
 * Small-order net amount ratio, computed with three cascaded reactive state engines:
 *   rse1 (keyed by BuyNo)      -> running amount and flag per buy order
 *   rse2 (keyed by SellNo)     -> running amount and flag per sell order
 *   rse3 (keyed by SecurityID) -> small-order buy/sell totals and the final factor
 * Each engine writes its key column first, followed by its metrics, so every
 * intermediate schema below lists the upstream key column in the first position.
 */

// Empty table that receives the final rows emitted by rse3.
def createResultTable(){
    colNames = `SecurityID`TradeTime`smallBuyOrderAmount`smallSellOrderAmount`totalOrderAmount`factor
    colTypes = [SYMBOL, DATETIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
    return table(1:0, colNames, colTypes)
}

// Schema of the raw trade rows inserted into rse1.
def createTradeSchema(){
    colNames = `SecurityID`BuyNo`SellNo`TradeTime`TradeAmount
    colTypes = [SYMBOL, INT, INT, DATETIME, DOUBLE]
    return table(1:0, colNames, colTypes)
}

// Schema of the rows passed from rse1 to rse2 (BuyNo is rse1's key column).
def createResult1Schema(){
    colNames = `BuyNo`SecurityID`SellNo`TradeTime`TradeAmount`BuyCumAmount`PrevBuyCumAmount`BuyOrderFlag`PrevBuyOrderFlag
    colTypes = [INT, SYMBOL, INT, DATETIME, DOUBLE, DOUBLE, DOUBLE, INT, INT]
    return table(1:0, colNames, colTypes)
}

// Schema of the rows passed from rse2 to rse3 (SellNo is rse2's key column).
def createResult2Schema(){
    colNames = `SellNo`BuyNo`SecurityID`TradeTime`TradeAmount`BuyCumAmount`PrevBuyCumAmount`BuyOrderFlag`PrevBuyOrderFlag`SellCumAmount`PrevSellCumAmount`SellOrderFlag`PrevSellOrderFlag
    colTypes = [INT, INT, SYMBOL, DATETIME, DOUBLE, DOUBLE, DOUBLE, INT, INT, DOUBLE, DOUBLE, INT, INT]
    return table(1:0, colNames, colTypes)
}

// Drop every engine named in engineNames. Failures are ignored on purpose so
// the script can be re-run whether or not the engines already exist.
def cleanStreamEngines(engineNames){
    for(engineName in engineNames){
        try{
            dropStreamEngine(engineName)
        }
        catch(ex){}
    }
}

// Per-key running trade amount plus a flag that is 0 while the running amount
// is below 100000 and 1 otherwise. The previous row's value of each is also returned.
// Returns: (running amount, previous running amount, flag, previous flag).
@state
def factorOrderCumAmount(tradeAmount){
    runningAmount = cumsum(tradeAmount)
    lastRunningAmount = prev(runningAmount)
    sizeFlag = iif(runningAmount<100000, 0, 1)
    lastSizeFlag = prev(sizeFlag)
    return runningAmount, lastRunningAmount, sizeFlag, lastSizeFlag
}

// Per-key factor: (flag-0 buy amount - flag-0 sell amount) divided by the
// running total trade amount.
// dynamicGroupCumsum is called with 2 groups and the first returned element is
// used as the flag-0 ("small") total; the second element is not used.
// NOTE(review): group ordering follows the built-in's documented contract - confirm.
// Returns: (small buy amount, small sell amount, running total amount, factor).
@state
def factorSmallOrderNetAmountRatio(tradeAmount, sellCumAmount, sellOrderFlag, prevSellCumAmount, prevSellOrderFlag, buyCumAmount, buyOrderFlag, prevBuyCumAmount, prevBuyOrderFlag){
    smallBuyTotal, bigBuyTotal = dynamicGroupCumsum(buyCumAmount, prevBuyCumAmount, buyOrderFlag, prevBuyOrderFlag, 2)
    smallSellTotal, bigSellTotal = dynamicGroupCumsum(sellCumAmount, prevSellCumAmount, sellOrderFlag, prevSellOrderFlag, 2)
    totalAmount = cumsum(tradeAmount)
    ratio = (smallBuyTotal - smallSellTotal) \ totalAmount
    return smallBuyTotal, smallSellTotal, totalAmount, ratio
}

// Build the rse1 -> rse2 -> rse3 cascade and return the handle of rse1, which
// is where trades are inserted. The engines are created downstream-first
// because each upstream engine takes the next engine as its outputTable.
// Existing engines with the same names are dropped first.
// result: table that receives rse3's output.
def createStreamEngine(result){
    engineNames = ["rse1", "rse2", "rse3"]
    cleanStreamEngines(engineNames)

    // Stage 3: per security, aggregate the order totals into the factor.
    stage3Input = createResult2Schema()
    stage3Metrics = <[TradeTime, factorSmallOrderNetAmountRatio(tradeAmount, sellCumAmount, sellOrderFlag, prevSellCumAmount, prevSellOrderFlag, buyCumAmount, buyOrderFlag, prevBuyCumAmount, prevBuyOrderFlag)]>
    ratioEngine = createReactiveStateEngine(name=engineNames[2], metrics=stage3Metrics, dummyTable=stage3Input, outputTable=result, keyColumn="SecurityID")

    // Stage 2: per sell order, pass the buy-side columns through and append the sell-side ones.
    stage2Input = createResult1Schema()
    stage2Metrics = <[BuyNo, SecurityID, TradeTime, TradeAmount, BuyCumAmount, PrevBuyCumAmount, BuyOrderFlag, PrevBuyOrderFlag, factorOrderCumAmount(TradeAmount)]>
    sellEngine = createReactiveStateEngine(name=engineNames[1], metrics=stage2Metrics, dummyTable=stage2Input, outputTable=ratioEngine, keyColumn="SellNo")

    // Stage 1: per buy order, compute the buy-side running amount and flag.
    stage1Input = createTradeSchema()
    stage1Metrics = <[SecurityID, SellNo, TradeTime, TradeAmount, factorOrderCumAmount(TradeAmount)]>
    buyEngine = createReactiveStateEngine(name=engineNames[0], metrics=stage1Metrics, dummyTable=stage1Input, outputTable=sellEngine, keyColumn="BuyNo")
    return buyEngine
}

// ---- Demo: push four trades through the cascade and inspect the output ----
result = createResultTable()
rse = createStreamEngine(result)

insert into rse values(`000155, 1000, 1001, 2020.01.01T09:30:00, 20000)
insert into rse values(`000155, 1000, 1002, 2020.01.01T09:30:01, 40000)
insert into rse values(`000155, 1000, 1003, 2020.01.01T09:30:02, 60000)
insert into rse values(`000155, 1004, 1003, 2020.01.01T09:30:03, 30000)

select * from result

/*
Expected output:

SecurityID TradeTime           smallBuyOrderAmount smallSellOrderAmount totalOrderAmount factor
---------- ------------------- ------------------- -------------------- ---------------- ------
000155     2020.01.01T09:30:00 20000               20000                20000            0
000155     2020.01.01T09:30:01 60000               60000                60000            0
000155     2020.01.01T09:30:02 0                   120000               120000           -1
000155     2020.01.01T09:30:03 30000               150000               150000           -0.8
*/