createSparseReactiveStateEngine
Syntax
createSparseReactiveStateEngine(name, metrics, dummyTable, outputTable,
[keyColumn], [extraColumn])
Details
Creates a SparseReactiveStateEngine for sparse state computation on narrow streaming tables based on rules: state computation related to a metric (identified by keyColumn) is triggered only when data for that metric arrives, and the results are written to outputTable in a narrow-table format.
- It is suitable for industrial scenarios where “rules are meaningful only for some devices/sensors”, requiring sparse computation.
- Compared with
createReactiveStateEngine(typically for wide-table/dense computation), this engine avoids unnecessary full metric updates.
Parameters
name is a STRING scalar indicating the engine name.
keyColumn(s),
formula, outputMetricKey.- The first N columns are input metric identifier columns. Their count and
order must match those specified by keyColumn. Each row provides some
values of keyColumn in the input table (e.g., if the input table
identifies metrics by
deviceID, values can be"A001","R131", etc.). - formula is a STRING scalar/vector or metacode representing the
computation expression for the metric. Variable names in the expression
correspond to value columns in the input table (e.g.,
metricValue; also multi-value columns such asValue1,Value2). - outputMetricKey is a STRING scalar/vector indicating the new output
metric name (must be unique), e.g.,
"A001_event_B".
- For the standalone and cluster mode, user-defined functions in the formula
field only support metacode in
<>format when using Orca. - For cluster mode:
- When calling
addSparseReactiveMetricsviarpc, the formula must use string in""format, or log in directly to the node where the engine is located. - If the engine contains code in
<>format, callinggetSparseReactiveMetricsviarpcwill behave abnormally. It is also recommended to log in directly to the node where the engine is located.
- When calling
dummyTable is a table representing a sample table used to initialize the engine. Its schema must be identical to the subscribed streaming table. It can be an empty table or contain a small amount of data.
outputTable is a table handle indicating the output narrow table. The engine
writes computation results to this table. The required column order is: 1) input
metric identifier columns (same count and order as keyColumn) 2) passthrough
columns (same count and order as extraColumn, typically time columns) 3) a
STRING/SYMBOL column outputMetricKey (output metric name) 4) a
DOUBLE column outputValue (output metric value).
keyColumn is a STRING scalar/vector indicating the primary key column names of the input table (used to identify “which metric/device the current row belongs to”). If keyColumn has N columns, metrics must have N corresponding leading identifier columns.
extraColumn (optional) is a STRING scalar/vector indicating the column names in the input table that should be carried to the output table unchanged (e.g., time columns).
Returns
Returns a streaming engine handle (SparseReactiveStateEngine), which can be used to
append data via tableInsert{engine} or
getStreamEngine(name).
Examples
Example 1. Input data contains three device IDs: A001, A002, A003.
When data for A001 arrives, calculate the average of the data within a sliding window of length 3.
When data for A002 arrives, calculate the difference between the maximum and minimum values within a sliding window of length 3, as well as the sum of elements in the window.
No processing is performed when data for A003 arrives.
The time column in the input data is not processed and is retained in the output table.
// Create the input data table (narrow table)
share streamTable(1:0, `timestamp`deviceID`value,
[TIMESTAMP, SYMBOL, DOUBLE]) as inputTable
// Create the output table (narrow table)
share streamTable(1000:0, `deviceID`timestamp`outputMetricKey`outputValue,
[SYMBOL, TIMESTAMP, STRING, DOUBLE]) as outputTable
// Define rules
metrics = table(
["A001", "A002", "A002"] as deviceID,
[
"mavg(value,3) ",
"mmax(value,3)-mmin(value,3)",
"msum(value,3)"
] as formula,
["A001_1", "A002_1", "A002_2"] as outputMetricKey
)
// Create the sparse reactive state engine
stateEngine = createSparseReactiveStateEngine(
name="demoengine",
metrics=metrics,
dummyTable=inputTable,
outputTable=outputTable,
keyColumn="deviceID",
extraColumn="timestamp"
)
// Subscribe to the input stream table
subscribeTable(tableName="inputTable", actionName="demo1", handler=tableInsert{stateEngine})
// Append data
data = table([2026.02.07T20:29:53.927,2026.02.07T20:29:53.928,2026.02.07T20:29:53.929,2026.02.07T20:29:53.930,2026.02.07T20:29:53.931,2026.02.07T20:29:53.932,2026.02.07T20:29:53.933,2026.02.07T20:29:53.934,2026.02.07T20:29:53.935,2026.02.07T20:29:53.936,2026.02.07T20:29:53.937,2026.02.07T20:29:53.938,2026.02.07T20:29:53.939,2026.02.07T20:29:53.940,2026.02.07T20:29:53.941,2026.02.07T20:29:53.942,2026.02.07T20:29:53.943,2026.02.07T20:29:53.944,2026.02.07T20:29:53.945,2026.02.07T20:29:53.946] as time,
["A003","A002","A003","A002","A003","A002","A002","A001","A003","A001","A002","A003","A001","A002","A003","A002","A003","A002","A003","A002"] as deviceID,
[47,87,36,63,28,53,65,48,86,40,18,28,61,77,81,73,66,47,29,3] as value)
inputTable.append!(data)
// view the output
result = select * from outputTable
result
| deviceID | timestamp | outputMetricKey | outputValue |
|---|---|---|---|
| A002 | 2026.02.07 20:29:53.928 | A002_1 | |
| A002 | 2026.02.07 20:29:53.928 | A002_2 | |
| A002 | 2026.02.07 20:29:53.930 | A002_1 | |
| A002 | 2026.02.07 20:29:53.930 | A002_2 | |
| A002 | 2026.02.07 20:29:53.932 | A002_1 | 34 |
| A002 | 2026.02.07 20:29:53.932 | A002_2 | 203 |
| A002 | 2026.02.07 20:29:53.933 | A002_1 | 12 |
| A002 | 2026.02.07 20:29:53.933 | A002_2 | 181 |
| A001 | 2026.02.07 20:29:53.934 | A001_1 | |
| A001 | 2026.02.07 20:29:53.936 | A001_1 | |
| A002 | 2026.02.07 20:29:53.937 | A002_1 | 47 |
| A002 | 2026.02.07 20:29:53.937 | A002_2 | 136 |
| A001 | 2026.02.07 20:29:53.939 | A001_1 | 49.666666666666664 |
| A002 | 2026.02.07 20:29:53.940 | A002_1 | 59 |
| A002 | 2026.02.07 20:29:53.940 | A002_2 | 160 |
| A002 | 2026.02.07 20:29:53.942 | A002_1 | 59 |
| A002 | 2026.02.07 20:29:53.942 | A002_2 | 168 |
| A002 | 2026.02.07 20:29:53.944 | A002_1 | 30 |
| A002 | 2026.02.07 20:29:53.944 | A002_2 | 197 |
| A002 | 2026.02.07 20:29:53.946 | A002_1 | 70 |
| A002 | 2026.02.07 20:29:53.946 | A002_2 | 123 |
Related functions: addSparseReactiveMetrics, getSparseReactiveMetrics, deleteSparseReactiveMetric
