DStream::sparseReactiveStateEngine

Syntax

DStream::sparseReactiveStateEngine(metrics, keyColumn, [extraColumn])

Parameters

metrics is a table defining the set of sparse state computation rules. It must contain at least three columns, in this order: the key column(s), formula, and outputMetricKey. With N key columns, metrics therefore has at least N+2 columns.
  • The first N columns are input metric identifier columns. Their count and order must match the columns specified by keyColumn. In each row, these columns hold the key values, as they appear in the input table's keyColumn, to which the rule applies (e.g., if the input table identifies metrics by deviceID, the values might be "A001", "R131", etc.).
  • formula is a STRING scalar/vector or metacode specifying the computation expression for the metric. Variable names in the expression refer to value columns of the input table (e.g., metricValue, or several value columns such as value1 and value2).
  • outputMetricKey is a STRING scalar/vector specifying the name of the output metric produced by the rule, e.g., "A001_event_B". Each name must be unique.
Note:
  • In both standalone and cluster modes, when using Orca, user-defined functions in formula are supported only as metacode (enclosed in <>).
  • In cluster mode:
    • When calling addSparseReactiveMetrics via rpc, specify formula as a string (enclosed in ""), or log in directly to the node where the engine resides.
    • If the engine contains formulas defined as metacode (<>), calling getSparseReactiveMetrics via rpc does not work correctly. In this case, too, log in directly to the node where the engine resides.
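To make the required layout of metrics concrete, here is a minimal Python sketch (not DolphinDB code) that checks a rules table against keyColumn as described above. The first N columns must be the key columns in the same order, followed by formula and outputMetricKey, and outputMetricKey values must be unique. The function name validate_metrics and the representation of the table as a column-name list plus row tuples are hypothetical. The sketch also assumes that the leading column names equal the keyColumn names and that formula and outputMetricKey immediately follow them, as in the example on this page.

```python
def validate_metrics(columns, rows, key_columns):
    """Hypothetical layout check for a metrics table (illustration only).

    columns: column names of the metrics table
    rows: one tuple per rule, in column order
    key_columns: keyColumn names of the input table
    """
    n = len(key_columns)
    # Assumption: the first N column names equal keyColumn, in the same order.
    if list(columns[:n]) != list(key_columns):
        raise ValueError("leading columns must match keyColumn in count and order")
    # Assumption: formula and outputMetricKey directly follow the key columns.
    if list(columns[n:n + 2]) != ["formula", "outputMetricKey"]:
        raise ValueError("expected formula and outputMetricKey after the key columns")
    # outputMetricKey values must be unique across all rules.
    out_keys = [row[n + 1] for row in rows]
    if len(out_keys) != len(set(out_keys)):
        raise ValueError("outputMetricKey values must be unique")
    return True


cols = ["deviceId1", "deviceId2", "formula", "outputMetricKey"]
rules = [("A1", "A3", "cumsum(value1)", "event1"),
         ("A2", "A1", "cumavg(value1)", "event2")]
print(validate_metrics(cols, rules, ["deviceId1", "deviceId2"]))  # True
```

The formula strings here are just placeholders. The sketch checks structure only, not whether an expression is valid.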

keyColumn is a STRING scalar/vector specifying the primary key column names of the input table, which identify the metric or device each row belongs to. If keyColumn specifies N columns, metrics must have N corresponding leading identifier columns.
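The role of keyColumn can be illustrated with a conceptual Python model, not the engine's actual implementation. The values in an input row's key columns form a tuple, and only rules whose N leading identifier values all equal that tuple apply to the row. The helpers build_rule_index and rules_for_row are hypothetical names used only for this sketch.

```python
from collections import defaultdict


def build_rule_index(rules, n_keys):
    """Group rules by their leading N key values (hypothetical helper).

    rules: tuples of (key_1, ..., key_N, formula, outputMetricKey)
    """
    index = defaultdict(list)
    for rule in rules:
        # In this model a key tuple may map to several rules.
        index[tuple(rule[:n_keys])].append(rule)
    return index


def rules_for_row(index, row, key_columns):
    """Return the rules whose key values all equal the row's key values."""
    key = tuple(row[col] for col in key_columns)
    # dict.get does not insert a new entry into the defaultdict.
    return index.get(key, [])


rules = [("A1", "A3", "cumsum(value1)", "event1"),
         ("A1", "A2", "cumavg(value1)", "event2")]
idx = build_rule_index(rules, 2)
row = {"deviceId1": "A1", "deviceId2": "A3", "value1": 4.0}
print([r[-1] for r in rules_for_row(idx, row, ["deviceId1", "deviceId2"])])  # ['event1']
```

Matching on the full tuple means that a row with deviceId1 = "A1" alone is not enough. Every key column must agree with the rule.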

extraColumn (optional) is a STRING scalar/vector indicating the column names in the input table that should be carried to the output table unchanged (e.g., time columns).
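A minimal Python model of how a matched rule could keep state and produce output with carried-through columns follows. It assumes a cumulative sum as the formula and assumes that each output row consists of the extraColumn values, the outputMetricKey, and the computed value. This output layout is an assumption for illustration, not the documented schema of the engine's output table. The class SparseCumsumModel is hypothetical.

```python
class SparseCumsumModel:
    """Hypothetical model: each rule keeps its own running sum as state."""

    def __init__(self, rules, key_columns, extra_columns, value_column):
        # rules: list of (key tuple, outputMetricKey)
        self.rules = {}
        for key, out_key in rules:
            self.rules.setdefault(key, []).append(out_key)
        self.key_columns = key_columns
        self.extra_columns = extra_columns
        self.value_column = value_column
        self.state = {}  # outputMetricKey -> running sum

    def on_row(self, row):
        """Process one input row and return the output rows it triggers."""
        key = tuple(row[c] for c in self.key_columns)
        outputs = []
        for out_key in self.rules.get(key, []):
            total = self.state.get(out_key, 0.0) + row[self.value_column]
            self.state[out_key] = total
            # Extra columns are copied unchanged from the input row.
            extras = tuple(row[c] for c in self.extra_columns)
            outputs.append(extras + (out_key, total))
        return outputs


m = SparseCumsumModel([(("A1", "A3"), "event1")],
                      ["deviceId1", "deviceId2"], ["timestamp"], "value1")
print(m.on_row({"timestamp": 1, "deviceId1": "A1", "deviceId2": "A3", "value1": 2.0}))  # [(1, 'event1', 2.0)]
print(m.on_row({"timestamp": 2, "deviceId1": "A1", "deviceId2": "A3", "value1": 3.0}))  # [(2, 'event1', 5.0)]
print(m.on_row({"timestamp": 3, "deviceId1": "A2", "deviceId2": "A3", "value1": 9.0}))  # []
```

Rows whose keys match no rule produce no output in this model, and the state of one rule is never affected by rows routed to another.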

Returns

A DStream object.

Examples

// Create a catalog if it does not exist
if (!existsCatalog("orca")) {
    createCatalog("orca")
}
go
use catalog orca

// If a stream graph with the same name already exists, destroy it first
// dropStreamGraph("sparseGraph")
g = createStreamGraph("sparseGraph")

// Define the input stream schema, the metrics (rule) table, and the sparse reactive state engine, which writes its results to the output table

baseStream = g.source("trade", `timestamp`date`deviceId1`deviceId2`deviceId3`value1`value2`value3, [TIMESTAMP, DATE, STRING, STRING, STRING, DOUBLE, DOUBLE, DOUBLE])
formulas = [<cumsumTopN(value1, value2, 5)>, <cumavgTopN(value1, value2, 10)>, <cumstdTopN(value1, value2, 15)>, <cumstdpTopN(value1, value2, 20)>, <cumvarTopN(value1, value2, 5)>, <cumvarpTopN(value1, value2, 10)>, <cumskewTopN(value1, value2, 10)>, <cumkurtosisTopN(value1, value2, 10)>, <cumbetaTopN(value1, value2, value3, 10)>, <cumcorrTopN(value1, value2, value3, 10)>, <cumcovarTopN(value1, value2, value3, 10)>, <cumwsumTopN(value1, value2, value3, 10)>]
keys = "A"+string(1..size(formulas))
keys1 = keys.shuffle()
keys2 = keys.shuffle()
outKeys = "event"+string(1..size(formulas))
metrics = table(
    keys1 as deviceId1,
    keys2 as deviceId2,
    formulas as formula,
    outKeys as outputMetricKey
)
baseStream.sparseReactiveStateEngine(metrics, `deviceId1`deviceId2, `timestamp`date)
.setEngineName("srsEngine")
.sink("output")
g.submit()
go


// Append data and view the output
n = 10000
for(i in 1..5){
    data = table(rand(timestamp(1..1000), n) as timestamp, rand(date(1..1000), n) as date, rand(keys, n) as deviceId1, rand(keys, n) as deviceId2, take(keys, n) as deviceId3, rand(rand(-1000.0:1000.0, n) join take(double(), n/5), n) as value1, rand(rand(-1000.0:1000.0, n) join take(double(), n/5), n) as value2, rand(rand(-1000.0:1000.0, n) join take(double(), n/5), n) as value3)
    appendOrcaStreamTable("trade", data)
}
sleep(3000)
res = select * from orca_table.output

Related function: createSparseReactiveStateEngine