DStream::sharedDict
Syntax
DStream::sharedDict(name, keyObj, valueObj, [ordered=false])
or
DStream::sharedDict(name, keyType, valueType, [ordered=false])
Details
Creates a shared dictionary in Orca. The shared dictionary can be used only within
DStream::udfEngine, where the user-defined function accesses it by name with
orcaObj. For details about dictionaries, refer to dict.
Parameters
name is a STRING scalar indicating the shared dictionary name.
For the first usage:
keyObj is a vector indicating dictionary keys.
valueObj is a vector indicating dictionary values.
For the second usage:
keyType is the data type of the dictionary keys. Supported data categories are Integral (excluding COMPRESSED), Temporal, Floating, and Literal.
valueType is the data type of the dictionary values. The COMPLEX, POINT, and DECIMAL types are not supported.
ordered (optional) is a Boolean value indicating whether to create an ordered dictionary. The default value is false, which creates a regular dictionary; true creates an ordered dictionary. A regular dictionary does not track the insertion order of its key-value pairs, whereas an ordered dictionary preserves it.
Returns
A dictionary.
Examples
In this example, DStream::sharedDict and DStream::udfEngine are
used together to implement count-based conditional alerting. The shared
dictionary tracks the occurrence count of each key. Each time the count for a
key reaches or exceeds a predefined threshold (3 in this example), an alert
message containing the key and its current count is sent downstream.
if(existsCatalog("orcaCatalog")) dropCatalog("orcaCatalog")
createCatalog("orcaCatalog")
go
use catalog orcaCatalog
// Create stream graph
g = createStreamGraph("counter")
g.sharedDict("counts", STRING, LONG)
g.source("items", ["key"], [STRING])
.udfEngine(def(msg) {
    counts = orcaObj("counts")
    triggered = table(100:0, `key`count, [STRING, LONG])
    for(i in 0:msg.size()) {
        keyVal = msg.key[i]
        // Read the current count (0 if the key has not been seen yet)
        current = 0
        if (keyVal in counts) {
            current = counts[keyVal]
        }
        newCount = current + 1
        // Write the updated count back to the shared dictionary
        counts[keyVal] = newCount
        // Emit an alert once the threshold is reached
        if(newCount >= 3) {
            triggered.append!(table(keyVal as key, newCount as count))
        }
    }
    return triggered
})
.sink("alerts")
g.submit()
go
// Generate mock data
keys = ["A", "B", "A", "C", "B", "A", "B", "B", "C", "C"]
mockData1 = table(take(keys[0:3], 3) as key)
mockData2 = table(take(keys[3:6], 3) as key)
mockData3 = table(take(keys[6:10], 4) as key)
// Insert data
appendOrcaStreamTable("orcaCatalog.orca_table.items", mockData1)
appendOrcaStreamTable("orcaCatalog.orca_table.items", mockData2)
appendOrcaStreamTable("orcaCatalog.orca_table.items", mockData3)
// Wait for processing and inspect results
sleep(1000)
select * from orcaCatalog.orca_table.alerts;
| key | count |
|---|---|
| A | 3 |
| B | 3 |
| B | 4 |
| C | 3 |
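The alert table above can be checked by replaying the three batches by hand. The sketch below is a plain-Python model, not Orca or DolphinDB code: an ordinary module-level dict stands in for the shared dictionary "counts", and the hypothetical function handle_batch plays the role of the udfEngine handler. Because the dict outlives each call, counts accumulate across batches, which is why B triggers twice (counts 3 and 4) while A and C trigger once each.

```python
# Plain-Python model of the udfEngine handler in the example above.
# Assumption: a module-level dict stands in for the Orca shared dictionary
# "counts"; handle_batch is a hypothetical name, not part of any DolphinDB API.

THRESHOLD = 3  # mirrors `newCount >= 3` in the DolphinDB example

# State that persists across batches, like the shared dictionary.
counts: dict[str, int] = {}


def handle_batch(batch_keys: list[str]) -> list[tuple[str, int]]:
    """Process one batch of keys and return the (key, count) alerts it triggers."""
    triggered = []
    for key in batch_keys:
        new_count = counts.get(key, 0) + 1  # read current count, default 0
        counts[key] = new_count             # write back to the shared state
        if new_count >= THRESHOLD:
            triggered.append((key, new_count))
    return triggered


# Same mock data and batch boundaries as mockData1..mockData3.
keys = ["A", "B", "A", "C", "B", "A", "B", "B", "C", "C"]
batches = [keys[0:3], keys[3:6], keys[6:10]]

alerts = []
for batch in batches:
    alerts.extend(handle_batch(batch))

for key, count in alerts:
    print(key, count)
```

After the first batch (A, B, A) no key has reached 3; the second batch (C, B, A) brings A to 3; the third batch (B, B, C, C) brings B to 3 and then 4, and C to 3.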
