createSparseReactiveStateEngine

Syntax

createSparseReactiveStateEngine(name, metrics, dummyTable, outputTable, [keyColumn], [extraColumn])

Details

Creates a SparseReactiveStateEngine for sparse state computation on narrow streaming tables based on rules: state computation related to a metric (identified by keyColumn) is triggered only when data for that metric arrives, and the results are written to outputTable in a narrow-table format.

  • It is suitable for industrial scenarios where “rules are meaningful only for some devices/sensors”, requiring sparse computation.
  • Compared with createReactiveStateEngine (typically for wide-table/dense computation), this engine avoids unnecessary full metric updates.

Parameters

name is a STRING scalar indicating the engine name.

metrics is a table representing the set of sparse state computation rules. It must contain at least 3 columns: keyColumn(s), formula, outputMetricKey.
  • The first N columns are input metric identifier columns. Their count and order must match those specified by keyColumn. Each row provides some values of keyColumn in the input table (e.g., if the input table identifies metrics by deviceID, values can be "A001", "R131", etc.).
  • formula is a STRING scalar/vector or metacode representing the computation expression for the metric. Variable names in the expression correspond to value columns in the input table (e.g., metricValue; also multi-value columns such as Value1, Value2).
  • outputMetricKey is a STRING scalar/vector indicating the new output metric name (must be unique), e.g., "A001_event_B".
Note:
  • For the standalone and cluster mode, user-defined functions in the formula field only support metacode in <> format when using Orca.
  • For cluster mode:
    • When calling addSparseReactiveMetrics via rpc, the formula must use string in "" format, or log in directly to the node where the engine is located.
    • If the engine contains code in <> format, calling getSparseReactiveMetrics via rpc will behave abnormally. It is also recommended to log in directly to the node where the engine is located.

dummyTable is a table representing a sample table used to initialize the engine. Its schema must be identical to the subscribed streaming table. It can be an empty table or contain a small amount of data.

outputTable is a table handle indicating the output narrow table. The engine writes computation results to this table. The required column order is: 1) input metric identifier columns (same count and order as keyColumn) 2) passthrough columns (same count and order as extraColumn, typically time columns) 3) a STRING/SYMBOL column outputMetricKey (output metric name) 4) a DOUBLE column outputValue (output metric value).

keyColumn is a STRING scalar/vector indicating the primary key column names of the input table (used to identify “which metric/device the current row belongs to”). If keyColumn has N columns, metrics must have N corresponding leading identifier columns.

extraColumn (optional) is a STRING scalar/vector indicating the column names in the input table that should be carried to the output table unchanged (e.g., time columns).

Returns

Returns a streaming engine handle (SparseReactiveStateEngine), which can be used to append data via tableInsert{engine} or getStreamEngine(name).

Examples

Example 1. Input data contains three device IDs: A001, A002, A003.

When data for A001 arrives, calculate the average of the data within a sliding window of length 3.

When data for A002 arrives, calculate the difference between the maximum and minimum values within a sliding window of length 3, as well as the sum of elements in the window.

No processing is performed when data for A003 arrives.

The time column in the input data is not processed and is retained in the output table.

// Create the input data table (narrow table)
share streamTable(1:0, `timestamp`deviceID`value,
    [TIMESTAMP, SYMBOL, DOUBLE]) as inputTable

// Create the output table (narrow table)
share streamTable(1000:0, `deviceID`timestamp`outputMetricKey`outputValue,
    [SYMBOL, TIMESTAMP, STRING, DOUBLE]) as outputTable

// Define rules
metrics = table(
    ["A001", "A002", "A002"] as deviceID,
    [
        "mavg(value,3) ",
        "mmax(value,3)-mmin(value,3)",
        "msum(value,3)"
    ] as formula,
    ["A001_1", "A002_1", "A002_2"] as outputMetricKey
)

// Create the sparse reactive state engine
stateEngine = createSparseReactiveStateEngine(
    name="demoengine",
    metrics=metrics,
    dummyTable=inputTable,
    outputTable=outputTable,
    keyColumn="deviceID",
    extraColumn="timestamp"
)
// Subscribe to the input stream table
subscribeTable(tableName="inputTable", actionName="demo1", handler=tableInsert{stateEngine})
// Append data
data = table([2026.02.07T20:29:53.927,2026.02.07T20:29:53.928,2026.02.07T20:29:53.929,2026.02.07T20:29:53.930,2026.02.07T20:29:53.931,2026.02.07T20:29:53.932,2026.02.07T20:29:53.933,2026.02.07T20:29:53.934,2026.02.07T20:29:53.935,2026.02.07T20:29:53.936,2026.02.07T20:29:53.937,2026.02.07T20:29:53.938,2026.02.07T20:29:53.939,2026.02.07T20:29:53.940,2026.02.07T20:29:53.941,2026.02.07T20:29:53.942,2026.02.07T20:29:53.943,2026.02.07T20:29:53.944,2026.02.07T20:29:53.945,2026.02.07T20:29:53.946] as time, 
    ["A003","A002","A003","A002","A003","A002","A002","A001","A003","A001","A002","A003","A001","A002","A003","A002","A003","A002","A003","A002"] as deviceID, 
    [47,87,36,63,28,53,65,48,86,40,18,28,61,77,81,73,66,47,29,3] as value)

inputTable.append!(data)
// view the output
result = select * from outputTable
result
deviceID timestamp outputMetricKey outputValue
A002 2026.02.07 20:29:53.928 A002_1
A002 2026.02.07 20:29:53.928 A002_2
A002 2026.02.07 20:29:53.930 A002_1
A002 2026.02.07 20:29:53.930 A002_2
A002 2026.02.07 20:29:53.932 A002_1 34
A002 2026.02.07 20:29:53.932 A002_2 203
A002 2026.02.07 20:29:53.933 A002_1 12
A002 2026.02.07 20:29:53.933 A002_2 181
A001 2026.02.07 20:29:53.934 A001_1
A001 2026.02.07 20:29:53.936 A001_1
A002 2026.02.07 20:29:53.937 A002_1 47
A002 2026.02.07 20:29:53.937 A002_2 136
A001 2026.02.07 20:29:53.939 A001_1 49.666666666666664
A002 2026.02.07 20:29:53.940 A002_1 59
A002 2026.02.07 20:29:53.940 A002_2 160
A002 2026.02.07 20:29:53.942 A002_1 59
A002 2026.02.07 20:29:53.942 A002_2 168
A002 2026.02.07 20:29:53.944 A002_1 30
A002 2026.02.07 20:29:53.944 A002_2 197
A002 2026.02.07 20:29:53.946 A002_1 70
A002 2026.02.07 20:29:53.946 A002_2 123

Related functions: addSparseReactiveMetrics, getSparseReactiveMetrics, deleteSparseReactiveMetric