Rule Engine

The rule engine is a powerful stream processing tool provided by DolphinDB. Its core feature is “dynamically selecting and applying relevant business rules for logical judgment based on data characteristics.” When data flows into the engine, it selects matching rules from a predefined set based on a key field in the data (such as user ID, product type, transaction channel, etc.) to evaluate the data. The results (whether the data passes the rules) are output to a result table and can also trigger custom callback functions for immediate processing.

Rule engine features:

  • Millisecond-level real-time response

  • Accurate processing of large-scale time-series data

  • Online dynamic rule adjustment

Therefore, the rule engine is suitable for scenarios requiring high flexibility, real-time processing, and configurable business logic, such as:

  • IoT:Device monitoring, e.g., temperature and humidity; power monitoring, e.g., voltage and electricity usage.

  • Finance:Risk control, e.g., filtering orders according to specified rules, monitoring stock trading volumes, setting over-limit warning signals, etc.

The rule engine is created using the createRuleEngine function.

Syntax:

createRuleEngine(name, ruleSets, dummyTable, outputColumns, outputTable, [policy], [ruleSetColumn], [callback], [snapshotDir], [snapshotIntervalInMsgCount])

For more details, see createRuleEngine.

Rules in the engine can be dynamically adjusted using the updateRule and deleteRule functions, and existing rules can be retrieved using getRules:

updateRule(engineName, key, rules, [add=false])
deleteRule(engineName, key)
getRules([engineName])

For more details, see updateRule, deleteRule, and getRules.

Calculation Rules

The rule engine works as follows: first, a set of rules is defined using the ruleSets parameter in the form of a dictionary. The key indicates the rule identifier. If the key is NULL, the rule is treated as the default rule. The ruleSetColumn parameter specifies the field in the input data used to match rules. When the value of this field equals a key in ruleSets, the corresponding rule is considered matched. The engine then generates the calculation result according to the policy parameter. If the callback parameter is specified, the callback logic will also be triggered. If ruleSetColumn is not specified or no rule is matched, the default rule is applied.

Application Examples

Collect the current, voltage, and temperature of power equipment, and configure a rule set for monitoring. The collected data is stored in a stream table. The rule engine subscribes to the stream table to obtain real-time data, evaluates data, and outputs the results to another table.

Implementation steps:

  1. Create a stream table to store the collected data.

    share streamTable(1000:0,`time`pointId`voltage`current`temprature,[TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE]) as inputTable
  2. Create a stream table for data output by the rule engine and another stream table for data processed by the callback function defined in the rule engine.

    // Create a stream table for data output by the rule engine
    share streamTable(1000:0,`time`pointId`voltage`current`temprature`inputTime`rule,[TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE,NANOTIMESTAMP,BOOL[]]) as outputTable
    // Create a stream table for data processed by the callback function defined in the rule engine
    share streamTable(1000:0,`time`pointId`voltage`current`temprature`inputTime`outputTime`comment,[TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE,NANOTIMESTAMP,NANOTIMESTAMP,STRING]) as resultTable
  3. Define the rule set.

    n = 10
    pointId=`ID + string(1..n)
    voltageHigh=round(double(rand(220..230,n))-rand(0.6,n),2)
    voltageLow=round(double(rand(45..50,n)) +rand(0.5,n),2)
    currentHigh=round(double(rand(40..45,n))-rand(0.6,n),2)    
    currentLow=round(double(rand(7..9,n))+rand(0.5,n),2)    
    tempratureHigh=round(double(rand(40..45,n))-rand(0.6,n),2)
    tempratureLow=round(double(rand(7..9,n))+rand(0.5,n),2)
    pt = table(pointId,voltageHigh,voltageLow,currentHigh,currentLow,tempratureHigh,tempratureLow)
    ids = exec pointId from pt
    ruleSet = dict(STRING,ANY)
    for(i in 0:pt.size()){
        tmp = pt[i]
        keys = tmp.keys()[1:]
        value = tmp.values()[1:]
        a = array(STRING)
        for(j in 0:keys.size()){
            if(keys[j] like "%High"){
                s=strpos(keys[j],"High");
                a.append!(keys[j][0:s]+ ">" + value[j] )
            }else{
                s=strpos(keys[j],"Low");
                a.append!(keys[j][0:s] + "<" + value[j])
            }
        }
        ruleSet[ids[i]] = parseExpr(a) 
    }
    ruleSet[string(NULL)] = [ <voltage > 100000>]
  4. Define the callback function.

    def writeBack(result){
        outputTime = now(true)
        if(result.rule[0]==true){
            s="High voltage"
            insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
        }
        if(result.rule[1]==true){
            s="Low voltage"
            insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
        }
        if(result.rule[2]==true){
            s="High current"
            insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
        }
        if(result.rule[3]==true){
            s="Low current"
            insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
        }
        if(result.rule[4]==true){
            s="High temperature"
            insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
        }
        if(result.rule[5]==true){
            s="Low temperature"
            insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
        }
    }
  5. Create the rule engine.

    colNames = inputTable.schema().colDefs.name join `inputTime
    colTypes = inputTable.schema().colDefs.typeString join `NANOTIMESTAMP
    schemaTable = table(1:0,colNames,colTypes)
    ruleEngine=createRuleEngine(
        name="ruleEngine",
        ruleSets=ruleSet,
        dummyTable=schemaTable,
        outputColumns=["time","pointId","voltage","current","temprature","inputTime"],
        outputTable=outputTable, 
        policy="all", 
        ruleSetColumn="pointId",
        callback=writeBack
    )
  6. Subscribe to the stream table and insert 10 simulated records.

    def handle(msg){
        tmp = select *,now(true) as inputTime from msg  
        getStreamEngine("ruleEngine").append!(tmp)
    }
    // Subscribe to the stream table
    subscribeTable(
        tableName="inputTable", 
        actionName="RuleEngine", 
        handler=handle, 
        msgAsTable=true,
        offset=0
    )
    // Generate 10 simulated records
    n =10
    t=table(n:n,`time`pointId`voltage`current`temprature,[TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE])
    t["time"] = now() +( 1..10) *1000
    t["pointId"]=`ID + string(1..n)
    t["voltage"]=round(double(rand(45..225,n))+rand(0.5,n) ,2)
    t["current"]=round(double(rand(7..43,n))+rand(0.5,n),2)  
    t["temprature"]=round(double(rand(7..43,n))+rand(0.5,n),2) 
    // Insert simulated records into the stream table
    inputTable.append!(t)

View the input table.

time pointId voltage current temprature
2025.11.09 21:09:24.883 ID1 100.15 37.19 22.33
2025.11.09 21:09:25.883 ID2 196.06 11.12 18.32
2025.11.09 21:09:26.883 ID3 81.21 35.07 30.04
2025.11.09 21:09:27.883 ID4 181.4 40.18 40.01
2025.11.09 21:09:28.883 ID5 217.13 42.31 27.18
2025.11.09 21:09:29.883 ID6 182.47 24.18 43.36
2025.11.09 21:09:30.883 ID7 124.44 12.27 43.25
2025.11.09 21:09:31.883 ID8 198.33 39.03 8.33
2025.11.09 21:09:32.883 ID9 78.04 16.04 31.31
2025.11.09 21:09:33.883 ID10 194.28 18.45 35.37

View the rule engine’s output table.

time pointId voltage current temprature inputTime rule
2025.11.09 21:09:24.883 ID1 100.15 37.19 22.33 2025.11.09 21:09:23.886 [false, false, false, false, false, false]
2025.11.09 21:09:25.883 ID2 196.06 11.12 18.32 2025.11.09 21:09:23.886 [false, false, false, false, false, false]
2025.11.09 21:09:26.883 ID3 81.21 35.07 30.04 2025.11.09 21:09:23.886 [false, false, false, false, false, false]
2025.11.09 21:09:27.883 ID4 181.4 40.18 40.01 2025.11.09 21:09:23.886 [false, false, false, false, false, false]
2025.11.09 21:09:28.883 ID5 217.13 42.31 27.18 2025.11.09 21:09:23.886 [false, false, true, false, false, false]
2025.11.09 21:09:29.883 ID6 182.47 24.18 43.36 2025.11.09 21:09:23.886 [false, false, false, false, true, false]
2025.11.09 21:09:30.883 ID7 124.44 12.27 43.25 2025.11.09 21:09:23.886 [false, false, false, false, true, false]
2025.11.09 21:09:31.883 ID8 198.33 39.03 8.33 2025.11.09 21:09:23.886 [false, false, false, false, false, true]
2025.11.09 21:09:32.883 ID9 78.04 16.04 31.31 2025.11.09 21:09:23.886 [false, false, false, false, false, false]
2025.11.09 21:09:33.883 ID10 194.28 18.45 35.37 2025.11.09 21:09:23.886 [false, false, false, false, false, false]

View the output table after callback processing.

time pointId voltage current temprature inputTime outputTime comment
2025.11.09 21:09:28.883 ID5 217.13 42.31 27.18 2025.11.09 21:09:23.886 2025.11.09 21:09:23.886 High current
2025.11.09 21:09:29.883 ID6 182.47 24.18 43.36 2025.11.09 21:09:23.886 2025.11.09 21:09:23.887 High temprature
2025.11.09 21:09:30.883 ID7 124.44 12.27 43.25 2025.11.09 21:09:23.886 2025.11.09 21:09:23.887 High temprature
2025.11.09 21:09:31.883 ID8 198.33 39.03 8.33 2025.11.09 21:09:23.886 2025.11.09 21:09:23.887 Low temprature

Explanation for the results

The rule set for device 'ID5' is:

(< voltage > 221.86 >,< voltage < 45.16 >,< current > 39.96 >,< current < 7.47 >,< temprature > 44.43 >,< temprature < 8.03 >)

The input data matches the rule "current > 39.96." After callback processing, the output is: "High current."

The rule set for device 'ID6' is:

(< voltage > 221.89 >,< voltage < 49.05 >,< current > 43.42 >,< current < 7.06 >,< temprature > 41.98 >,< temprature < 7.418>)

The input data matches the rule "temperature > 41.98." After callback processing, the output is: "High temperature."

The rule set for device 'ID7' is:

(< voltage > 226.61 >,< voltage < 48.40 >,< current > 40.77 >,< current < 9.19 >,< temprature > 41.95 >,< temprature < 8.14 >)

The input data matches the rule "temperature > 41.95." After callback processing, the output is: "High temperature."

The rule set for device 'ID8' is:

(< voltage > 228.97 >,< voltage < 46.137 >,< current > 42.607 >,< current < 7.29 >,< temprature > 44.40 >,< temprature < 9.27 >)

The input data matches the rule "temperature < 9.27." After callback processing, the output is: "High temperature."

Dynamically adjust the rules.

// Adjust the rules
newRules = [ < voltage > 223.41 >,< voltage < 49.25 >,< current > 36.84 >,< current < 8.4 >,< temprature > 44.60 >,< temprature < 7.43 >]
updateRule("ruleEngine","ID1",newRules)
// Insert a simulated record
insert into inputTable values (now(), 'ID1',100.15,37.19,22.33)

In the record, the current is 37.19. Under the previous rules, no alert was triggered. After the update, the "current > 36.84" rule is matched, and a new record is added to the output table.