createRuleEngine
语法
createRuleEngine(name, ruleSets, dummyTable, outputColumns, outputTable,
[policy], [ruleSetColumn], [callback], [snapshotDir],
[snapshotIntervalInMsgCount])
详情
createRuleEngine 用于创建一个规则引擎。
该引擎适用于对流数据逐条执行规则判断的场景。可以预先为不同类型的数据定义不同规则,例如按设备编号、证券代码、业务类别等将数据路由到对应规则集,再基于字段值、字段间运算结果或多个条件组合,对每条数据进行实时检查。
引擎主要可以完成以下工作:
-
按字段值匹配不同规则集,对不同类别的数据应用不同检查逻辑。
-
对单条数据执行一条或多条规则判断,例如阈值判断、区间判断,以及
value*price > 10这类基于表达式的计算判断。 -
输出规则检查结果;既可以返回首个未通过规则的下标,也可以返回全部规则的布尔检查结果。
-
在输出结果后通过回调函数继续处理,例如触发告警、筛选异常数据、写入下游表。
因此,可以将规则引擎理解为一个面向实时数据的“条件检查与结果分发”组件:它负责根据数据内容选取规则、完成判断,并将结果输出给下游消费。
应用场景
规则引擎支持毫秒级实时响应、精准处理大规模时序数据以及动态规则调整,可应用于如下场景:
-
物联网:设备监测,例如监测设备的温度、湿度;电力监控,例如监控电压、用电量。
-
金融:风控场景,例如过滤订单、监控股票成交量、设置超量预警信号等。
与异常检测引擎的异同
-
规则引擎是无状态引擎;异常检测引擎是有状态的。
-
规则引擎可以对不同数据,采用不同的检查规则;异常检测引擎会对所有数据以相同的规则进行检测。
-
规则引擎可以动态调整检测规则,包括增加、删除和修改;异常检测引擎创建后无法修改。
-
规则引擎支持设置回调函数,用户可根据检测结果进行相应处理;异常检测引擎不支持设置回调函数。
参数
name 是一个字符串,表示引擎名。
ruleSets 是一个字典,表示规则集。字典的 key 是 STRING 或 INT 类型,value 是一个包含元代码的元组。如果 key 为NULL,表示此条为默认规则。如果未指定 ruleSetColumn,或数据未命中任何规则,将采用默认规则检查数据。一个规则集必须包含默认规则。
dummyTable 一个表对象,和输入的流数据表的 schema 一致,可以含有数据,亦可为空表。
outputColumns 一个 STRING 类型向量,表示输入表中需要保留到输出表的列。
outputTable 是一个表对象,表示输出表,可以是内存表或者分布式表,包含 outputColumns 指示的列,以及一列规则检查的结果列。当参数 policy 设置为 "shortcut" 时,最后一列为 INT 类型的标量;否则,为 BOOL 类型的向量。
-
"shortcut" 是默认值,代表短路逻辑。当检查到任一规则不符合(计算结果为 false)时,规则引擎立即停止检查并返回该规则的 index(从 0 开始);如果所有规则都符合,则返回 NULL。比如数据匹配到如下规则,当 age 字段的值小于 18 时,计算结果为 false,规则引擎停止检查并返回 0 (规则的 index)到输出表中。
(< age > 18 >,< income > 5000 >,< debtRatio < 0.5 >) -
"all" 代表检查全部规则,返回规则集的检查结果,布尔类型。比如数据匹配到如下规则,当
voltage=220、current=37.11、temperature=39,检查结果为[false, false, false]。(< voltage > 221.86 >,< current > 39.96 >,< temperature > 44.43 >)
ruleSetColumn 是一个 STRING 类型标量,为输入表的某一列名,如果没有定义或者输入数据中的该列数据没有命中任何一个规则集,则使用默认规则,即规则集中 key 为 NULL 时所指示的规则。
callback 是一个函数,其参数为一个元组,是引擎输出的一行。若设置此参数,引擎每处理一行,在将引擎处理结果输出到输出表的同时,会将该结果作为入参调用此函数。
snapshotDir 与 snapshotIntervalInMsgCount 可选参数,无需配置,实际不生效,仅用于兼容 ORCA 框架。
返回值
返回一个表对象。
例子
实现步骤
-
创建分布式表,存放回调函数输出的数据。
-
定义规则集,用于检查输入的数据。
-
定义回调函数,根据检查结果进一步处理数据。
-
创建规则引擎。
-
向规则引擎输入数据。
示例代码
创建分布式表
// 创建分布式表,存放回调函数输出的数据
db = database("dfs://temp", VALUE, 1..3)
t1 = table(1:0, `sym`value`price, [INT,DOUBLE,DOUBLE])
pt = db.createPartitionedTable(t1,`pt,`sym)
定义规则集
如下规则集包含三组规则,规则的 key 分别为 1、2、NULL,其中 NULL 代表默认规则。
x = [1, 2, NULL]
y = [ [ < value > 1 > ], [ < price < 2 >, < price > 6 > ], [ < value*price > 10 > ] ]
ruleSets = dict(x, y)
/* 输出:
->(< (value * price) > 10 >)
2->(< price < 2 >,< price > 6 >)
1->(< value > 1 >)
*/
定义回调函数
如下回调函数用于判断引擎每一行的检查结果中首个值是否为 false,如是,则将对应数据插入分布式表。
def writeBack(result){
if(result.rule[0]==false){
temp = select sym,value,price from result
loadTable("dfs://temp",`pt).append!(temp)
}
}
创建规则引擎
names = `sym`value`price`quantity
types = [INT, DOUBLE, DOUBLE, DOUBLE]
dummy = table(1:0, names, types)
outputNames = `sym`value`price`rule
outputTypes = [INT, DOUBLE, DOUBLE, BOOL[]]
outputTable = table(10:0, outputNames, outputTypes)
test = createRuleEngine(
name="ruleEngineTest",
ruleSets=ruleSets,
dummyTable=dummy,
outputColumns=["sym","value","price"],
outputTable=outputTable,
policy="all",
ruleSetColumn="sym",
callback=writeBack)
向规则引擎输入数据
插入两条 sym=1 的数据,此时会根据规则集 ruleSets 中 key=1
对应的规则,即 value >1 来检查这两条数据。
test.append!(table(1 as sym, 0 as value, 2 as price, 3 as quantity))
test.append!(table(1 as sym, 2 as value, 2 as price, 3 as quantity))
插入三条 sym=2 的数据,此时会根据规则集 ruleSets 中 key=2
对应的规则,即 price < 2 和 price > 6 来检查这三条数据。
test.append!(table(2 as sym, 2 as value, 0 as price, 3 as quantity))
test.append!(table(2 as sym, 2 as value, 4 as price, 3 as quantity))
test.append!(table(2 as sym, 2 as value, 8 as price, 3 as quantity))
插入两条 sym=3 的数据,由于规则集 ruleSets 只设置了键值为 1 和 2 的规则,此时会根据规则集中
key=NULL 对应的规则,即 value*price > 10
来检查这两条数据。
test.append!(table(3 as sym, 2 as value, 3 as price, 3 as quantity))
test.append!(table(3 as sym, 2 as value, 6 as price, 3 as quantity))
查看输出表 outputTable 的内容。
select * from outputTable
| sym | value | price | rule |
|---|---|---|---|
| 1 | 0 | 2 | [false] |
| 1 | 2 | 2 | [true] |
| 2 | 2 | 0 | [true,false] |
| 2 | 2 | 4 | [false,false] |
| 2 | 2 | 8 | [false,true] |
| 3 | 2 | 3 | [false] |
| 3 | 2 | 6 | [true] |
回调函数处理后,所有检查结果中首个值为false的行均被写入了分布式表 dfs://temp/pt。
select * from loadTable("dfs://temp","pt")
| sym | value | price |
|---|---|---|
| 1 | 0 | 2 |
| 2 | 2 | 4 |
| 2 | 2 | 8 |
| 3 | 2 | 3 |
相关函数:updateRule、deleteRule、getRules
