StreamGraph::setLocalConfigOnce
Syntax
StreamGraph::setLocalConfigOnce(dict)
Arguments
dict is a dictionary, supporting the following key-value pairs:
Key | Type | Default Value | Description |
---|---|---|---|
subscription.batchSize | INT | 0 | Sets the batchSize parameter for subscriptions between adjacent nodes in the stream graph. |
subscription.throttle | INT | 1 | Sets the throttle parameter for subscriptions between adjacent nodes in the stream graph. |
subscription.timeTrigger | BOOL | false | Sets the timeTrigger parameter for subscriptions between adjacent nodes in the stream graph. |
subscription.sourceOffset | INT | -3 | Sets the offset parameter of subscriptions between nodes
created by StreamGraph::source and their directly
connected downstream nodes in the stream graph. |
Details
Set subscription configurations between adjacent nodes in the stream graph. Once this
function is called, the specified configuration will override any corresponding
global configuration set via StreamGraph::setConfigMap
, or add new
configurations if they were not previously defined.
The configuration takes effect only once between the calling node and its directly connected downstream node; it does not apply if the two nodes are part of a cascade.
Note: Since operations such as sink
and map
do not generate new stream graph nodes, the configuration will propagate and take
effect on the next actual node in the graph.
Examples
The following example sets the batchSize parameter for the subscription for the 1-minute K-line computation node to 100, while all other subscriptions in the stream graph retain the default batchSize value.
,if (!existsCatalog("orca")) {
createCatalog("orca")
}
use catalog orca
g = createStreamGraph("indicators")
sourceStreams = g.source("trade", 1024:0, `symbol`datetime`price`volume, [SYMBOL, TIMESTAMP,DOUBLE, INT])
.fork(2)
stream_1min = sourceStreams[0]
.setLocalConfigOnce({
"subscription.batchSize": 100
})
.timeSeriesEngine(60*1000, 60*1000, <[first(price),max(price),min(price),last(price),sum(volume)]>, "datetime", false, "symbol")
.reactiveStateEngine(<[datetime, first_price, max_price, min_price, last_price, sum_volume, mmax(max_price, 5), mavg(sum_volume, 5)]>, `symbol)
.sink("output_1min")
stream_5min = sourceStreams[1]
.timeSeriesEngine(5*60*1000, 5*60*1000, <[first(price),max(price),min(price),last(price),sum(volume)]>, "datetime", false, "symbol")
.reactiveStateEngine(<[datetime, first_price, max_price, min_price, last_price, sum_volume, mmax(max_price, 5), mavg(sum_volume, 5)]>, `symbol)
.sink("output_5min")