DStream::udfEngine

Syntax

DStream::udfEngine(func)

Details

DStream::udfEngine is an extension engine in Orca stream graphs for executing user-defined processing logic. When the built-in streaming engines cannot meet specific business requirements, users can pass a custom function to DStream::udfEngine to process the incoming stream records.

To enable stateful computation, UDF functions can access shared variables declared via DStream::sharedTable, DStream::sharedDict, or DStream::sharedKeyedTable. These shared variables can be used across multiple udfEngine instances and tasks within the same stream graph. Orca automatically persists their state using the Checkpoint mechanism and restores them during failure recovery.

Note: UDF functions are not allowed to access any external variables. Only local variables or shared variables referenced via orcaObj("name") are permitted.
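The scoping rule above can be sketched as follows. All names here (the graph, stream, and shared-variable names) are illustrative, and the sharedKeyedTable signature follows the form used in the Examples section below:

```dolphindb
g = createStreamGraph("scopeDemo")
// Declare the shared variable before the udfEngine that uses it
g.sharedKeyedTable("state", "id", 1:0, `id`total, [INT, DOUBLE])

threshold = 5.0   // a session-scope variable -- NOT visible inside the UDF
g.source("in", `id`value, [INT, DOUBLE])
  .udfEngine(def(msg) {
      // Not allowed: referencing the outer variable `threshold` here would
      // fail, because the UDF runs in an isolated execution context.
      limit = 5.0                  // allowed: a local variable
      state = orcaObj("state")     // allowed: a pre-declared shared variable
      big = select id, value as total from msg where value > limit
      state.append!(big)           // keyed append: upserts by id
  })
  .sink("out")
```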

Constraints on Shared Variables

  • Shared variables must be declared in advance using DStream::sharedTable, DStream::sharedDict, or DStream::sharedKeyedTable. Within a UDF function, they can be referenced via orcaObj("name"). Note that orcaObj is only valid in the execution context of the UDF.

  • Each shared variable follows a single-writer, multiple-reader pattern: only one udfEngine instance can write to it, while multiple instances may read it concurrently.

  • All tasks that access the same shared variable are scheduled by Orca on the same physical node to ensure local access and state consistency.
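The single-writer, multiple-reader pattern can be sketched with two udfEngine stages: one owns all writes to a shared keyed table, while the other only reads it. Graph, stream, and variable names are illustrative:

```dolphindb
g = createStreamGraph("ownership")
g.sharedKeyedTable("latest", "id", 1:0, `id`value, [INT, DOUBLE])

// Writer: the only udfEngine instance allowed to update "latest"
g.source("updates", `id`value, [INT, DOUBLE])
  .udfEngine(def(msg) {
      latest = orcaObj("latest")
      latest.append!(select id, value from msg)   // keyed append: upsert by id
  })
  .sink("writerOut")

// Reader: may read "latest" concurrently, but must not write to it
g.source("queries", `id, [INT])
  .udfEngine(def(msg) {
      latest = orcaObj("latest")
      return select * from latest where id in msg.id
  })
  .sink("readerOut")
g.submit()
```

Because both stages access "latest", Orca schedules their tasks on the same physical node, per the constraint above.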

Parameters

func A user-defined function. It accepts exactly one parameter: a table containing the incoming stream records. This parameter must not be modified in place or treated as a mutable object. Within the function, shared variables declared via DStream::sharedTable, DStream::sharedDict, or DStream::sharedKeyedTable can be read and written to update state. If the function returns a value, the return value must be either a dictionary or a table.
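A minimal sketch of a conforming func (column names are illustrative). It reads the input table without mutating it and returns a new table:

```dolphindb
def myUdf(msg) {
    // msg is a table of incoming records; do not modify it in place
    out = select id, value * 2 as doubled from msg
    return out   // a table (a dictionary would also be acceptable)
}
```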

Returns

A DStream object.

Examples

In this example, we use DStream::sharedKeyedTable and DStream::udfEngine to implement a historical delta computation.

The DStream::sharedKeyedTable is used to maintain the most recent record for each ID. When a new record arrives, if an entry with the same id already exists in the table, the UDF outputs the difference between the new value and the previously stored (historical) value. If the ID does not exist, the new record is inserted into the table without producing any output.

if(existsCatalog("orcaCatalog")) dropCatalog("orcaCatalog")
createCatalog("orcaCatalog")
go
use catalog orcaCatalog
// Create stream graph
g = createStreamGraph("compare")
g.sharedKeyedTable("history", "id", 1:0, `id`value, [INT, DOUBLE])
g.source("data", `id`value`time, [INT, DOUBLE, TIMESTAMP])
  .udfEngine(def(msg) {
    history = orcaObj("history")
    diffTable = table(100:0, `id`diff, [INT, DOUBLE])
    for(i in 0:msg.size()) {
        idVal = msg.id[i]
        valueVal = msg.value[i]
        // Read historical value
        old = select value from history where id = idVal
        // Write new value
        newRow = table(idVal as id, valueVal as value)
        history.append!(newRow)
        // Compute delta
        if(old.size() > 0) {
            diffTable.append!(table(idVal as id, (valueVal - old.value[0]) as diff))
        }
    }
    return diffTable
  })
  .sink("comparison")
g.submit()
// Generate mock data
mockData = table(1..5 as id, rand(100.0, 5) as value, now() + 1..5 as time)
// Insert data
appendOrcaStreamTable("orcaCatalog.orca_table.data", mockData)
// Generate data with duplicate IDs
mockData = table(1..5 as id, rand(100.0, 5) as value, now() + 1..5 as time)
// Insert data
appendOrcaStreamTable("orcaCatalog.orca_table.data", mockData)
// Wait for processing and inspect results
sleep(1000)
select * from orcaCatalog.orca_table.comparison
id diff
-- -------------------
1  35.55946895749296
2  -3.4362593906550387
3  36.283468999034596
4  68.97968558337999
5  -91.64246928217878

Related functions: DStream::map, getUdfEngineVariable