getOrcaDataLineage

Syntax

getOrcaDataLineage(name)

Arguments

name is a string specifying the table or timer engine to query. A table name can be given in either of two formats:

  • Fully qualified name, e.g. "catalog_name.orca_table.table_name"
  • Internal name format, e.g. "public_stream_table_05056f34_92b0_16b2_204e_c3c63e3f8a84"

Details

When name is a table name, this function retrieves the data lineage of the specified public stream table across the stream graphs it belongs to, including both its current lineage and any historical lineage.

It returns a JSON object that contains information about the upstream dependencies of the table in each stream graph where it exists.

Each stream graph is represented as a key–value pair, where the key is the stream graph’s internal name, and the value contains the stream graph’s attributes and upstream nodes:

  • fqn: A STRING scalar. Represents the fully qualified name of the stream graph.
  • isDeleted: A Boolean value. Indicates whether the stream graph has been deleted.
  • Each node is represented as a key–value pair, where the key is the node name, and the value is an object with the following properties:
    • isRoot: A Boolean value. Indicates whether the node is a root node.
    • parents: A STRING vector. Contains the names of all direct parent nodes. For root nodes, this is an empty array [].
    • isTable: A Boolean value. Indicates whether the node is a table.
    • isEngine: A Boolean value. Indicates whether the node is an engine.
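The structure described above can be traversed once the result is available as JSON text. The following Python sketch is illustrative only and is not part of this function's API. It assumes the JSON has already been retrieved into a string by some client, relies on the key names that appear in the sample output in the Examples section (fqn, isDeleted, isRoot, parents), and uses a hypothetical helper name, upstream_roots.

```python
import json

# Attribute keys of a stream graph entry; every other key is treated as a node name.
GRAPH_ATTRS = {"fqn", "isDeleted"}

def upstream_roots(lineage_json, table_name):
    """Map each stream graph's fqn to the root nodes upstream of table_name."""
    lineage = json.loads(lineage_json)
    result = {}
    for graph in lineage.values():
        nodes = {k: v for k, v in graph.items() if k not in GRAPH_ATTRS}
        if table_name not in nodes:
            continue
        roots, seen, stack = set(), set(), [table_name]
        while stack:
            name = stack.pop()
            if name in seen or name not in nodes:
                continue
            seen.add(name)
            if nodes[name]["isRoot"]:
                roots.add(name)
            # Follow the direct parents toward the sources.
            stack.extend(nodes[name]["parents"])
        result[graph["fqn"]] = sorted(roots)
    return result

# Trimmed copy of one graph entry from the sample output in the Examples section.
sample = '''
{
  "40d12999-f8f3-9cb5-4a49-813bdee46a9f": {
    "fqn": "test.orca_graph.factor1",
    "isDeleted": false,
    "test.orca_engine.rse1": {
      "isRoot": false, "isTable": false, "isEngine": true,
      "parents": ["test.orca_table.snapshot1"]
    },
    "test.orca_table.end": {
      "isRoot": false, "isTable": true, "isEngine": false,
      "parents": ["test.orca_engine.rse1"]
    },
    "test.orca_table.snapshot1": {
      "isRoot": true, "isTable": true, "isEngine": false,
      "parents": []
    }
  }
}
'''

print(upstream_roots(sample, "test.orca_table.end"))
# {'test.orca_graph.factor1': ['test.orca_table.snapshot1']}
```

Separating the graph attributes from the node entries first keeps the traversal simple, because both kinds of key sit at the same level of each graph object.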

When name is a TimerEngine name, this function returns a JSON object with the following fields:

  • input: Information about the input parameters of the engine’s function. Keys are parameter names and values are data types. If the function takes no inputs, this is {}.
  • output: Information about the return value of the engine’s function.
    • If the return value is a table, the keys are column names and the values are column types.
    • If the return value is a scalar, the key is "return" and the value is the data type.
    • In other cases, the key is "return" and the value describes the data form.
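As a rough illustration of how the input and output fields fit together, the Python sketch below renders a TimerEngine result as a one-line signature. It is not part of this function's API: it assumes the JSON is already available as a string, the helper name describe_engine is hypothetical, and it treats an output object whose only key is "return" as a non-table return value.

```python
import json

def describe_engine(info_json):
    """Render a TimerEngine lineage result as '(params) -> return' text."""
    info = json.loads(info_json)
    params = ", ".join(f"{name}: {dtype}" for name, dtype in info["input"].items())
    output = info["output"]
    if set(output) == {"return"}:
        # Non-table return value: the single "return" key holds the type or data form.
        # Assumption: a table with one column literally named "return" does not occur.
        ret = output["return"]
    else:
        # Table return value: keys are column names, values are column types.
        cols = ", ".join(f"{col}: {dtype}" for col, dtype in output.items())
        ret = f"table({cols})"
    return f"({params}) -> {ret}"

# Same shape as the TimerEngine sample output in the Examples section.
sample = '''
{
  "input": {"x": "STRING", "y": "STRING", "z": "STRING"},
  "output": {"col1": "STRING", "col2": "STRING", "col3": "STRING"}
}
'''

print(describe_engine(sample))
# (x: STRING, y: STRING, z: STRING) -> table(col1: STRING, col2: STRING, col3: STRING)
```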

Examples

// Create and submit the stream graph
createCatalog("test")
go
use catalog test

def myFunc(x,y,z){
    return table(x as col1,y as col2,z as col3)
}
a = ["aaa"]
b = ["bbb"]
c = ["ccc"]
t = table(1..100 as id, 1..100 as value, take(09:29:00.000..13:00:00.000, 100) as timestamp)

g1 = createStreamGraph("factor1")
g1.source("snapshot1", schema(t).colDefs.name, schema(t).colDefs.typeString)
  .reactiveStateEngine([<cumsum(value)>, <timestamp>])
  .setEngineName("rse1")
  .timerEngine(3, myFunc, a, b, c)
  .setEngineName("myJob")
  .buffer("end")
g1.submit()

g2 = createStreamGraph("factor2")
g2.source("snapshot2", schema(t).colDefs.name, schema(t).colDefs.typeString)
  .reactiveStateEngine([<cumsum(value)>, <timestamp>])
  .setEngineName("rse2")
  .buffer("end")
g2.submit()

// Drop stream graph "factor2"; its lineage is still reported, with isDeleted set to true
dropStreamGraph("factor2")

// Retrieve the data lineage of the public stream table "end"
getOrcaDataLineage("test.orca_table.end")
/*
{
  "40d12999-f8f3-9cb5-4a49-813bdee46a9f": {
    "fqn": "test.orca_graph.factor1",
    "isDeleted": false,
    "test.orca_engine.rse1": {
      "isRoot": false,
      "isTable": false,
      "isEngine": true,
      "parents": [
        "test.orca_table.snapshot1"
      ]
    },
    "test.orca_table.end": {
      "isRoot": false,
      "isTable": true,
      "isEngine": false,
      "parents": [
        "test.orca_engine.rse1"
      ]
    },
    "test.orca_table.snapshot1": {
      "isRoot": true,
      "isTable": true,
      "isEngine": false,
      "parents": []
    }
  },
  "4cee4ed3-007b-f38a-8743-f6776a6172d9": {
    "fqn": "test.orca_graph.factor2",
    "isDeleted": true,
    "test.orca_engine.rse2": {
      "isRoot": false,
      "isTable": false,
      "isEngine": true,
      "parents": [
        "test.orca_table.snapshot2"
      ]
    },
    "test.orca_table.end": {
      "isRoot": false,
      "isTable": true,
      "isEngine": false,
      "parents": [
        "test.orca_engine.rse2"
      ]
    },
    "test.orca_table.snapshot2": {
      "isRoot": true,
      "isTable": true,
      "isEngine": false,
      "parents": []
    }
  }
}
*/

// Retrieve the input and output information of the TimerEngine "myJob"
getOrcaDataLineage("test.orca_engine.myJob")
/*
{
  "input": {
    "x": "STRING",
    "y": "STRING",
    "z": "STRING"
  },
  "output": {
    "col1": "STRING",
    "col2": "STRING",
    "col3": "STRING"
  }
}
*/