createRuleEngine

语法

createRuleEngine(name, ruleSets, dummyTable, outputColumns, outputTable, [policy], [ruleSetColumn], [callback], [snapshotDir], [snapshotIntervalInMsgCount])

详情

createRuleEngine 用于创建一个规则引擎。

注:
不支持乱序处理机制,用户需要自行保证输入的数据有序。

该引擎适用于对流数据逐条执行规则判断的场景。可以预先为不同类型的数据定义不同规则,例如按设备编号、证券代码、业务类别等将数据路由到对应规则集,再基于字段值、字段间运算结果或多个条件组合,对每条数据进行实时检查。

引擎主要可以完成以下工作:

  • 按字段值匹配不同规则集,对不同类别的数据应用不同检查逻辑。

  • 对单条数据执行一条或多条规则判断,例如阈值判断、区间判断,以及 value*price > 10 这类基于表达式的计算判断。

  • 输出规则检查结果;既可以返回首个未通过规则的下标,也可以返回全部规则的布尔检查结果。

  • 在输出结果后通过回调函数继续处理,例如触发告警、筛选异常数据、写入下游表。

因此,可以将规则引擎理解为一个面向实时数据的“条件检查与结果分发”组件:它负责根据数据内容选取规则、完成判断,并将结果输出给下游消费。

应用场景

规则引擎支持毫秒级实时响应、精准处理大规模时序数据以及动态规则调整,可应用于如下场景:

  • 物联网:设备监测,例如监测设备的温度、湿度;电力监控,例如监控电压、用电量。

  • 金融:风控场景,例如过滤订单、监控股票成交量、设置超量预警信号等。

异常检测引擎的异同

  • 规则引擎是无状态引擎;异常检测引擎是有状态的。

  • 规则引擎可以对不同数据,采用不同的检查规则;异常检测引擎会对所有数据以相同的规则进行检测。

  • 规则引擎可以动态调整检测规则,包括增加、删除和修改;异常检测引擎创建后无法修改。

  • 规则引擎支持设置回调函数,用户可根据检测结果进行相应处理;异常检测引擎不支持设置回调函数。

参数

name 是一个字符串,表示引擎名。

ruleSets 是一个字典,表示规则集。字典的 key 是 STRING 或 INT 类型,value 是一个包含元代码的元组。如果 key 为NULL,表示此条为默认规则。如果未指定 ruleSetColumn,或数据未命中任何规则,将采用默认规则检查数据。一个规则集必须包含默认规则。

dummyTable 一个表对象,和输入的流数据表的 schema 一致,可以含有数据,亦可为空表。

outputColumns 一个 STRING 类型向量,表示输入表中需要保留到输出表的列。

outputTable 是一个表对象,表示输出表,可以是内存表或者分布式表,包含 outputColumns 指示的列,以及一列规则检查的结果列。当参数 policy 设置为 "shortcut" 时,最后一列为 INT 类型的标量;否则,为 BOOL 类型的向量。

policy 是一个字符串标量,表示规则检查策略。可取以下值:
  • "shortcut" 是默认值,代表短路逻辑。当检查到任一规则不符合(计算结果为 false)时,规则引擎立即停止检查并返回该规则的 index(从 0 开始);如果所有规则都符合,则返回 NULL。比如数据匹配到如下规则,当 age 字段的值小于 18 时,计算结果为 false,规则引擎停止检查并返回 0 (规则的 index)到输出表中。

    (< age > 18 >,< income > 5000 >,< debtRatio < 0.5 >)
  • "all" 代表检查全部规则,返回规则集的检查结果,布尔类型。比如数据匹配到如下规则,当 voltage=220current=37.11temperature=39,检查结果为 [false, false, false]

    (< voltage > 221.86 >,< current > 39.96 >,< temperature > 44.43 >)

ruleSetColumn 是一个 STRING 类型标量,为输入表的某一列名,如果没有定义或者输入数据中的该列数据没有命中任何一个规则集,则使用默认规则,即规则集中 key 为 NULL 时所指示的规则。

callback 是一个函数,其参数为一个元组,是引擎输出的一行。若设置此参数,引擎每处理一行,在将引擎处理结果输出到输出表的同时,会将该结果作为入参调用此函数。

snapshotDirsnapshotIntervalInMsgCount 可选参数,无需配置,实际不生效,仅用于兼容 ORCA 框架。

返回值

返回一个表对象。

例子

实现步骤

  1. 创建分布式表,存放回调函数输出的数据。

  2. 定义规则集,用于检查输入的数据。

  3. 定义回调函数,根据检查结果进一步处理数据。

  4. 创建规则引擎。

  5. 向规则引擎输入数据。

示例代码

创建分布式表

// 创建分布式表,存放回调函数输出的数据
db = database("dfs://temp", VALUE, 1..3)
t1 = table(1:0, `sym`value`price, [INT,DOUBLE,DOUBLE])
pt = db.createPartitionedTable(t1,`pt,`sym)

定义规则集

如下规则集包含三组规则,规则的 key 分别为 1、2、NULL,其中 NULL 代表默认规则。

x = [1, 2, NULL]
y = [ [ < value > 1 > ], [ < price < 2 >, < price > 6 > ], [ < value*price > 10 > ] ]
ruleSets = dict(x, y)
/* 输出:
->(< (value * price) > 10 >)
2->(< price < 2 >,< price > 6 >)
1->(< value > 1 >)
*/

定义回调函数

如下回调函数用于判断引擎每一行的检查结果中首个值是否为 false,如是,则将对应数据插入分布式表。

def writeBack(result){
    if(result.rule[0]==false){
        temp = select sym,value,price from result
        loadTable("dfs://temp",`pt).append!(temp)
    }
}

创建规则引擎

names = `sym`value`price`quantity
types = [INT, DOUBLE, DOUBLE, DOUBLE]
dummy = table(1:0, names, types)
outputNames = `sym`value`price`rule
outputTypes = [INT, DOUBLE, DOUBLE, BOOL[]]
outputTable = table(10:0, outputNames, outputTypes)
test = createRuleEngine(
        name="ruleEngineTest",
        ruleSets=ruleSets,
        dummyTable=dummy,
        outputColumns=["sym","value","price"],
        outputTable=outputTable,
        policy="all",
        ruleSetColumn="sym",
        callback=writeBack)

向规则引擎输入数据

插入两条 sym=1 的数据,此时会根据规则集 ruleSetskey=1 对应的规则,即 value >1 来检查这两条数据。

test.append!(table(1 as sym, 0 as value, 2 as price, 3 as quantity))
test.append!(table(1 as sym, 2 as value, 2 as price, 3 as quantity))

插入三条 sym=2 的数据,此时会根据规则集 ruleSetskey=2 对应的规则,即 price < 2price > 6 来检查这三条数据。

test.append!(table(2 as sym, 2 as value, 0 as price, 3 as quantity))
test.append!(table(2 as sym, 2 as value, 4 as price, 3 as quantity))
test.append!(table(2 as sym, 2 as value, 8 as price, 3 as quantity))

插入两条 sym=3 的数据,由于规则集 ruleSets 只设置了键值为 1 和 2 的规则,此时会根据规则集中 key=NULL 对应的规则,即 value*price > 10 来检查这两条数据。

test.append!(table(3 as sym, 2 as value, 3 as price, 3 as quantity))
test.append!(table(3 as sym, 2 as value, 6 as price, 3 as quantity))

查看输出表 outputTable 的内容。

select * from outputTable
sym value price rule
1 0 2 [false]
1 2 2 [true]
2 2 0 [true,false]
2 2 4 [false,false]
2 2 8 [false,true]
3 2 3 [false]
3 2 6 [true]

回调函数处理后,所有检查结果中首个值为false的行均被写入了分布式表 dfs://temp/pt。

select * from loadTable("dfs://temp","pt")
sym value price
1 0 2
2 2 4
2 2 8
3 2 3

相关函数:updateRuledeleteRulegetRules