DStream::udfEngine
语法
DStream::udfEngine(func)
详情
在 Orca 流图中,用于创建支持状态持久化的自定义函数。该函数对流中的数据做映射,将指定函数应用到流数据中的每条消息,允许在自定义函数中读取和修改共享变量,并通过 Checkpoint 机制实现共享变量的持久化与故障恢复。
参数
func
用户定义函数。其最后一个参数必须为字典类型,用于接收上游流数据。该参数不可被原地修改或作为可变对象操作。函数内部可读写由DStream::sharedTable、DStream::sharedDict
或 DStream::sharedKeyedTable 定义的共享变量,实现状态更新。若函数有返回值,类型必须为字典或表。
在 func 中访问共享变量时,需通过 orcaObj(name)获取。注意,
orcaObj(name) 仅在 UDF Engine 的执行上下文中可用,用于说明 name
是一个共享变量。
返回值
返回一个 DStream 对象。
例子
下面通过 DStream::sharedKeyedTable 与 DStream::udfEngine
的结合,实现计算历史差值的功能。
在本示例中,使用 DStream::sharedKeyedTable 存储已处理数据的历史记录,对每个 id
仅保留最新的一条记录。当新数据到达时,若该 id 已存在于表中,则输出新值与历史值之间的差值;否则,仅将新数据存入表中,不产生输出。
// 设置catalog
if(existsCatalog("testCatalog1")) dropCatalog("testCatalog1")
createCatalog("testCatalog1")
go
use catalog testCatalog1
// 创建流图
g = createStreamGraph("compare")
g.sharedKeyedTable("history", "id", 1:0, `id`value, [INT, DOUBLE])
g.source("data", `id`value`time, [INT, DOUBLE, TIMESTAMP])
.udfEngine(def(msg) {
history = orcaObj("history")
diffTable = table(100:0, `id`diff, [INT, DOUBLE])
for(i in 0:msg.size()) {
idVal = msg.id[i]
valueVal = msg.value[i]
// 读取历史值
old = select value from history where id = idVal
// 写入新值
newRow = table(idVal as id, valueVal as value)
history.append!(newRow)
// 计算差值
if(old.size() > 0) {
diffTable.append!(table(idVal as id, (valueVal - old.value[0]) as diff))
}
}
return diffTable
})
.sink("comparison")
g.submit()
// 生成模拟数据
mockData = table(1..5 as id, rand(100.0, 5) as value, now() + 1..5 as time)
// 插入数据
appendOrcaStreamTable("testCatalog1.orca_table.data", mockData)
// 生成id重复的数据
mockData = table(1..5 as id, rand(100.0, 5) as value, now() + 1..5 as time)
// 插入数据
appendOrcaStreamTable("testCatalog1.orca_table.data", mockData)
// 等待处理并查看结果
sleep(1000)
select * from testCatalog1.orca_table.comparison
