Anomaly Detection Engine
DolphinDB's anomaly detection engine detects anomalies in streaming data by evaluating metric values and outputs the records that satisfy the anomaly conditions. It is widely used in real-time monitoring scenarios:
- IoT Monitoring: Tracks critical device parameters like temperature, humidity, voltage levels, and power consumption.
- Financial Risk Management: Monitors trading activities through rule-based order filtering, volume tracking, and transaction limit alerts.
Calculation Rules
The anomaly detection engine can be created with the createAnomalyDetectionEngine function. The syntax is as follows:
createAnomalyDetectionEngine(name, metrics, dummyTable, outputTable, timeColumn, [keyColumn], [windowSize], [step], [garbageSize], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup], [anomalyDescription])
For detailed information, refer to createAnomalyDetectionEngine.
Anomaly Metrics
Anomaly metrics are defined by the metrics parameter, which is metacode comprising one or more Boolean expressions. These expressions can be simple functions or more complex formulas that include aggregate functions to handle complex scenarios.
The anomaly detection engine applies different calculation rules to the following three types of anomaly metrics:
- Comparison between a column and a constant, or between columns. Only non-aggregate functions can be used. Examples: qty < 4, qty > price, lt(qty, prev(qty)), isNull(qty) == false. For these metrics, the engine performs the calculation for each incoming row and determines whether to output the result.
- Comparison between an aggregate function result and a constant, or between aggregate function results. Non-aggregate functions may be used, but their arguments can only be aggregate functions and/or constants, not columns. Examples: avg(qty - price) > 10, percentile(qty, 90) < 100, max(qty) < avg(qty) * 2, le(sum(qty), 5). For these metrics, the engine performs the calculation at a frequency determined by the parameter step and determines whether to output the result.
- Comparison between an aggregate function result and a column, or metrics where non-aggregate functions take both aggregate functions and columns as arguments. Examples: avg(qty) > qty, le(med(qty), price). For these metrics, the engine performs the calculation at a frequency determined by the parameter step and determines whether to output the result.
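The difference between per-row and per-step evaluation can be illustrated with a plain Python sketch (not DolphinDB code; the data and thresholds are made up for illustration):

```python
# Illustration of the first two rule types on a toy qty series.
qty = [3, 8, 12, 20, 9, 15]

# Type 1: a per-row check such as qty < 4 -- evaluated for every record
type1_hits = [i for i, q in enumerate(qty) if q < 4]

# Type 2: a per-window check such as avg(qty) > 10 -- evaluated once per step
window_size, step = 4, 2
type2_hits = []
for end in range(window_size, len(qty) + 1, step):
    window = qty[end - window_size:end]
    if sum(window) / len(window) > 10:
        type2_hits.append(end - 1)   # index of the last record in the window

print(type1_hits, type2_hits)   # [0] [3, 5]
```

A type-1 metric produces at most one output per record, while a type-2 metric produces at most one output per step.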
Windowing Logic
If an aggregate function is used in metrics, the parameters windowSize and step must be specified. The anomaly metrics are calculated over a window of length windowSize that slides forward every step.
To facilitate observation and comparison of calculation results, the engine automatically adjusts the starting point of the first window. The alignment size (an integer, alignmentSize) is determined by the parameter step, the parameter roundTime, and the precision of timeColumn. When the engine calculates within groups, the windows of all groups are aligned uniformly, so the boundaries of each window are identical across groups.
- If the data type of timeColumn is MONTH, the window begins in January of the year corresponding to the first arriving record.
- If the data type of timeColumn is DATE, the boundary of the first window will not be adjusted.
- If the data type of timeColumn is MINUTE (HH:mm), the value of alignmentSize is as follows:

If roundTime=false:

step | alignmentSize |
---|---|
0~2 | 2 |
3 | 3 |
4~5 | 5 |
6~10 | 10 |
11~15 | 15 |
16~20 | 20 |
21~30 | 30 |
>30 | 60 (1 hour) |

If roundTime=true, the value of alignmentSize is the same as in the table above if step<=30; if step>30, it is as follows:

step | alignmentSize |
---|---|
31~60 | 60 (1 hour) |
61~120 | 120 (2 hours) |
121~180 | 180 (3 hours) |
181~300 | 300 (5 hours) |
301~600 | 600 (10 hours) |
601~900 | 900 (15 hours) |
901~1200 | 1200 (20 hours) |
1201~1800 | 1800 (30 hours) |
>1800 | 3600 (60 hours) |

- If the data type of timeColumn is DATETIME (yyyy-MM-dd HH:mm:ss) or SECOND (HH:mm:ss), the value of alignmentSize is as follows:

If roundTime=false:

step | alignmentSize |
---|---|
0~2 | 2 |
3 | 3 |
4~5 | 5 |
6~10 | 10 |
11~15 | 15 |
16~20 | 20 |
21~30 | 30 |
>30 | 60 (1 minute) |

If roundTime=true, the value of alignmentSize is the same as in the table above if step<=30; if step>30, it is as follows:

step | alignmentSize |
---|---|
31~60 | 60 (1 minute) |
61~120 | 120 (2 minutes) |
121~180 | 180 (3 minutes) |
181~300 | 300 (5 minutes) |
301~600 | 600 (10 minutes) |
601~900 | 900 (15 minutes) |
901~1200 | 1200 (20 minutes) |
1201~1800 | 1800 (30 minutes) |
>1800 | 3600 (1 hour) |

- If the data type of timeColumn is TIMESTAMP (yyyy-MM-dd HH:mm:ss.mmm) or TIME (HH:mm:ss.mmm), the value of alignmentSize is as follows:

If roundTime=false:

step | alignmentSize |
---|---|
0~2 | 2 |
3~5 | 5 |
6~10 | 10 |
11~20 | 20 |
21~25 | 25 |
26~50 | 50 |
51~100 | 100 |
101~200 | 200 |
201~250 | 250 |
251~500 | 500 |
501~1000 | 1000 (1 second) |
1001~2000 | 2000 (2 seconds) |
2001~3000 | 3000 (3 seconds) |
3001~5000 | 5000 (5 seconds) |
5001~10000 | 10000 (10 seconds) |
10001~15000 | 15000 (15 seconds) |
15001~20000 | 20000 (20 seconds) |
20001~30000 | 30000 (30 seconds) |
>30000 | 60000 (1 minute) |

If roundTime=true, the value of alignmentSize is the same as in the table above if step<=30000; if step>30000, it is as follows:

step | alignmentSize |
---|---|
30001~60000 | 60000 (1 minute) |
60001~120000 | 120000 (2 minutes) |
120001~300000 | 300000 (5 minutes) |
300001~600000 | 600000 (10 minutes) |
600001~900000 | 900000 (15 minutes) |
900001~1200000 | 1200000 (20 minutes) |
1200001~1800000 | 1800000 (30 minutes) |
>1800000 | 3600000 (1 hour) |

- If the data type of timeColumn is NANOTIMESTAMP (yyyy-MM-dd HH:mm:ss.nnnnnnnnn) or NANOTIME (HH:mm:ss.nnnnnnnnn), the value of alignmentSize is as follows:

If roundTime=false:

step | alignmentSize |
---|---|
0~2ns | 2ns |
3ns~5ns | 5ns |
6ns~10ns | 10ns |
11ns~20ns | 20ns |
21ns~25ns | 25ns |
26ns~50ns | 50ns |
51ns~100ns | 100ns |
101ns~200ns | 200ns |
201ns~250ns | 250ns |
251ns~500ns | 500ns |
>500ns | 1000ns |

If roundTime=true:

step | alignmentSize |
---|---|
1000ns~1ms | 1ms |
1ms~10ms | 10ms |
10ms~100ms | 100ms |
100ms~1s | 1s |
1s~2s | 2s |
2s~3s | 3s |
3s~5s | 5s |
5s~10s | 10s |
10s~15s | 15s |
15s~20s | 20s |
20s~30s | 30s |
>30s | 1min |
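The table lookup for a TIMESTAMP or TIME timeColumn can be sketched as a small Python helper (an illustration only, not DolphinDB code); it relies on the fact that each row's alignmentSize equals the upper bound of its step range:

```python
def alignment_size_timestamp(step, round_time=True):
    """alignmentSize (ms) for a TIMESTAMP or TIME timeColumn, per the tables above."""
    # Thresholds for step <= 30000 (identical for roundTime=true and false)
    small = [2, 5, 10, 20, 25, 50, 100, 200, 250, 500, 1000, 2000, 3000,
             5000, 10000, 15000, 20000, 30000]
    for upper in small:
        if step <= upper:
            return upper
    if not round_time:
        return 60000                      # step > 30000, roundTime=false: 1 minute
    # roundTime=true, step > 30000
    large = [60000, 120000, 300000, 600000, 900000, 1200000, 1800000]
    for upper in large:
        if step <= upper:
            return upper
    return 3600000                        # step > 1800000: 1 hour
```

For example, alignment_size_timestamp(60000) returns 60000 (1 minute), while alignment_size_timestamp(60001) falls into the next bucket and returns 120000.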
If the time of the first record is x with data type TIMESTAMP, the starting time of the first window is adjusted to
timeType_cast(x/alignmentSize*alignmentSize + step - windowSize)
where "/" denotes integer division, keeping only the integer part of the quotient. For example, if the time of the first record is 2018.10.08T01:01:01.365, windowSize = 120000, and step = 60000, then alignmentSize = 60000, and the starting time of the first window is timestamp(2018.10.08T01:01:01.365/60000*60000 + 60000 - 120000) = 2018.10.08T01:00:00.000.
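The alignment arithmetic can be checked with a short Python sketch (an illustration, not DolphinDB code), working in milliseconds since midnight to model TIMESTAMP precision:

```python
from datetime import datetime, timedelta

def first_window_start(first_ts, window_size, step, alignment_size):
    # "/" in the formula is integer division, implemented here with //.
    midnight = first_ts.replace(hour=0, minute=0, second=0, microsecond=0)
    x = (first_ts - midnight) // timedelta(milliseconds=1)   # ms since midnight
    start = x // alignment_size * alignment_size + step - window_size
    return midnight + timedelta(milliseconds=start)

print(first_window_start(datetime(2018, 10, 8, 1, 1, 1, 365000), 120000, 60000, 60000))
# 2018-10-08 01:00:00
```

Here 01:01:01.365 truncates to 01:01:00.000, adding step (60 s) gives 01:02:00.000, and subtracting windowSize (120 s) yields a first-window start of 01:00:00.000.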
Usage Examples
Example 1: Monitoring Temperature Readings
Simulate a sensor device collecting temperature data. Assume a window of size 6 ms, sliding every 3 ms, with temperature readings taken every 1 ms. The following anomaly metrics are defined:
- Any single temperature reading above 65
- Any reading that exceeds the 75th percentile of temperatures from the previous window
It works by storing sensor readings in a stream table, where the anomaly detection engine continuously monitors incoming data. When readings match either anomaly condition, they are automatically recorded in a separate output table.
The implementation steps are as follows:
- Define a stream table sensor to store the collected data:
share streamTable(1000:0, `time`temp, [TIMESTAMP, DOUBLE]) as sensor
- Define the anomaly detection engine and the output table outputTable, which is also a stream table:
share streamTable(1000:0, `time`anomalyType`anomalyString, [TIMESTAMP, INT, SYMBOL]) as outputTable
engine = createAnomalyDetectionEngine(name="engine1", metrics=<[temp > 65, temp > percentile(temp, 75)]>, dummyTable=sensor, outputTable=outputTable, timeColumn=`time, windowSize=6, step=3)
- Subscribe to the stream table sensor and write data to the engine:
subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)
- Then inject simulated data into the stream table sensor to observe the processing results:
timev = 2018.10.08T01:01:01.001 + 1..10
tempv = 59 66 57 60 63 51 53 52 56 55
insert into sensor values(timev, tempv)
Check records in the sensor table:
time | temp |
---|---|
2018.10.08T01:01:01.002 | 59 |
2018.10.08T01:01:01.003 | 66 |
2018.10.08T01:01:01.004 | 57 |
2018.10.08T01:01:01.005 | 60 |
2018.10.08T01:01:01.006 | 63 |
2018.10.08T01:01:01.007 | 51 |
2018.10.08T01:01:01.008 | 53 |
2018.10.08T01:01:01.009 | 52 |
2018.10.08T01:01:01.010 | 56 |
2018.10.08T01:01:01.011 | 55 |
Check the result table outputTable:
time | anomalyType | anomalyString |
---|---|---|
2018.10.08T01:01:01.003 | 0 | temp > 65 |
2018.10.08T01:01:01.003 | 1 | temp > percentile(temp, 75) |
2018.10.08T01:01:01.005 | 1 | temp > percentile(temp, 75) |
2018.10.08T01:01:01.006 | 1 | temp > percentile(temp, 75) |
Below is a detailed explanation of the calculation process for the anomaly detection engine. For readability, the repeated date 2018.10.08T01:01:01 is omitted, and only the millisecond part is shown.
- For the metric temp > 65: it is a comparison between a column and a constant, so the engine evaluates it for each incoming record. In the simulated data, only the temperature (66) at 003 ms meets this condition.
- For the metric temp > percentile(temp, 75): it is a comparison between an aggregate result and a column. The temp values serve both as input to the percentile function and as the current value compared against the calculated threshold. Therefore, as each record arrives, its temp value is compared with the 75th percentile calculated from the previous window's data.
  - The windowing begins when the first record arrives at 002 ms, which establishes the first window spanning from 000 ms (after alignment) to 002 ms. This initial window contains only the single reading at 002 ms, giving a 75th percentile threshold of 59. The engine then evaluates the readings from 003 to 005 ms against this threshold: the readings at 003 ms (66) and 005 ms (60) exceed it and are identified as anomalies.
  - The second window spans from 002 to 005 ms, yielding a new 75th percentile threshold of 60. Comparing the readings from 006 to 008 ms against this new threshold, the engine identifies the reading at 006 ms (63) as an anomaly.
  - The third window spans from 003 to 008 ms, with a calculated 75th percentile threshold of 63. The engine evaluates the readings from 009 to 011 ms against this threshold, but none of them exceed it, so no anomalies are reported.
  - The window calculation temporarily halts after processing the final reading at 011 ms, waiting for new data to arrive before initiating the next window calculation.
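This walkthrough can be replayed in a few lines of plain Python (an illustration, not DolphinDB code; a nearest-rank 75th percentile is assumed here, since it reproduces the thresholds 59, 60, and 63 shown above):

```python
# Keys are ms offsets within 2018.10.08T01:01:01, values are temperatures.
temps = {2: 59, 3: 66, 4: 57, 5: 60, 6: 63, 7: 51, 8: 53, 9: 52, 10: 56, 11: 55}

def pct75(values):
    # Nearest-rank 75th percentile (an assumption matching the walkthrough).
    s = sorted(values)
    return s[round(0.75 * (len(s) - 1))]

# (window used to compute the threshold, readings compared against it), in ms offsets
windows = [(range(0, 3), range(3, 6)),
           (range(0, 6), range(6, 9)),
           (range(3, 9), range(9, 12))]
anomalies = []
for calc, check in windows:
    threshold = pct75([temps[t] for t in calc if t in temps])
    anomalies += [t for t in check if t in temps and temps[t] > threshold]

print(anomalies)   # offsets flagged by temp > percentile(temp, 75)
```

The flagged offsets 003, 005, and 006 ms match the rows of outputTable with anomalyType 1.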
Use the getStreamEngineStat function to monitor the engine status:
getStreamEngineStat().AnomalyDetectionEngine
Output:
name | user | status | lastErrMsg | numGroups | numRows | numMetrics | metrics | snapshotDir | snapshotInterval | snapshotMsgId | snapshotTimestamp | garbageSize | memoryUsed |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
engine1 | admin | OK | | 1 | 10 | 2 | temp > 65, temp > percentile(temp, 75) | | | -1 | | 2,000 | 8,524 |
Example 2: Creating an Anomaly Detection Engine with Snapshot Enabled
The snapshot mechanism allows the engine to recover from a failure. This example illustrates the roles of snapshotDir and snapshotIntervalInMsgCount. When subscribing to a stream table with snapshots enabled, the handler must be specified as the appendMsg function, with handlerNeedMsgId=true set so that message positions are recorded.
WORK_DIR="/home/root/WORK_DIR"
mkdir(WORK_DIR+"/snapshotDir")
enableTableShareAndPersistence(table = streamTable(10000:0,`time`sym`price`qty, [TIMESTAMP,SYMBOL,DOUBLE,INT]) , tableName="trades", cacheSize=1000000)
enableTableShareAndPersistence(table = streamTable(10000:0, `time`sym`type`metric, [TIMESTAMP,STRING,INT,STRING]), tableName = "output", cacheSize=1000000)
go
adengine = createAnomalyDetectionEngine(name="test", metrics=<[avg(qty)>1]>, dummyTable=trades, outputTable=output, timeColumn=`time, keyColumn=`sym, windowSize=10, step=10, snapshotDir=WORK_DIR+"/snapshotDir", snapshotIntervalInMsgCount=100)
subscribeTable(server="", tableName="trades", actionName="adengine",offset= 0, handler=appendMsg{adengine}, msgAsTable=true, handlerNeedMsgId=true)
def writeData(mutable t){
do{
batch = 10
tmp = table(batch:batch, `time`sym`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])
tmp[`time] = take(now(), batch)
tmp[`sym] = "A"+string(1..batch)
tmp[`price] = round(rand(100.0, batch), 2)
tmp[`qty] = rand(10, batch)
t.append!(tmp)
sleep(1000)
}while(true)
}
job1=submitJob("write", "", writeData, trades)
// restart server
enableTableShareAndPersistence(table = streamTable(10000:0,`time`sym`price`qty, [TIMESTAMP,SYMBOL,DOUBLE,INT]) , tableName="trades", cacheSize=1000000)
enableTableShareAndPersistence(table = streamTable(10000:0, `time`sym`type`metric, [TIMESTAMP,STRING,INT,STRING]), tableName = "output", cacheSize=1000000)
select last(time) from output
>2021.03.16T11:59:10.920
select last(time) from trades
>2021.03.16T11:59:13.916
WORK_DIR="/home/root/WORK_DIR"
adengine = createAnomalyDetectionEngine(name="test", metrics=<[avg(qty)>1]>, dummyTable=trades, outputTable=output, timeColumn=`time, keyColumn=`sym, windowSize=10, step=10, snapshotDir=WORK_DIR+"/snapshotDir", snapshotIntervalInMsgCount=100)
ofst = getSnapshotMsgId(adengine)
print(ofst)
>299
select count(*) from trades
>390
// subscribe to stream table trades from the 300th message
subscribeTable(server="", tableName="trades", actionName="adengine",offset=ofst+1, handler=appendMsg{adengine}, msgAsTable=true, handlerNeedMsgId=true)
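The snapshot-and-resume pattern above can be sketched generically in Python (an illustration of the idea, not the DolphinDB implementation): the engine checkpoints the id of the last processed message every N messages, and after a restart the subscription resumes from that checkpoint plus one.

```python
class ToyEngine:
    """Checkpoints the last processed message id every `interval` messages."""
    def __init__(self, interval):
        self.interval = interval          # like snapshotIntervalInMsgCount
        self.snapshot_msg_id = -1         # like getSnapshotMsgId before any snapshot
        self._count = 0

    def append_msg(self, msg, msg_id):
        # ... metric evaluation would happen here ...
        self._count += 1
        if self._count % self.interval == 0:
            self.snapshot_msg_id = msg_id   # persist state up to this message

engine = ToyEngine(interval=100)
for msg_id in range(390):                 # 390 messages arrive, then the server "restarts"
    engine.append_msg(None, msg_id)

offset = engine.snapshot_msg_id + 1       # resume the subscription here
print(engine.snapshot_msg_id, offset)     # 299 300
```

With 390 messages and an interval of 100, the last snapshot covers message id 299, so the subscription resumes at offset 300, matching the getSnapshotMsgId and count(*) values in the example above.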