规则引擎
规则引擎是 DolphinDB 提供的一个强大的流数据处理工具,其核心思想是 “根据数据自身的特征,动态地选择并应用相应的业务规则进行逻辑判断”。数据流进入引擎后,引擎会依据数据中的某个关键字段(如用户 ID、产品类型、交易渠道等),从预先设定好的规则集中挑选出匹配的规则来检查这条数据。检查结果(是否通过规则)会输出到结果表中,并且可以触发自定义的回调函数进行即时处理。
DolphinDB 规则引擎:
-
毫秒级实时响应速度
-
精准处理大规模时序数据
-
在线动态规则调整
因此适用于需要高灵活性、实时性和业务可配置性的场景,如:
-
物联网:对设备进行监测,例如监测设备温度、湿度;电力监控,例如电压,用电量等。
-
金融:风控场景,例如根据指定规则过滤订单,监控股票成交量,设置超量预警信号等。
规则引擎由函数 createRuleEngine 创建。
语法如下:
createRuleEngine(name, ruleSets, dummyTable, outputColumns, outputTable,
[policy], [ruleSetColumn], [callback], [snapshotDir],
[snapshotIntervalInMsgCount])
具体可参考:createRuleEngine。
规则引擎可由函数 updateRule 和函数 deleteRule 动态调整规则,由函数
getRules 获取创建的规则信息。
语法如下:
updateRule(engineName, key, rules, [add=false])
deleteRule(engineName, key)
getRules([engineName])
具体可参考:updateRule,deleteRule,getRules。
计算规则
规则引擎的计算规则是:首先会通过 ruleSets 参数,设定一组规则,字典格式,key 表示规则的键值,为 NULL
时,此条规则为默认规则。ruleSetColumn 设置输入数据中命中规则的字段名,当字段值等于
ruleSet 中的 key 值时表示命中此规则,然后根据 policy
的设置输出引擎的计算结果,此时如果制定了 callback
则同时会触发回调处理逻辑。ruleSetColumn 若没指定或者没有命中规则,那么使用默认规则处理。
应用示例
采样电力设备的电流、电压、和温度,设置规则集进行告警监控。
采集的数据存放到流数据表中,规则引擎通过订阅流数据表来获取实时数据,并进行规则检查,将检查结果输出到另外一个表中。
实现步骤如下:
(1) 定义流表存放采集的数据
share streamTable(1000:0,`time`pointId`voltage`current`temprature,[TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE]) as inputTable
(2) 定义规则引擎的输出流表以及规则引擎回调函数处理后的流表
// 创建流表 保存规则引擎的输出数据
share streamTable(1000:0,`time`pointId`voltage`current`temprature`inputTime`rule,[TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE,NANOTIMESTAMP,BOOL[]]) as outputTable
// 创建流表 保存规则引擎回调函数处理后数据
share streamTable(1000:0,`time`pointId`voltage`current`temprature`inputTime`outputTime`comment,[TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE,NANOTIMESTAMP,NANOTIMESTAMP,STRING]) as resultTable
(3) 定义规则集
// 定义规则集
n = 10
pointId=`ID + string(1..n)
voltageHigh=round(double(rand(220..230,n))-rand(0.6,n),2)
voltageLow=round(double(rand(45..50,n)) +rand(0.5,n),2)
currentHigh=round(double(rand(40..45,n))-rand(0.6,n),2)
currentLow=round(double(rand(7..9,n))+rand(0.5,n),2)
tempratureHigh=round(double(rand(40..45,n))-rand(0.6,n),2)
tempratureLow=round(double(rand(7..9,n))+rand(0.5,n),2)
pt = table(pointId,voltageHigh,voltageLow,currentHigh,currentLow,tempratureHigh,tempratureLow)
ids = exec pointId from pt
ruleSet = dict(STRING,ANY)
for(i in 0:pt.size()){
tmp = pt[i]
keys = tmp.keys()[1:]
value = tmp.values()[1:]
a = array(STRING)
for(j in 0:keys.size()){
if(keys[j] like "%High"){
s=strpos(keys[j],"High");
a.append!(keys[j][0:s]+ ">" + value[j] )
}else{
s=strpos(keys[j],"Low");
a.append!(keys[j][0:s] + "<" + value[j])
}
}
ruleSet[ids[i]] = parseExpr(a)
}
ruleSet[string(NULL)] = [ <voltage > 100000>]
(4) 定义回调函数
// 定义规则引擎的回调函数
def writeBack(result){
outputTime = now(true)
if(result.rule[0]==true){
s="电压过高"
insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
}
if(result.rule[1]==true){
s="电压过低"
insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
}
if(result.rule[2]==true){
s="电流过高"
insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
}
if(result.rule[3]==true){
s="电流过低"
insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
}
if(result.rule[4]==true){
s="温度过高"
insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
}
if(result.rule[5]==true){
s="温度过低"
insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
}
}
(5) 创建规则引擎
// 创建规则引擎
colNames = inputTable.schema().colDefs.name join `inputTime
colTypes = inputTable.schema().colDefs.typeString join `NANOTIMESTAMP
schemaTable = table(1:0,colNames,colTypes)
ruleEngine=createRuleEngine(
name="ruleEngine",
ruleSets=ruleSet,
dummyTable=schemaTable,
outputColumns=["time","pointId","voltage","current","temprature","inputTime"],
outputTable=outputTable,
policy="all",
ruleSetColumn="pointId",
callback=writeBack
)
(6) 订阅流表写入 10 条模拟数据
def handle(msg){
tmp = select *,now(true) as inputTime from msg
getStreamEngine("ruleEngine").append!(tmp)
}
// 订阅数据数据流表
subscribeTable(
tableName="inputTable",
actionName="RuleEngine",
handler=handle,
msgAsTable=true,
offset=0
)
// 生成10条模拟数据
n =10
t=table(n:n,`time`pointId`voltage`current`temprature,[TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE])
t["time"] = now() +( 1..10) *1000
t["pointId"]=`ID + string(1..n)
t["voltage"]=round(double(rand(45..225,n))+rand(0.5,n) ,2)
t["current"]=round(double(rand(7..43,n))+rand(0.5,n),2)
t["temprature"]=round(double(rand(7..43,n))+rand(0.5,n),2)
// 向流表写入模拟数据
inputTable.append!(t)
查看输入数据表
| time | pointId | voltage | current | temprature |
|---|---|---|---|---|
| 2025.11.09 21:09:24.883 | ID1 | 100.15 | 37.19 | 22.33 |
| 2025.11.09 21:09:25.883 | ID2 | 196.06 | 11.12 | 18.32 |
| 2025.11.09 21:09:26.883 | ID3 | 81.21 | 35.07 | 30.04 |
| 2025.11.09 21:09:27.883 | ID4 | 181.4 | 40.18 | 40.01 |
| 2025.11.09 21:09:28.883 | ID5 | 217.13 | 42.31 | 27.18 |
| 2025.11.09 21:09:29.883 | ID6 | 182.47 | 24.18 | 43.36 |
| 2025.11.09 21:09:30.883 | ID7 | 124.44 | 12.27 | 43.25 |
| 2025.11.09 21:09:31.883 | ID8 | 198.33 | 39.03 | 8.33 |
| 2025.11.09 21:09:32.883 | ID9 | 78.04 | 16.04 | 31.31 |
| 2025.11.09 21:09:33.883 | ID10 | 194.28 | 18.45 | 35.37 |
查看规则引擎的输出数据表
| time | pointId | voltage | current | temprature | inputTime | rule |
|---|---|---|---|---|---|---|
| 2025.11.09 21:09:24.883 | ID1 | 100.15 | 37.19 | 22.33 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, false] |
| 2025.11.09 21:09:25.883 | ID2 | 196.06 | 11.12 | 18.32 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, false] |
| 2025.11.09 21:09:26.883 | ID3 | 81.21 | 35.07 | 30.04 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, false] |
| 2025.11.09 21:09:27.883 | ID4 | 181.4 | 40.18 | 40.01 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, false] |
| 2025.11.09 21:09:28.883 | ID5 | 217.13 | 42.31 | 27.18 | 2025.11.09 21:09:23.886 | [false, false, true, false, false, false] |
| 2025.11.09 21:09:29.883 | ID6 | 182.47 | 24.18 | 43.36 | 2025.11.09 21:09:23.886 | [false, false, false, false, true, false] |
| 2025.11.09 21:09:30.883 | ID7 | 124.44 | 12.27 | 43.25 | 2025.11.09 21:09:23.886 | [false, false, false, false, true, false] |
| 2025.11.09 21:09:31.883 | ID8 | 198.33 | 39.03 | 8.33 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, true] |
| 2025.11.09 21:09:32.883 | ID9 | 78.04 | 16.04 | 31.31 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, false] |
| 2025.11.09 21:09:33.883 | ID10 | 194.28 | 18.45 | 35.37 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, false] |
查看规则引擎回调函数处理后的输出表
| time | pointId | voltage | current | temprature | inputTime | outputTime | comment |
|---|---|---|---|---|---|---|---|
| 2025.11.09 21:09:28.883 | ID5 | 217.13 | 42.31 | 27.18 | 2025.11.09 21:09:23.886 | 2025.11.09 21:09:23.886 | 电流过高 |
| 2025.11.09 21:09:29.883 | ID6 | 182.47 | 24.18 | 43.36 | 2025.11.09 21:09:23.886 | 2025.11.09 21:09:23.887 | 温度过高 |
| 2025.11.09 21:09:30.883 | ID7 | 124.44 | 12.27 | 43.25 | 2025.11.09 21:09:23.886 | 2025.11.09 21:09:23.887 | 温度过高 |
| 2025.11.09 21:09:31.883 | ID8 | 198.33 | 39.03 | 8.33 | 2025.11.09 21:09:23.886 | 2025.11.09 21:09:23.887 | 温度过低 |
结果解释:
设备 'ID5' 的规则集为:
(< voltage > 221.86 >,< voltage < 45.16 >,< current > 39.96 >,< current < 7.47 >,< temprature > 44.43 >,< temprature < 8.03 >)
输入数据命中规则集中的 current > 39.96,经回调函数处理,输出“电流过高”;
设备 'ID6' 的规则集为:
(< voltage > 221.89 >,< voltage < 49.05 >,< current > 43.42 >,< current < 7.06 >,< temprature > 41.98 >,< temprature < 7.418>)
输入数据命中规则集中的 temprature > 41.98 ,经回调函数处理,输出“温度过高”;
设备 'ID7' 的规则集为:
(< voltage > 226.61 >,< voltage < 48.40 >,< current > 40.77 >,< current < 9.19 >,< temprature > 41.95 >,< temprature < 8.14 >)
输入数据命中规则集中的 < temprature > 41.95,经回调函数处理,输出“温度过高”;
设备 'ID8' 的规则集为:
(< voltage > 228.97 >,< voltage < 46.137 >,< current > 42.607 >,< current < 7.29 >,< temprature > 44.40 >,< temprature < 9.27 >)
输入数据命中规则集中的 temprature < 9.27,经回调函数处理,输出“温度过高”;
(7) 动态调整规则示例
//修改规则
newRules = [ < voltage > 223.41 >,< voltage < 49.25 >,< current > 36.84 >,< current < 8.4 >,< temprature > 44.60 >,< temprature < 7.43 >]
updateRule("ruleEngine","ID1",newRules)
//插入一条模拟数据
insert into inputTable values (now(), 'ID1',100.15,37.19,22.33)
current= 37.19,之前的规则未触发报警,现在命中更新后的规则 current > 36.84,输出表新增记录:
| time | pointId | voltage | current | temprature | inputTime | outputTime | comment |
|---|---|---|---|---|---|---|---|
| 2025.11.09 21:43:55.505 | ID1 | 100.15 | 37.19 | 22.33 | 2025.11.09 21:43:55.505 | 2025.11.09 21:43:55.506 | 电流过高 |
