Use Cases

Developing operators for reactive state engine

The OOP paradigm simplifies the development of operators. Traditionally, a custom stateful operator had to be implemented either with a complex higher-order function such as genericStateIterate or as a plugin written in C++. With OOP, the operator is written as a class, which makes its code more intuitive and easier to understand.

// Stateful cumulative-sum operator: keeps a running total in `sum`.
class MyCumSum {
  // Running total of every value passed to append() so far.
  sum :: DOUBLE
  // Constructor: start the running total at zero.
  def MyCumSum() {
    sum = 0.0
  }
  // Add `value` to the running total and return the updated total.
  def append(value) {
    sum = sum + value
    return sum
  }
}
// Schema-only input table and the table that receives the engine's output.
inputTable = table(1:0, `sym`val, [SYMBOL, DOUBLE])
result = table(1000:0, `sym`res, [SYMBOL, DOUBLE])
// Single metric: feed column val into MyCumSum.append; the engine is keyed on sym.
metrics = [<MyCumSum().append(val)>]
rse = createReactiveStateEngine(name="reactiveDemo", metrics=metrics, dummyTable=inputTable, outputTable=result, keyColumn="sym")
// Push 100 random rows spread over the keys A, B and C into the engine.
append!(rse, table(rand(`A`B`C, 100) as sym, rand(100.0, 100) as val))
// Show the first 10 output rows.
select top 10 * from result

Returns:

sym res
B 23.136844485998154
C 97.25657706148922
C 119.75046650040895
B 28.15541410818696
C 208.6210786132142
A 17.531023011542857
B 127.66627178061754
C 286.6357475752011
A 45.87050362024456
C 343.2471259729937

The constructor of a class can take parameters; the arguments passed to it must be constants. For example:

// Drop the engine created by the previous example so that its name can be reused.
// The call is guarded so that this example also runs on its own, when no engine
// named "reactiveDemo" exists yet.
try {
    dropStreamEngine("reactiveDemo")
} catch (ex) {
    // Nothing to drop: the engine does not exist. Safe to continue.
}
// Stateful cumulative-sum operator whose starting total is supplied by the caller.
class MyCumSum {
  // Running total: the initial value plus every value passed to append() so far.
  sum :: DOUBLE
  // Constructor: start the running total at `initialValue`.
  def MyCumSum(initialValue) {
    sum = initialValue
  }
  // Add `value` to the running total and return the updated total.
  def append(value) {
    sum = sum + value
    return sum
  }
}
// Schema-only input table and the table that receives the engine's output.
inputTable = table(1:0, `sym`val, [SYMBOL, DOUBLE])
result = table(1000:0, `sym`res, [SYMBOL, DOUBLE])
// Single metric: MyCumSum is constructed with the constant 0.0 as its initial total.
metrics = [<MyCumSum(0.0).append(val)>]
rse = createReactiveStateEngine(name="reactiveDemo", metrics=metrics, dummyTable=inputTable, outputTable=result, keyColumn="sym")
// Push 100 random rows spread over the keys A, B and C into the engine.
append!(rse, table(rand(`A`B`C, 100) as sym, rand(100.0, 100) as val))
// Show the first 10 output rows.
select top 10 * from result

Returns:

sym res
C 79.73966575227678
A 50.2432547044009
C 108.56282587628812
A 137.95565224718302
C 179.22195203136653
B 41.10089084133506
C 253.5685545997694
A 173.4938955400139
C 262.32025355566293
B 105.45554195996374

Besides columns of the input table, the append method can also take constants or lambda expressions as arguments. For example:

// Cumulative operator whose combining function is supplied by the caller of append().
class MyCumSum {
  // Accumulated result of all values combined so far.
  sum :: DOUBLE
  // Constructor: the initial value is obtained by calling a local lambda that returns 0.
  def MyCumSum() {
    g = def (): 0
    sum = g()
  }
  // Combine the accumulated result with `value` by calling `f(sum, value)`,
  // store the outcome in `sum` and return it.
  def append(value, f) {
    sum = f(sum, value)
    return sum
  }
}
// Metric under test: MyCumSum.append receives column val and an addition lambda.
metrics = [<MyCumSum().append(val, def (a, b): a + b)>]
inputTable = table(1:0, `sym`val, [SYMBOL, DOUBLE])
// result receives the output of the class-based metric, result2 that of the built-in cumsum.
result = table(1000:0, `sym`res, [SYMBOL, DOUBLE])
result2 = table(1000:0, `sym`res, [SYMBOL, DOUBLE])
// Drop each engine in its own try block. With a single shared block, a failure
// to drop "reactiveDemo" (e.g. because it does not exist) would skip the drop of
// "reactiveDemo2", and creating that engine below would then fail on a re-run.
try {
    dropStreamEngine("reactiveDemo")
} catch (exp) {
    // Engine does not exist yet: nothing to drop.
}
try {
    dropStreamEngine("reactiveDemo2")
} catch (exp) {
    // Engine does not exist yet: nothing to drop.
}
rse = createReactiveStateEngine(
          name="reactiveDemo",
          metrics=metrics,
          dummyTable=inputTable,
          outputTable=result,
          keyColumn="sym")
// Reference engine using the built-in cumsum, fed with the same data for comparison.
rse2 = createReactiveStateEngine(
          name="reactiveDemo2",
          metrics=[<cumsum(val)>],
          dummyTable=inputTable,
          outputTable=result2,
          keyColumn="sym")
// The same random batch goes into both engines so that their outputs are comparable.
t = table(rand(`A`B`C, 100) as sym, rand(100.0, 100) as val)
rse.append!(t)
rse2.append!(t)
t1 = select TOP 10 * from result ORDER BY sym
t2 = select TOP 10 * from result2 ORDER BY sym

table t1:

sym res
A 73.74901322182268
A 151.10119895543903
A 234.97340406756848
A 249.01866489090025
A 263.8336496660486
A 277.98473422881216
A 321.98982320260257
A 394.077792391181
A 442.3658183310181
A 538.974173902534

table t2:

sym res
A 73.74901322182268
A 151.10119895543903
A 234.97340406756848
A 249.01866489090025
A 263.8336496660486
A 277.98473422881216
A 321.98982320260257
A 394.077792391181
A 442.3658183310181
A 538.974173902534

Defining events in CEP engine

Event types can be defined as classes instead of being assembled from dictionaries, which is considerably more cumbersome. Defining events with OOP makes both their definition and their processing logic more intuitive, and makes developers more efficient at writing CEP applications.

// Event type for an order; its fields form the event schema registered with the CEP engine.
class orders{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    // Stamped by the constructor with the time the event object is created.
    eventTime :: TIMESTAMP
    // Constructor: t, m, c, p and q initialise trader, market, code, price and qty;
    // eventTime is set to now().
    def orders(t, m, c, p, q){
        trader = t
        market = m
        code = c
        price = p
        qty = q
        eventTime = now()
    }
}
// Event type signalling that a trader has come online.
class traderOnline{
    trader :: STRING
    // Stamped by the constructor with the time the event object is created.
    eventTime :: TIMESTAMP
    // Constructor: t initialises trader; eventTime is set to now().
    def traderOnline(t){
        trader = t
        eventTime = now()
    }
}
// Monitor for the CEP engine: listens for traderOnline and orders events and
// reflects them in the data view engine 'traderDV'.
class mainMonitor{
    // Accumulates the field values of every orders event received.
    ordersTable :: ANY
    isBusy :: BOOL
    // Constructor: start with an empty order store and the given busy flag.
    def mainMonitor(busyFlag){
        ordersTable = array(ANY, 0)
        isBusy = busyFlag
    }
    // Forward declaration: onload() refers to updateOrders before its body appears below.
    def updateOrders(event)
    // Unload callback: removes the shared table created in onload().
    // The callback must be spelled `onunload` for the engine to invoke it; under
    // any other name it is just an ordinary method and the cleanup never runs.
    def onunload(){
        undef('traderDV', SHARED)
    }
    // Listener for traderOnline events: adds a row for the trader to the data view
    // engine with zeroed counters, busy = false and the event's date.
    def online(online){
        getDataViewEngine(,'traderDV').append!(table(online.trader as trader, 0 as orderCount, 0 as tradeCount, 0.0 as price, false as busy, date(online.eventTime) as orderDate))
    }
    // Load callback: registers the event listeners, then creates the shared output
    // table and the data view engine (key column trader, time column updateTime).
    def onload(){
        addEventListener(online,'traderOnline',,'all')
        addEventListener(updateOrders, 'orders',,'all')
        traderDV = streamTable(array(STRING, 0) as trader,  array(INT, 0) as orderCount, array(INT, 0) as tradeCount, array(DOUBLE, 0) as price, array(BOOL, 0) as busy, array(DATE, 0) as orderDate, array(TIMESTAMP, 0) as updateTime)
        share(traderDV, 'traderDV')
        createDataViewEngine('traderDV', objByName('traderDV'), `trader, `updateTime, true)
    }
    // Listener for orders events: stores the order's fields and updates the
    // trader's orderCount and busy items in the data view.
    def updateOrders(event) {
        ordersTable.append!([event.trader, event.market, event.code, event.price, event.qty])
        updateDataViewItems('traderDV', event.trader, ['orderCount', 'busy'], [ordersTable.size(), false])
    }
}
// Schema-only table describing the serialized event stream (time, type, payload).
dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
// Create the CEP engine with mainMonitor as its monitor and the two event classes as its schema.
engineCep = createCEPEngine('cep1', <mainMonitor(true)>, dummy, [orders, traderOnline], 1, 'eventTime', 10000)
// Event instances get names distinct from the class names. Reusing `traderOnline`
// as a variable would shadow the class, so that re-running this script in the
// same session could pass an instance instead of the class in the list above.
online1 = traderOnline('t1')
appendEvent(engineCep, online1)
online2 = traderOnline('t2')
appendEvent(engineCep, online2)
online3 = traderOnline('t3')
appendEvent(engineCep, online3)
orders1 = orders('t1', 'sz', 's001', 11.0, 10)
appendEvent(engineCep, orders1)
// Inspect the data view table and the engine statistics.
objByName(`traderDV)
getCEPEngineStat(`cep1)
/* output:
streamEngineStat->{
}
subEngineStat->
subEngineName eventsOnInp...monitorNumber listeners timers eventsRouted eventsSent eventsReceived...
------------- ------------- ---------------- --------- ------ ------------ ---------- ------------- 
cep1          4             1             2         0      0            0          4             ...
engineStat->{
eventsOnOutputQueue->0
lastErrorTimestamp->
numOfSubEngine->1
lastErrorMessage->
eventsEmitted->0
eventsReceived->4
name->cep1
queueDepth->10000
user->guest
useSystemTime->true
status->OK
}
eventSchema->
eventType    eventField                fieldType                 fieldTypeId        fieldFormId  
------------ ------------------------- ---------------------------- --------------------- -------------
orders       trader,market,code,pric...STRING,STRING,STRING,DO...[18,18,18,16,4,12] [0,0,0,0,0,0]
traderOnline trader,eventTime          STRING,TIMESTAMP          [18,12]            [0,0]        
dataViewEngines->
name     user  status lastErrorMes...lastErrorTim...keyColumns outputTableNameuseSystemTime throttle ...
-------- ----- ------ -------------- ----------------- ------------- -------------- ------------- -------- 
traderDV guest OK                                   trader     traderDV       true                   ...
*/