DStream::udfEngine

语法

DStream::udfEngine(func, outputColumns, [variableNames], [initValues])

详情

在 Orca 流图中,用于创建支持副作用和状态持久化的自定义函数。同 DStream::map 类似,该函数对流中的数据做映射,将指定函数应用到流数据中的每条消息。区别在于该函数允许在自定义函数中自由读取和修改外部变量,并通过 Checkpoint 机制实现状态的持久化与故障恢复。

返回值:DStream 对象。

参数

func 用户定义函数。其最后一个参数必须为字典类型,用于接收上游流数据。该参数不可被原地修改或作为可变对象操作。函数内部可读写由 variableNames 指定的外部变量,实现状态更新。若函数有返回值,类型必须为字典或表,且需包含 outputColumns 指定的所有字段。

outputColumns 字符串向量,指定 func 返回结果中需要保留并输出到下游的字段名。若返回值中缺失这些列,会导致执行失败。

variableNames 可选参数,字符串向量,指定在 func 中可读写的外部变量名。引擎会在执行过程中自动传入并更新这些变量的值。

initValues 可选参数,与 variableNames 一一对应的向量,指定各个外部变量的初始值。

例子

def callTimes(mutable cnt, msg) {
    cnt += 1;
    return msg
}
g = createStreamGraph("indicators")
g.source("trade", 1024:0, `price`volume, [DOUBLE,INT])
    .udfEngine(callTimes, [`price,`volume] ,[`number], [5])
    .setEngineName("udf")
    .sink("output111")
g.submit()
go
n = 1000
price = rand(100, n)
volume = rand(1000, n)
t = table(price, volume)
appendOrcaStreamTable("trade", t)
// 获取当前 number 的值
useOrcaStreamEngine("udf", getUdfEngineVariable, "number")
// output: 1005

相关函数:DStream::map, getUdfEngineVariable