DStream::sharedDict
语法
DStream::sharedDict(name, keyObj, valueObj, [ordered=false])
或
DStream::sharedDict(name, keyType, valueType, [ordered=false])
详情
在 Orca 中创建一个共享字典。关于字典的说明参见 dict。
参数
name 字符串标量,表示共享字典的名称。
对于第一种情形,keyObj 是表示键的向量,valueObj 是表示值的向量。
对于第二种情形,keyType 是字典键的数据类型,valueType 是字典值的数据类型。系统支持以下键的数据类型:Literal、Integral(COMPRESS 除外)、Floating 和 Temporal。字典中的值不支持 COMPLEX,POINT 类别。
ordered 一个布尔值,默认为 false,表示创建一个无序字典。当 ordered = true 时,创建一个有序字典。无序字典在输出或进行遍历时,其键值对不保留输入时的顺序;有序字典在输出或进行遍历时,键值对的顺序与输入顺序保持一致。
返回值
一个字典。
例子
本例结合 DStream::sharedDict 与
DStream::udfEngine,实现计数并触发条件告警的功能。具体实现为,使用
DStream::sharedDict 统计每个键(key)的出现次数。当某个键的累计计数超过预设阈值时,UDF
函数将向下游输出一条告警消息。
// 设置catalog
if(existsCatalog("testCatalog1")) dropCatalog("testCatalog1")
createCatalog("testCatalog1")
go
use catalog testCatalog1
// 创建流图
g = createStreamGraph("compare")
g.sharedKeyedTable("history", "id", 1:0, `id`value, [INT, DOUBLE])
g.source("data", `id`value`time, [INT, DOUBLE, TIMESTAMP])
.udfEngine(def(msg) {
history = orcaObj("history")
diffTable = table(100:0, `id`diff, [INT, DOUBLE])
for(i in 0:msg.size()) {
idVal = msg.id[i]
valueVal = msg.value[i]
// 读取历史值
old = select value from history where id = idVal
// 写入新值
newRow = table(idVal as id, valueVal as value)
history.append!(newRow)
// 计算差值
if(old.size() > 0) {
diffTable.append!(table(idVal as id, (valueVal - old.value[0]) as diff))
}
}
return diffTable
})
.sink("comparison")
g.submit()
// 生成模拟数据
mockData = table(1..5 as id, rand(100.0, 5) as value, now() + 1..5 as time)
// 插入数据
appendOrcaStreamTable("testCatalog1.orca_table.data", mockData)
// 生成id重复的数据
mockData = table(1..5 as id, rand(100.0, 5) as value, now() + 1..5 as time)
// 插入数据
appendOrcaStreamTable("testCatalog1.orca_table.data", mockData)
// 等待处理并查看结果
sleep(1000)
select * from testCatalog1.orca_table.comparison
