createRuleEngine
Syntax
createRuleEngine(name, ruleSets, dummyTable, outputColumns, outputTable,
[policy], [ruleSetColumn], [callback])
Details
Create a rule engine that supports multiple rule sets. When a record is ingested into the engine, the engine applies a specific rule based on its ruleSetColumnvalue. The rule sets can be dynamically added, modified, or deleted. This engine can be used to validate incoming data from multiple aspects for risk control.
Different from the anomaly detection engine
(createAnomalyDetectionEngine
):
-
The rule engine is stateless while the anomaly detection engine is stateful.
-
The rule engine can apply different rules for different messages, while the anomaly detection engine applies a set of rule to all messages.
-
Only the rule engine supports dynamic addition, modification, or deletion of the rule sets.
-
Only the rule engine supports specifying a callback function for further processing on the check results.
Arguments
name is a string indicating the name of the engine.
ruleSets is a dictionary specifying the rule sets. Its key is of STRING or INT type, and value is a tuple with metacode. It must contain a default rule set with a key of null values.
dummyTable is a table object whose schema must be the same as the subscribed stream table. Whether dummyTable contains data does not matter.
outputColumns is a STRING vector indicating the input columns to be preserved in the output table.
outputTable is a table to which the engine inserts calculation results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling the function.
The output columns are in the following order:
(1) The first few are columns specified in outputColumns.
(2) Then followed by a result column after applying the rule sets. When policy is set to "shortcut", this column is of INT type; otherwise it's of BOOL[] type.
policy (optional) is a STRING scalar indicating the checking policy of rule sets. It can take the following values:
-
shortcut (default): When any check result is false, the corresponding index (which starts from 0) of the rule set is returned. Otherwise NULL is returned.
-
all: Check all specified rules and return an array vector of BOOLEAN type, its elements are the checking results for each rule set.
ruleSetColumn (optional) is a STRING scalar indicating an input column name. If it is not set, or the specified column does not match any rule set, then the default rule set is applied.
callback (optional) is a function which takes a table as input. The table contains a record output by the engine. If specified, the callback function is invoked with each output passed in as an argument. If not specified, the engine will only insert the checking results into the output table.
Examples
// define rule sets
x = [1, 2, NULL]
y = [ [ < value > 1 > ], [ < price < 2 >, < price > 6 > ], [ < value*price > 10 > ] ]
ruleSets = dict(x, y)
// create a DFS table to write results to the callback function
db = database("dfs://temp", VALUE, 1..3)
t1 = table(1:0, `sym`value`price, [INT,DOUBLE,DOUBLE])
pt = db.createPartitionedTable(t1,`pt,`sym)
// create callback function
def writeBack(result){
if(result.rule[0]==false){
temp = select sym,value,price from result
loadTable("dfs://temp",`pt).append!(temp)
}
}
// create a rule engine
names = `sym`value`price`quantity
types = [INT, DOUBLE, DOUBLE, DOUBLE]
dummy = table(1:0, names, types)
outputNames = `sym`value`price`rule
outputTypes = [INT, DOUBLE, DOUBLE, BOOL[]]
share table(10:0, outputNames, outputTypes) as outputTable
test = createRuleEngine(name="ruleEngineTest", ruleSets=ruleSets, dummyTable=dummy, outputColumns=["sym","value","price"], outputTable=outputTable, policy="all", ruleSetColumn="sym", callback=writeBack)
// When sym=1, check whether value > 1
test.append!(table(1 as sym, 0 as value, 2 as price, 3 as quantity))
test.append!(table(1 as sym, 2 as value, 2 as price, 3 as quantity))
// When sym=2, check whether value price < 2 and price > 6
test.append!(table(2 as sym, 2 as value, 0 as price, 3 as quantity))
test.append!(table(2 as sym, 2 as value, 4 as price, 3 as quantity))
test.append!(table(2 as sym, 2 as value, 8 as price, 3 as quantity))
// When sym is not 1 or 2, check whether value*price > 10
test.append!(table(3 as sym, 2 as value, 3 as price, 3 as quantity))
test.append!(table(3 as sym, 2 as value, 6 as price, 3 as quantity))
outputTable:
sym | value | price | rule |
---|---|---|---|
1 | 0 | 2 | [false] |
1 | 2 | 2 | [true] |
2 | 2 | 0 | [true,false] |
2 | 2 | 4 | [false,false] |
2 | 2 | 8 | [false,true] |
3 | 2 | 3 | [false] |
3 | 2 | 6 | [true] |
Check table "dfs://temp/pt" for the callback result:
select * from loadTable("dfs://temp","pt")
sym | value | price |
---|---|---|
1 | 0 | 2 |
2 | 2 | 4 |
2 | 2 | 8 |
3 | 2 | 3 |
Related functions: updateRule, deleteRule