createSparseReactiveStateEngine

语法

createSparseReactiveStateEngine(name, metrics, dummyTable, outputTable, keyColumn, [extraColumn])

详情

创建一个稀疏响应式状态引擎,用于对数据进行稀疏状态计算:仅当某些特定的数据到达时,才触发与之相关的状态计算。

该引擎适用于工业场景下“规则只对部分设备/传感器有意义”的稀疏计算需求。相比 createReactiveStateEngine,可避免无意义的全量指标更新。

参数

name 字符串标量,表示响应式状态引擎的名称,作为其在一个数据节点/计算节点上的唯一标识。可包含字母,数字和下划线,但必须以字母开头。

metrics 是一个表,表示稀疏状态计算规则集合。该表至少包含 3 列:
  • 前若干列为特定数据的标识列,与 keyColumn 指定的列名和顺序一致。

  • formula:可以是字符串或元代码,表示该特定数据到达时触发的计算表达式。

  • outputMetricKey:字符串标量,为该计算指定指标名。

注:
  • 单节点模式与集群模式跨节点场景:Orca 中自定义函数的 formula 仅支持 <> 格式的元代码。

  • 集群模式跨节点场景建议直接到对应节点操作:

    • rpc 远程调用 addSparseReactiveMetrics 时,formula 需用 "" 格式字符串,或直接登录引擎所在节点操作;

    • 引擎中含 <> 格式代码时,rpc 调用 getSparseReactiveMetrics 会异常,建议直接登录引擎所在节点操作。

dummyTable 一个表对象,和输入的流数据表的 schema 一致,可以含有数据,亦可为空表。

outputTable 计算结果的输出表,可以是内存表或分布式表,引擎会将计算结果写入该表。包含以下列:

  • 若干特定数据标识列,与 keyColumn 指定的列名和顺序一致。

  • 若干保留的原始列,与 extraColumn 指定的列名和顺序一致。

  • outputMetricKey 列,表示特定数据计算的指标名,其值由 metrics 的 outputMetricKey 列指定。

  • 计算结果列,表示该指标的计算结果。

keyColumn 字符串标量或向量,表示输入表中标识特定数据的主键列名。

extraColumn 可选参数,字符串标量或向量,表示输入表中需要保留到输出表的列名。

返回值

返回一个流数据引擎对象句柄。通过向该句柄写入,将数据注入引擎进行计算。

例子

例1. 输入数据包含三个设备 ID A001,A002,A003。每当 A001 的数据到来,计算长度为 3 的滑动窗口内数据的平均值;每当 A002 的数据到来,计算长度为 3 的滑动窗口内的最大值与最小值的差,以及窗口内元素的和 ;当 A003 的数据到来不做任何处理。输入数据中的时间列不做处理,保留到输出表中。

// 创建输入数据表
share streamTable(1:0, `timestamp`deviceID`value,
    [TIMESTAMP, SYMBOL, DOUBLE]) as inputTable

// 创建输出表
share streamTable(1000:0, `deviceID`timestamp`outputMetricKey`outputValue,
    [SYMBOL, TIMESTAMP, STRING, DOUBLE]) as outputTable

// 定义规则
metrics = table(
    ["A001", "A002", "A002"] as deviceID,
    [
        "mavg(value,3) ",
        "mmax(value,3)-mmin(value,3)",
        "msum(value,3)"
    ] as formula,
    ["A001_1", "A002_1", "A002_2"] as outputMetricKey
)

// 创建稀疏状态引擎
stateEngine = createSparseReactiveStateEngine(
    name="demoengine",
    metrics=metrics,
    dummyTable=inputTable,
    outputTable=outputTable,
    keyColumn="deviceID",
    extraColumn="timestamp"
)
// 订阅输入流表
subscribeTable(tableName="inputTable", actionName="demo1", handler=tableInsert{stateEngine})
// 写入数据
data = table([2026.02.07T20:29:53.927,2026.02.07T20:29:53.928,2026.02.07T20:29:53.929,2026.02.07T20:29:53.930,2026.02.07T20:29:53.931,2026.02.07T20:29:53.932,2026.02.07T20:29:53.933,2026.02.07T20:29:53.934,2026.02.07T20:29:53.935,2026.02.07T20:29:53.936,2026.02.07T20:29:53.937,2026.02.07T20:29:53.938,2026.02.07T20:29:53.939,2026.02.07T20:29:53.940,2026.02.07T20:29:53.941,2026.02.07T20:29:53.942,2026.02.07T20:29:53.943,2026.02.07T20:29:53.944,2026.02.07T20:29:53.945,2026.02.07T20:29:53.946] as time, 
    ["A003","A002","A003","A002","A003","A002","A002","A001","A003","A001","A002","A003","A001","A002","A003","A002","A003","A002","A003","A002"] as deviceID, 
    [47,87,36,63,28,53,65,48,86,40,18,28,61,77,81,73,66,47,29,3] as value)

inputTable.append!(data)
// 查看结果
result = select * from outputTable
result
deviceID timestamp outputMetricKey outputValue
A002 2026.02.07 20:29:53.928 A002_1
A002 2026.02.07 20:29:53.928 A002_2
A002 2026.02.07 20:29:53.930 A002_1
A002 2026.02.07 20:29:53.930 A002_2
A002 2026.02.07 20:29:53.932 A002_1 34
A002 2026.02.07 20:29:53.932 A002_2 203
A002 2026.02.07 20:29:53.933 A002_1 12
A002 2026.02.07 20:29:53.933 A002_2 181
A001 2026.02.07 20:29:53.934 A001_1
A001 2026.02.07 20:29:53.936 A001_1
A002 2026.02.07 20:29:53.937 A002_1 47
A002 2026.02.07 20:29:53.937 A002_2 136
A001 2026.02.07 20:29:53.939 A001_1 49.666666666666664
A002 2026.02.07 20:29:53.940 A002_1 59
A002 2026.02.07 20:29:53.940 A002_2 160
A002 2026.02.07 20:29:53.942 A002_1 59
A002 2026.02.07 20:29:53.942 A002_2 168
A002 2026.02.07 20:29:53.944 A002_1 30
A002 2026.02.07 20:29:53.944 A002_2 197
A002 2026.02.07 20:29:53.946 A002_1 70
A002 2026.02.07 20:29:53.946 A002_2 123

相关函数addSparseReactiveMetrics, getSparseReactiveMetrics, deleteSparseReactiveMetric