DStream::sparseReactiveStateEngine
Syntax
DStream::sparseReactiveStateEngine(metrics, keyColumn, [extraColumn])
Details
Creates a sparse reactive state engine for sparse state computation. For details, see createSparseReactiveStateEngine.
Parameters
metrics is a table that defines the metrics to compute. Its columns are keyColumn(s), formula, and outputMetricKey:
- The first N columns are input metric identifier columns. Their count and order must match those specified by keyColumn. Each row specifies values of the keyColumn columns as they appear in the input table (e.g., if the input table identifies metrics by deviceID, values can be "A001", "R131", etc.).
- formula is a STRING scalar/vector or metacode representing the computation expression for the metric. Variable names in the expression correspond to value columns in the input table (e.g., metricValue, or multiple value columns such as Value1 and Value2).
- outputMetricKey is a STRING scalar/vector indicating the name of the new output metric (must be unique), e.g., "A001_event_B".
- In both standalone and cluster mode, user-defined functions in formula are only supported as metacode in <> format when using Orca.
- In cluster mode:
  - When calling addSparseReactiveMetrics via rpc, formula must be a string in "" format; alternatively, log in directly to the node where the engine is located.
  - If the engine contains code in <> format, calling getSparseReactiveMetrics via rpc will behave abnormally. In this case, it is also recommended to log in directly to the node where the engine is located.
keyColumn is a STRING scalar/vector indicating the primary key column names of the input table (used to identify “which metric/device the current row belongs to”). If keyColumn has N columns, metrics must have N corresponding leading identifier columns.
extraColumn (optional) is a STRING scalar/vector indicating the column names in the input table that should be carried to the output table unchanged (e.g., time columns).
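To make the relationship between metrics, keyColumn, and extraColumn concrete, the Python sketch below models the routing idea: each metrics row is indexed by its key-column values, and an incoming row updates and emits only the metrics whose keys it matches; rows with no matching metric are skipped. This is a conceptual model, not DolphinDB code and not the engine's implementation. Assumptions: each formula is a Python callable over the rows seen so far for that metric (standing in for DolphinDB metacode), and the output row layout (extraColumn values, then outputMetricKey and a single value column) is hypothetical. The names `Metric` and `sparse_reactive` are illustrative only.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Metric:
    # Hypothetical stand-in for one row of the metrics table.
    formula: Callable[[list], float]  # computes a value from the rows seen so far
    output_key: str                   # plays the role of outputMetricKey
    state: list = field(default_factory=list)  # per-metric history (the "state")


def sparse_reactive(metrics_rows, key_columns, extra_columns, stream):
    # Index metrics by the tuple of their leading key-column values.
    index = {}
    for row in metrics_rows:
        key = tuple(row[c] for c in key_columns)
        index.setdefault(key, []).append(Metric(row["formula"], row["outputMetricKey"]))

    out = []
    for rec in stream:
        key = tuple(rec[c] for c in key_columns)
        # Only metrics whose keys match this row are updated (sparse computation).
        for m in index.get(key, []):
            m.state.append(rec)
            out.append({
                **{c: rec[c] for c in extra_columns},  # carried through unchanged
                "outputMetricKey": m.output_key,
                "value": m.formula(m.state),
            })
    return out


# Usage: one metric keyed by (deviceId1, deviceId2) computing a cumulative sum of value1.
metrics = [{"deviceId1": "A1", "deviceId2": "A2",
            "formula": lambda rows: sum(r["value1"] for r in rows),
            "outputMetricKey": "event1"}]
stream = [
    {"timestamp": 1, "deviceId1": "A1", "deviceId2": "A2", "value1": 1.0},
    {"timestamp": 2, "deviceId1": "A1", "deviceId2": "A3", "value1": 5.0},  # no matching metric
    {"timestamp": 3, "deviceId1": "A1", "deviceId2": "A2", "value1": 2.0},
]
out = sparse_reactive(metrics, ["deviceId1", "deviceId2"], ["timestamp"], stream)
```

In this model, the row with key ("A1", "A3") produces no output because no metrics row declares that key combination, while the two matching rows yield cumulative values 1.0 and 3.0.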
Returns
A DStream object.
Examples
// Create a catalog if it does not exist
if (!existsCatalog("orca")) {
createCatalog("orca")
}
go
use catalog orca
// If a stream graph with the same name already exists, destroy it first
// dropStreamGraph("sparseGraph")
g = createStreamGraph("sparseGraph")
// Define the schema of the source stream "trade", the metrics table, and the sparse reactive state engine
baseStream = g.source("trade", `timestamp`date`deviceId1`deviceId2`deviceId3`value1`value2`value3, [TIMESTAMP, DATE, STRING, STRING, STRING, DOUBLE, DOUBLE, DOUBLE])
formulas = [<cumsumTopN(value1, value2, 5)>, <cumavgTopN(value1, value2, 10)>, <cumstdTopN(value1, value2, 15)>, <cumstdpTopN(value1, value2, 20)>, <cumvarTopN(value1, value2, 5)>, <cumvarpTopN(value1, value2, 10)>, <cumskewTopN(value1, value2, 10)>, <cumkurtosisTopN(value1, value2, 10)>, <cumbetaTopN(value1, value2, value3, 10)>, <cumcorrTopN(value1, value2, value3, 10)>, <cumcovarTopN(value1, value2, value3, 10)>, <cumwsumTopN(value1, value2, value3, 10)>]
keys = "A"+string(1..size(formulas))
keys1 = keys.shuffle()
keys2 = keys.shuffle()
outKeys = "event"+string(1..size(formulas))
metrics = table(
keys1 as deviceId1,
keys2 as deviceId2,
formulas as formula,
outKeys as outputMetricKey
)
baseStream.sparseReactiveStateEngine(metrics, `deviceId1`deviceId2, `timestamp`date)
.setEngineName("srsEngine")
.sink("output")
g.submit()
go
// Append data and view the output
n = 10000
for(i in 1..5){
data = table(rand(timestamp(1..1000), n) as timestamp, rand(date(1..1000), n) as date, rand(keys, n) as deviceId1, rand(keys, n) as deviceId2, take(keys, n) as deviceId3, rand(rand(-1000.0:1000.0, n) join take(double(), n/5), n) as value1, rand(rand(-1000.0:1000.0, n) join take(double(), n/5), n) as value2, rand(rand(-1000.0:1000.0, n) join take(double(), n/5), n) as value3)
appendOrcaStreamTable("trade", data)
}
sleep(3000)
res = select * from orca_table.output
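The formulas in this example use cumulative TopN functions. As a rough illustration of what a formula such as cumsumTopN(value1, value2, 5) evaluates for a single metric, the Python sketch below assumes that, at each row, the X values seen so far are stably sorted by S (ascending by default) and the first top values are summed. NULL handling and any tie-handling options are deliberately omitted, and `cumsum_topn` is a hypothetical helper, not a DolphinDB function.

```python
def cumsum_topn(x, s, top, ascending=True):
    """At each position i, stably sort x[0..i] by s[0..i] and sum the first `top` values."""
    result = []
    for i in range(len(x)):
        pairs = list(zip(s[: i + 1], x[: i + 1]))
        # sorted() is stable (also with reverse=True), so equal s values keep arrival order.
        ordered = sorted(pairs, key=lambda p: p[0], reverse=not ascending)
        result.append(sum(v for _, v in ordered[:top]))
    return result


print(cumsum_topn([10, 20, 30, 40], [4, 3, 2, 1], 2))  # [10, 30, 50, 70]
```

With ascending=True, each new row with a smaller S value displaces a larger one from the top 2, so the running sum grows to 70; with ascending=False the same input yields [10, 30, 30, 30].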
Related function: createSparseReactiveStateEngine
