DStream::udfEngine

Syntax

DStream::udfEngine(func, outputColumns, [variableNames], [initValues])

Details

In Orca, udfEngine creates an engine that processes the input stream with a user-defined function (UDF) that can have side effects and keep persistent state. Like DStream::map, it transforms input stream data by applying the specified function. Unlike DStream::map, the UDF can freely read and modify external state variables, and the engine persists this state and supports fault recovery through a checkpoint mechanism.
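To show why checkpointed state makes recovery possible, here is a minimal Python model. It is not DolphinDB script and not Orca's implementation. The class ToyStatefulMapper and its checkpoint/restore methods are hypothetical. The model also assumes that records processed after the last checkpoint are replayed on recovery, which this page does not state.

```python
import copy


class ToyStatefulMapper:
    """Hypothetical model of a stateful UDF stage with checkpoints.

    Not Orca's implementation: it only shows that saving the state
    variables lets a recovered stage continue with consistent values.
    """

    def __init__(self, func, init_state):
        self.func = func                  # func(state, msg) -> output; may mutate state
        self.state = dict(init_state)     # live state variables
        self._snapshot = copy.deepcopy(self.state)

    def process(self, msg):
        return self.func(self.state, msg)

    def checkpoint(self):
        # Save a copy of the current state (a real system would store it durably).
        self._snapshot = copy.deepcopy(self.state)

    def restore(self):
        # Simulate recovery after a failure: roll back to the last checkpoint.
        self.state = copy.deepcopy(self._snapshot)


def count_calls(state, msg):
    state["number"] += 1   # side effect on persistent state
    return msg


m = ToyStatefulMapper(count_calls, {"number": 5})
for i in range(10):
    m.process({"price": 1.0, "volume": i})
m.checkpoint()                                # saved: number == 15
for i in range(3):
    m.process({"price": 1.0, "volume": i})    # number == 18, not yet checkpointed
m.restore()                                   # simulated crash: number back to 15
for i in range(3):
    m.process({"price": 1.0, "volume": i})    # assumed replay of the 3 lost records
print(m.state["number"])  # 18
```

Without the snapshot, a restart would lose the accumulated count and fall back to the initial value.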

Return value: A DStream object.

Arguments

func A user-defined function. Its last parameter must be a Dictionary that receives the upstream stream data. func must not modify this parameter in place or treat it as mutable. Within func, the external state variables specified by variableNames can be read and written to update state. If func returns a value, the value must be a Dictionary or a Table that contains all fields specified in outputColumns.

outputColumns A STRING vector specifying which columns of func's return value are retained and emitted downstream. If any of these fields is missing from the return value, execution fails.
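The return-value rules for func and outputColumns can be summarized with a small Python model. The helper project_output is hypothetical. It represents a Dictionary as a Python dict and a Table as a list of dicts (one per row), and it covers only the case where func returns a value. It raises KeyError where the engine is documented to fail; the actual error Orca reports is not modeled.

```python
def project_output(result, output_columns):
    """Keep only output_columns from a UDF result (hypothetical model).

    result: a dict (one record) or a list of dicts (table rows).
    Raises KeyError if any requested column is missing, mirroring the
    documented failure when a field in outputColumns is absent.
    """
    rows = [result] if isinstance(result, dict) else list(result)
    projected = []
    for row in rows:
        missing = [c for c in output_columns if c not in row]
        if missing:
            raise KeyError(f"missing output columns: {missing}")
        # Columns not listed in output_columns are not emitted downstream.
        projected.append({c: row[c] for c in output_columns})
    return projected


print(project_output({"price": 10.5, "volume": 3, "note": "x"}, ["price", "volume"]))
# [{'price': 10.5, 'volume': 3}]
```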

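variableNames (optional) A STRING vector specifying the names of external state variables that func can read and update. The engine passes these variables to func and updates their values during execution. In the example below, the variable number is received by the mutable parameter cnt, which precedes the Dictionary parameter msg.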

initValues (optional) A vector of initial values corresponding one-to-one with variableNames, specifying the initial state for each external variable.
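The pairing of variableNames with initValues, and how func sees those variables, can be sketched in Python. This is a model, not DolphinDB script. Following the example on this page (where mutable cnt receives number), it assumes each variable is passed to func positionally, in variableNames order, ahead of the record. Python has no mutable scalars, so each variable is wrapped in a one-element list (a "cell") that func updates via cell[0]. The helpers make_state and call_udf are hypothetical, and the ValueError on a length mismatch is the model's choice, not documented engine behavior.

```python
def make_state(variable_names, init_values):
    """Pair each variable name with its initial value (one-to-one)."""
    if len(variable_names) != len(init_values):
        raise ValueError("initValues must match variableNames one-to-one")
    # One mutable cell per variable so the UDF can update it in place.
    return {name: [value] for name, value in zip(variable_names, init_values)}


def call_udf(func, state, variable_names, msg):
    """Call func(var1, var2, ..., msg): state cells first, record last."""
    cells = [state[name] for name in variable_names]
    return func(*cells, msg)


def call_times(cnt, msg):
    cnt[0] += 1        # same role as `cnt += 1` on a mutable parameter
    return msg


names = ["number"]
state = make_state(names, [5])
for price in range(1000):
    call_udf(call_times, state, names, {"price": float(price), "volume": 1})
print(state["number"][0])  # 1005
```

This reproduces the arithmetic of the example below: an initial value of 5 plus one increment for each of 1000 records.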

Examples

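// cnt is bound to the state variable number; msg receives the upstream data as a Dictionary
def callTimes(mutable cnt, msg) {
    cnt += 1;      // count how many times the UDF has been called
    return msg     // pass the record through unchanged
}
g = createStreamGraph("indicators")
g.source("trade", 1024:0, `price`volume, [DOUBLE,INT])
    .udfEngine(callTimes, [`price,`volume], [`number], [5])  // number starts at 5
    .setEngineName("udf")
    .sink("output111")
g.submit()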

go
n = 1000
price = rand(100, n)
volume = rand(1000, n)
t = table(price, volume)
appendOrcaStreamTable("trade", t)

// Obtain the current value of number
useOrcaStreamEngine("udf", getUdfEngineVariable, "number")
// output: 1005 (initial value 5 plus one call for each of the 1000 appended records)

Related functions: DStream::map, getUdfEngineVariable