DStream::sharedKeyedTable
Syntax
DStream::sharedKeyedTable(name, keyColumns, X, [X1], [X2], ...)
or
DStream::sharedKeyedTable(name, keyColumns, capacity:size, colNames,
colTypes)
or
DStream::sharedKeyedTable(name, keyColumns, table)
Details
Creates a shared keyed table in Orca. The table can only be accessed within
DStream::udfEngine. For details about keyed tables, refer to
keyedTable.
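A minimal usage sketch (the graph name, table name, and column schema here are hypothetical, for illustration only): the shared keyed table is declared on the stream graph, then accessed inside the UDF via orcaObj, as in the full example below.

```dolphindb
g = createStreamGraph("demo")
// Declare the shared keyed table on the graph
g.sharedKeyedTable("state", "id", 1:0, `id`value, [INT, DOUBLE])
g.source("in", `id`value, [INT, DOUBLE])
 .udfEngine(def(msg) {
     st = orcaObj("state")    // access the shared keyed table inside the UDF
     st.append!(select id, value from msg)
     return msg
 })
 .sink("out")
g.submit()
```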
Parameters
- name is a STRING scalar indicating the name of the shared keyed table.
- keyColumns is a STRING scalar or vector indicating the name(s) of the primary key column(s). The key columns must be of INTEGRAL, TEMPORAL, or LITERAL type.
- X, X1, ... indicate the columns of the table:
  - If the elements of Xk are vectors of equal length, each element of the tuple is treated as a column of the table.
  - If Xk contains elements of different types or unequal lengths, it is treated as a single column (with the column type set to ANY), and each element corresponds to the value of that column in one row.
- capacity is a positive integer indicating the amount of memory (in terms of the number of rows) allocated to the table. When the number of rows exceeds capacity, the system allocates a new memory block of 1.2 to 2 times capacity, copies the data to the new space, and releases the original memory. For large tables, this process may consume a significant amount of memory.
- size is a non-negative integer indicating the initial size (in terms of the number of rows) of the table. If size=0, an empty table is created; if size>0, the columns are initialized with:
  - false for the Boolean type;
  - 0 for numeric, temporal, IPADDR, COMPLEX, and POINT types;
  - null values for LITERAL and INT128 types.

  Note: If colTypes is an array vector, size must be 0.
- colNames is a STRING vector of column names.
- colTypes is a STRING vector indicating the data type of each column. Non-key columns can be specified as an array vector type or the ANY type.
- table is a table object used to initialize the shared keyed table. Note that the key columns (keyColumns) of table cannot contain duplicate values.
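The three syntax forms can be sketched as follows (the stream graph g, table names, and column data here are hypothetical, for illustration only):

```dolphindb
// Form 1: build the columns from named vectors (id serves as the key column)
id = 1 2 3
value = 10.5 20.5 30.5
g.sharedKeyedTable("kt1", "id", id, value)

// Form 2: specify capacity:size together with column names and types
g.sharedKeyedTable("kt2", "id", 100:0, `id`value, [INT, DOUBLE])

// Form 3: initialize from an existing table whose key column has no duplicates
t = table(1 2 3 as id, 10.5 20.5 30.5 as value)
g.sharedKeyedTable("kt3", "id", t)
```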
Returns
A keyed table.
Examples
In this example, we use DStream::sharedKeyedTable and
DStream::udfEngine to implement a historical delta computation.
The shared keyed table maintains the most recent record for each id. When a new
record arrives, the UDF looks up its id in the table: if an entry with the same id
already exists, the UDF outputs the difference between the new value and the
previously stored (historical) value; if it does not, no output is produced. In
either case, the new record is then written to the table, updating the stored value
for that id.
if(existsCatalog("orcaCatalog")) dropCatalog("orcaCatalog")
createCatalog("orcaCatalog")
go
use catalog orcaCatalog
// Create stream graph
g = createStreamGraph("compare")
g.sharedKeyedTable("history", "id", 1:0, `id`value, [INT, DOUBLE])
g.source("data", `id`value`time, [INT, DOUBLE, TIMESTAMP])
.udfEngine(def(msg) {
    history = orcaObj("history")
    diffTable = table(100:0, `id`diff, [INT, DOUBLE])
    for(i in 0:msg.size()) {
        idVal = msg.id[i]
        valueVal = msg.value[i]
        // Read the historical value for this id
        old = select value from history where id = idVal
        // Write the new value (appending with an existing key updates that row)
        newRow = table(idVal as id, valueVal as value)
        history.append!(newRow)
        // Output the delta if a historical value existed
        if(old.size() > 0) {
            diffTable.append!(table(idVal as id, (valueVal - old.value[0]) as diff))
        }
    }
    return diffTable
})
.sink("comparison")
g.submit()
// Generate mock data
mockData = table(1..5 as id, rand(100.0, 5) as value, now() + 1..5 as time)
// Insert data
appendOrcaStreamTable("orcaCatalog.orca_table.data", mockData)
// Generate data with duplicate IDs
mockData = table(1..5 as id, rand(100.0, 5) as value, now() + 1..5 as time)
// Insert data
appendOrcaStreamTable("orcaCatalog.orca_table.data", mockData)
// Wait for processing and inspect results
sleep(1000)
select * from orcaCatalog.orca_table.comparison
| id | diff |
|---|---|
| 1 | 35.55946895749296 |
| 2 | -3.4362593906550387 |
| 3 | 36.283468999034596 |
| 4 | 68.97968558337999 |
| 5 | -91.64246928217878 |
