DStream::SparseReactiveStateEngine

语法

DStream::sparseReactiveStateEngine(metrics, keyColumn, [extraColumn])

参数

metrics 是一个表,表示稀疏状态计算规则集合。该表至少包含 3 列:
  • 前若干列为特定数据的标识列,与 keyColumn 指定的列名和顺序一致。

  • formula:可以是字符串或元代码,表示该特定数据到达时触发的计算表达式。

  • outputMetricKey:字符串标量,为该计算指定指标名。

注:
  • 单节点模式与集群模式跨节点场景:Orca 中自定义函数的 formula 仅支持 <> 格式的元代码。

  • 集群模式跨节点场景建议直接到对应节点操作:

    • rpc 远程调用 addSparseReactiveMetrics 时,formula 需用 "" 格式字符串,或直接登录引擎所在节点操作;

    • 引擎中含 <> 格式代码时,rpc 调用 getSparseReactiveMetrics 会异常,建议直接登录引擎所在节点操作。

keyColumn 字符串标量或向量,表示输入表中标识特定数据的主键列名。

extraColumn 可选参数,字符串标量或向量,表示输入表中需要保留到输出表的列名。

返回值

返回一个 DStream 对象。

例子

// 若 catalog 不存在则创建
if (!existsCatalog("orca")) {  
    createCatalog("orca")  
}  
go  
use catalog orca  
  
// 如已存在流图,则先销毁该流图
// dropStreamGraph("sparseGraph")
g = createStreamGraph("sparseGraph")

// 定义输入流表结构与输出表结构、稀疏响应式状态引擎

baseStream = g.source("trade", `timestamp`date`deviceId1`deviceId2`deviceId3`value1`value2`value3, [TIMESTAMP, DATE, STRING, STRING, STRING, DOUBLE, DOUBLE, DOUBLE])
formulas = [<cumsumTopN(value1, value2, 5)>, <cumavgTopN(value1, value2, 10)>, <cumstdTopN(value1, value2, 15)>, <cumstdpTopN(value1, value2, 20)>, <cumvarTopN(value1, value2, 5)>, <cumvarpTopN(value1, value2, 10)>, <cumskewTopN(value1, value2, 10)>, <cumkurtosisTopN(value1, value2, 10)>, <cumbetaTopN(value1, value2, value3, 10)>, <cumcorrTopN(value1, value2, value3, 10)>, <cumcovarTopN(value1, value2, value3, 10)>, <cumwsumTopN(value1, value2, value3, 10)>]
keys = "A"+string(1..size(formulas))
keys1 = keys.shuffle()
keys2 = keys.shuffle()
outKeys = "event"+string(1..size(formulas))
metrics = table(
    keys1 as deviceId1,
    keys2 as deviceId2,
    formulas as formula,
    outKeys as outputMetricKey
)
baseStream.sparseReactiveStateEngine(metrics, `deviceId1`deviceId2, `timestamp`date)
.setEngineName("srsEngine")
.sink("output")
g.submit()
go


// 写入数据到 Orca 流表
n = 10000
for(i in 1..5){
    data = table(rand(timestamp(1..1000), n) as timestamp, rand(date(1..1000), n) as date, rand(keys, n) as deviceId1, rand(keys, n) as deviceId2, take(keys, n) as deviceId3, rand(rand(-1000.0:1000.0, n) join take(double(), n/5), n) as value1, rand(rand(-1000.0:1000.0, n) join take(double(), n/5), n) as value2, rand(rand(-1000.0:1000.0, n) join take(double(), n/5), n) as value3)
    appendOrcaStreamTable("trade", data)
}
sleep(3000)
res = select * from orca_table.output

相关函数createSparseReactiveStateEngine