Anomaly Detection Engine

DolphinDB's anomaly detection engine detects anomalies by evaluating metrics over streaming data and outputs the records that satisfy the anomaly conditions. It is widely used in real-time monitoring scenarios:

  • IoT Monitoring: Tracks critical device parameters like temperature, humidity, voltage levels, and power consumption.
  • Financial Risk Management: Monitors trading activities through rule-based order filtering, volume tracking, and transaction limit alerts.

Calculation Rules

The anomaly detection engine can be created using the createAnomalyDetectionEngine function. The syntax is as follows:

createAnomalyDetectionEngine(name, metrics, dummyTable, outputTable, timeColumn, [keyColumn], [windowSize], [step], [garbageSize], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup], [anomalyDescription])

For detailed information, refer to createAnomalyDetectionEngine.

Anomaly Metrics

Anomaly metrics are defined by the metrics parameter, which is metacode comprising one or more Boolean expressions. These expressions can be simple functions or more complex formulas that include aggregate functions to handle complex scenarios.

The anomaly detection engine uses different calculation rules for the following three types of anomaly metrics:

  • Comparison between a column and a constant, or between columns. Only non-aggregate functions can be included. For example: qty < 4, qty > price, lt(qty, prev(qty)), isNull(qty) == false, etc. For these metrics, the engine conducts calculations for each row and determines whether to output the result.

  • Comparison between an aggregate function result and a constant, or between aggregate function results. Non-aggregate functions may be used, but their arguments may only include aggregate functions and/or constants, not columns. For example: avg(qty - price) > 10, percentile(qty, 90) < 100, max(qty) < avg(qty) * 2, le(sum(qty), 5), etc. For these metrics, the engine conducts calculations at the frequency determined by the parameter step and determines whether to output the result.

  • Comparison between an aggregate function result and a column, or metrics in which non-aggregate functions take both aggregate functions and columns as arguments. For example: avg(qty) > qty, le(med(qty), price), etc. For these metrics, the engine conducts calculations at the frequency determined by the parameter step and determines whether to output the result.
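
The difference between per-row and per-window evaluation can be sketched in plain Python (an illustrative sketch, not DolphinDB code; the data and thresholds here are made up for the example):

```python
# Illustrative sketch (plain Python, not DolphinDB): when each metric type is
# evaluated. Data and thresholds are made up for the example.
rows = [{"qty": 3, "price": 2.5}, {"qty": 8, "price": 1.0}, {"qty": 40, "price": 9.0}]

# Type 1, e.g. qty < 4: evaluated for every incoming row
per_row_hits = [i for i, r in enumerate(rows) if r["qty"] < 4]

# Type 2, e.g. avg(qty - price) > 10: evaluated once per window (here, all rows)
window_hit = sum(r["qty"] - r["price"] for r in rows) / len(rows) > 10

print(per_row_hits, window_hit)  # [0] True
```

A type-1 metric can fire once per record, while a type-2 metric produces at most one result per window, which is why the output frequency is governed by step.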

Windowing Logic

If an aggregate function is used in metrics, the parameters windowSize and step must be specified. The anomaly metrics are calculated in a window of length windowSize that slides forward by step.

To facilitate observation and comparison of calculation results, the engine automatically adjusts the starting point of the first window. The alignment size (an integer) is determined by the parameter step, the parameter roundTime, and the precision of timeColumn. When the engine calculates within groups, the windows of all groups are aligned uniformly, so the window boundaries are the same for every group.

  • If the data type of timeColumn is MONTH, the window begins in January of the year corresponding to the first arriving record.
  • If the data type of timeColumn is DATE, the boundary of the first window is not adjusted.
  • If the data type of timeColumn is MINUTE(HH:mm), the value of alignmentSize is as follows:

    if roundTime=false:

    step alignmentSize
    0~2 2
    3 3
    4~5 5
    6~10 10
    11~15 15
    16~20 20
    21~30 30
    >30 60 (1 hour)

    if roundTime=true:

    If step<=30, the value of alignmentSize is the same as in the table above. If step>30, the value of alignmentSize is as follows:

    step alignmentSize
    31~60 60 (1 hour)
    61~120 120 (2 hours)
    121~180 180 (3 hours)
    181~300 300 (5 hours)
    301~600 600 (10 hours)
    601~900 900 (15 hours)
    901~1200 1200 (20 hours)
    1201~1800 1800 (30 hours)
    >1800 3600 (60 hours)
  • If the data type of timeColumn is DATETIME (yyyy-MM-dd HH:mm:ss) or SECOND (HH:mm:ss), the value of alignmentSize is as follows:

    if roundTime=false:

    step alignmentSize
    0~2 2
    3 3
    4~5 5
    6~10 10
    11~15 15
    16~20 20
    21~30 30
    >30 60 (1 minute)

    if roundTime=true:

    If step<=30, the value of alignmentSize is the same as in the table above. If step>30, the value of alignmentSize is as follows:

    step alignmentSize
    31~60 60 (1 minute)
    61~120 120 (2 minutes)
    121~180 180 (3 minutes)
    181~300 300 (5 minutes)
    301~600 600 (10 minutes)
    601~900 900 (15 minutes)
    901~1200 1200 (20 minutes)
    1201~1800 1800 (30 minutes)
    >1800 3600 (1 hour)
  • If the data type of timeColumn is TIMESTAMP(yyyy-MM-dd HH:mm:ss.mmm) or TIME(HH:mm:ss.mmm), the value of alignmentSize is as follows:

    if roundTime=false:

    step alignmentSize
    0~2 2
    3~5 5
    6~10 10
    11~20 20
    21~25 25
    26~50 50
    51~100 100
    101~200 200
    201~250 250
    251~500 500
    501~1000 1000 (1 second)
    1001~2000 2000 (2 seconds)
    2001~3000 3000 (3 seconds)
    3001~5000 5000 (5 seconds)
    5001~10000 10000 (10 seconds)
    10001~15000 15000 (15 seconds)
    15001~20000 20000 (20 seconds)
    20001~30000 30000 (30 seconds)
    >30000 60000 (1 minute)

    if roundTime=true:

    If step<=30000, the value of alignmentSize is the same as in the table above. If step>30000, the value of alignmentSize is as follows:

    step alignmentSize
    30001~60000 60000 (1 minute)
    60001~120000 120000 (2 minutes)
    120001~300000 300000 (5 minutes)
    300001~600000 600000 (10 minutes)
    600001~900000 900000 (15 minutes)
    900001~1200000 1200000 (20 minutes)
    1200001~1800000 1800000 (30 minutes)
    >1800000 3600000 (1 hour)
  • If the data type of timeColumn is NANOTIMESTAMP(yyyy-MM-dd HH:mm:ss.nnnnnnnnn) or NANOTIME(HH:mm:ss.nnnnnnnnn), the value of alignmentSize is as follows:

    if roundTime=false:

    step alignmentSize
    0~2ns 2ns
    3ns~5ns 5ns
    6ns~10ns 10ns
    11ns~20ns 20ns
    21ns~25ns 25ns
    26ns~50ns 50ns
    51ns~100ns 100ns
    101ns~200ns 200ns
    201ns~250ns 250ns
    251ns~500ns 500ns
    >500ns 1000ns

    if roundTime=true:

    step alignmentSize
    1000ns~1ms 1ms
    1ms~10ms 10ms
    10ms~100ms 100ms
    100ms~1s 1s
    1s~2s 2s
    2s~3s 3s
    3s~5s 5s
    5s~10s 10s
    10s~15s 15s
    15s~20s 20s
    20s~30s 30s
    >30s 1min
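
For a TIMESTAMP timeColumn with roundTime=true, the lookup described by the tables above can be cross-checked with a short Python sketch (illustrative only; the threshold lists are copied directly from the tables):

```python
# Illustrative cross-check (plain Python, not DolphinDB): alignmentSize lookup
# for a TIMESTAMP timeColumn with roundTime=true, values copied from the tables.
SMALL_MS = [2, 5, 10, 20, 25, 50, 100, 200, 250, 500, 1000, 2000, 3000,
            5000, 10000, 15000, 20000, 30000]
LARGE_MS = [60000, 120000, 300000, 600000, 900000, 1200000, 1800000]

def alignment_size_ms(step):
    # step <= 30000 uses the roundTime=false table; larger steps use the
    # roundTime=true table, capped at one hour (3600000 ms)
    for bound in (SMALL_MS if step <= 30000 else LARGE_MS):
        if step <= bound:
            return bound
    return 3600000

print(alignment_size_ms(7), alignment_size_ms(60000))  # 10 60000
```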

If the time of the first record is x with data type TIMESTAMP, then the starting time of the first window is adjusted to timeType_cast(x/alignmentSize*alignmentSize+step-windowSize), where "/" produces only the integer part after division. For example, if the time of the first record is 2018.10.08T01:01:01.365, windowSize = 120000, and step = 60000, then alignmentSize = 60000, and the starting time of the first window is timestamp(2018.10.08T01:01:01.365/60000*60000+60000-120000)=2018.10.08T01:00:00.000.
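
The formula can be checked numerically with a short Python sketch (epoch-millisecond arithmetic; `//` plays the role of the integer division written as "/" above):

```python
from datetime import datetime, timezone

def first_window_start(x_ms, window_size, step, alignment_size):
    # x/alignmentSize*alignmentSize + step - windowSize, "/" = integer division
    return x_ms // alignment_size * alignment_size + step - window_size

# first record at 2018.10.08T01:01:01.365, windowSize=120000, step=60000
x = int(datetime(2018, 10, 8, 1, 1, 1, tzinfo=timezone.utc).timestamp()) * 1000 + 365
start = first_window_start(x, window_size=120000, step=60000, alignment_size=60000)
print(datetime.fromtimestamp(start / 1000, tz=timezone.utc).isoformat())
# 2018-10-08T01:00:00+00:00
```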

Usage Examples

Example 1: Monitoring Temperature Readings

Simulate a sensor device collecting temperature data. Assume a window of 6 ms that slides every 3 ms, with a temperature reading taken every 1 ms. The following anomaly metrics are defined:

  • Any single temperature reading above 65
  • Any reading that exceeds 75% of temperatures from the previous window

It works by storing sensor readings in a stream table, where the anomaly detection engine continuously monitors incoming data. When readings match either anomaly condition, they are automatically recorded in a separate output table.

The implementation steps are as follows:

  1. Define a stream table sensor to store the collected data:
    share streamTable(1000:0, `time`temp, [TIMESTAMP, DOUBLE]) as sensor
  2. Define the anomaly detection engine and the output table outputTable, which is also a stream table:
    share streamTable(1000:0, `time`anomalyType`anomalyString, [TIMESTAMP, INT, SYMBOL]) as outputTable
    engine = createAnomalyDetectionEngine(name="engine1", metrics=<[temp > 65, temp > percentile(temp, 75)]>, dummyTable=sensor, outputTable=outputTable, timeColumn=`time, windowSize=6, step=3)
  3. Subscribe to the stream table sensor and write data to the engine:
    subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)
  4. Then inject simulated data to the stream table sensor to observe the processing results:
    timev = 2018.10.08T01:01:01.001 + 1..10
    tempv = 59 66 57 60 63 51 53 52 56 55
    insert into sensor values(timev, tempv)

Check records in the sensor table:

time temp
2018.10.08T01:01:01.002 59
2018.10.08T01:01:01.003 66
2018.10.08T01:01:01.004 57
2018.10.08T01:01:01.005 60
2018.10.08T01:01:01.006 63
2018.10.08T01:01:01.007 51
2018.10.08T01:01:01.008 53
2018.10.08T01:01:01.009 52
2018.10.08T01:01:01.010 56
2018.10.08T01:01:01.011 55

Check the result table outputTable:

time anomalyType anomalyString
2018.10.08T01:01:01.003 0 temp > 65
2018.10.08T01:01:01.003 1 temp > percentile(temp, 75)
2018.10.08T01:01:01.005 1 temp > percentile(temp, 75)
2018.10.08T01:01:01.006 1 temp > percentile(temp, 75)

Below is a detailed explanation of the calculation process for the anomaly detection engine. For readability, the repeated date 2018.10.08T01:01:01 is omitted, and only the millisecond part is shown.

  • For the metric temp > 65: It is a comparison between a column and a constant. The engine conducts calculations for each incoming record. In the simulated data, only the temperature (66) at 003 ms meets this condition.
  • For the metric temp > percentile(temp, 75): It is a comparison between an aggregate function result and a column. The temp values appear both as input to the percentile function and as the current value being compared against the calculated threshold. Therefore, when each record arrives, its temp value is compared with the 75th percentile calculated from the previous window's data.
    • The windowing operation begins when the first record arrives at 002 ms, which establishes the first window spanning from 000 (after alignment) to 002 ms. This initial window contains only the single reading at 002 ms, resulting in a 75th percentile threshold of 59. The engine then evaluates the next set of readings, from 003 to 005 ms, against this threshold of 59. The readings at 003 ms (66) and 005 ms (60) are identified as anomalies because they exceed this threshold.
    • The second window spans from 002 to 005 ms, calculating a new 75th percentile threshold of 60. Comparing the readings from 006 to 008 ms against this new threshold, the engine identifies the reading at 006 ms (63) as an anomaly.
    • For the third window, which spans from 003 to 008 ms, the calculated 75th percentile threshold is 63. The engine evaluates readings from 009 to 011 ms against this threshold, but none of these readings exceed it, resulting in no anomalies.
    • The window calculation process temporarily halts after processing the final reading at 011 ms, waiting for new data to arrive before initiating the next window calculation.
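
The walkthrough above can be replayed with a small Python sketch. Note that the 'nearest' percentile interpolation and the window spans below are assumptions chosen to follow the worked numbers; DolphinDB's percentile may interpolate differently:

```python
# Replay of the walkthrough (plain Python, not DolphinDB). The 'nearest'
# percentile interpolation and the window spans are assumptions chosen to
# follow the worked numbers above.
times = list(range(2, 12))                       # 002 ms .. 011 ms
temps = [59, 66, 57, 60, 63, 51, 53, 52, 56, 55]

def pct_nearest(values, p):
    s = sorted(values)
    return s[round((len(s) - 1) * p / 100)]

# (readings inside the window, readings checked against its threshold), in ms
windows = [((2, 2), (3, 5)), ((2, 5), (6, 8)), ((3, 8), (9, 11))]
anomalies = []
for (w0, w1), (c0, c1) in windows:
    threshold = pct_nearest([t for ms, t in zip(times, temps) if w0 <= ms <= w1], 75)
    anomalies += [ms for ms, t in zip(times, temps) if c0 <= ms <= c1 and t > threshold]
print(anomalies)  # [3, 5, 6] -> the readings at 003, 005 and 006 ms
```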

Use the getStreamEngineStat function to monitor the engine status:

getStreamEngineStat().AnomalyDetectionEngine

Output:

name user status lastErrMsg numGroups numRows numMetrics metrics snapshotDir snapshotInterval snapshotMsgId snapshotTimestamp garbageSize memoryUsed
engine1 admin OK 1 10 2 temp > 65, temp > percentile(temp, 75) -1 2,000 8,524

Example 2: Creating an Anomaly Detection Engine with Snapshot Enabled

The snapshot mechanism allows the engine to recover from a failure. This example illustrates the roles of snapshotDir and snapshotIntervalInMsgCount. When subscribing to a stream table with snapshots enabled, the handler must be specified as the appendMsg function, and handlerNeedMsgId=true must be set so that message positions are recorded.

WORK_DIR="/home/root/WORK_DIR"
mkdir(WORK_DIR+"/snapshotDir")
enableTableShareAndPersistence(table = streamTable(10000:0,`time`sym`price`qty, [TIMESTAMP,SYMBOL,DOUBLE,INT]) , tableName="trades", cacheSize=1000000)
enableTableShareAndPersistence(table = streamTable(10000:0, `time`sym`type`metric, [TIMESTAMP,STRING,INT,STRING]), tableName = "output", cacheSize=1000000)
go

adengine = createAnomalyDetectionEngine(name="test", metrics=<[avg(qty)>1]>, dummyTable=trades, outputTable=output, timeColumn=`time, keyColumn=`sym, windowSize=10, step=10, snapshotDir=WORK_DIR+"/snapshotDir", snapshotIntervalInMsgCount=100)
subscribeTable(server="", tableName="trades", actionName="adengine",offset= 0, handler=appendMsg{adengine}, msgAsTable=true, handlerNeedMsgId=true)

def writeData(mutable t){
    do{
        batch = 10
        // qty must be INT to match the schema of the trades table
        tmp = table(batch:batch, `time`sym`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])
        tmp[`time] = take(now(), batch)
        tmp[`sym] = "A"+string(1..batch)
        tmp[`price] = round(rand(100.0, batch), 2)
        tmp[`qty] = rand(10, batch)
        t.append!(tmp)
        sleep(1000)
    }while(true)
}

job1=submitJob("write", "", writeData, trades)
// restart server

enableTableShareAndPersistence(table = streamTable(10000:0,`time`sym`price`qty, [TIMESTAMP,SYMBOL,DOUBLE,INT]) , tableName="trades", cacheSize=1000000)
enableTableShareAndPersistence(table = streamTable(10000:0, `time`sym`type`metric, [TIMESTAMP,STRING,INT,STRING]), tableName = "output", cacheSize=1000000)

select last(time) from output
>2021.03.16T11:59:10.920

select last(time) from trades
>2021.03.16T11:59:13.916

WORK_DIR="/home/root/WORK_DIR"
adengine = createAnomalyDetectionEngine(name="test", metrics=<[avg(qty)>1]>, dummyTable=trades, outputTable=output, timeColumn=`time, keyColumn=`sym, windowSize=10, step=10, snapshotDir=WORK_DIR+"/snapshotDir", snapshotIntervalInMsgCount=100)

ofst = getSnapshotMsgId(adengine)
print(ofst)
>299

select count(*) from trades
>390

// subscribe to the stream table trades from the 300th message
subscribeTable(server="", tableName="trades", actionName="adengine",offset=ofst+1, handler=appendMsg{adengine}, msgAsTable=true, handlerNeedMsgId=true)
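
The recovery pattern in this example (persist the last processed message id every snapshotIntervalInMsgCount messages, then resume from getSnapshotMsgId plus one) can be sketched generically in Python. The class and file format below are hypothetical, for illustration only, not DolphinDB's implementation:

```python
import json
import os
import tempfile

# Hedged, generic sketch of the snapshot/resume pattern above (hypothetical
# class, not the DolphinDB implementation): persist the last processed message
# id every `interval` messages, then resume subscribing from that id + 1.
class SnapshotConsumer:
    def __init__(self, path, interval):
        self.path, self.interval = path, interval
        self.count, self.last_id = 0, -1

    def process(self, msg_id):
        self.last_id = msg_id
        self.count += 1
        if self.count % self.interval == 0:      # cf. snapshotIntervalInMsgCount
            with open(self.path, "w") as f:
                json.dump({"msgId": self.last_id}, f)

    def resume_offset(self):                     # cf. getSnapshotMsgId(...) + 1
        if not os.path.exists(self.path):
            return 0
        with open(self.path) as f:
            return json.load(f)["msgId"] + 1

path = os.path.join(tempfile.mkdtemp(), "snapshot.json")
c = SnapshotConsumer(path, interval=100)
for i in range(390):      # 390 messages arrive before the "failure", as above
    c.process(i)
print(SnapshotConsumer(path, interval=100).resume_offset())  # 300
```

Messages after the last snapshot (ids 300 to 389 here) are not lost: resuming the subscription from the saved id + 1 replays them into the engine.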