StreamGraph::setLocalConfigOnce

Syntax

StreamGraph::setLocalConfigOnce(dict)

Arguments

dict is a dictionary, supporting the following key-value pairs:

Key Type Default Value Description
subscription.batchSize INT 0 Sets the batchSize parameter for subscriptions between adjacent nodes in the stream graph.
subscription.throttle INT 1 Sets the throttle parameter for subscriptions between adjacent nodes in the stream graph.
subscription.timeTrigger BOOL false Sets the timeTrigger parameter for subscriptions between adjacent nodes in the stream graph.
subscription.sourceOffset INT -3 Sets the offset parameter of subscriptions between nodes created by StreamGraph::source and their directly connected downstream nodes in the stream graph.

Details

Set subscription configurations between adjacent nodes in the stream graph. Once this function is called, the specified configuration will override any corresponding global configuration set via StreamGraph::setConfigMap, or add new configurations if they were not previously defined.

The configuration takes effect only once between the calling node and its directly connected downstream node; it does not apply if the two nodes are part of a cascade.

Note: Since operations such as sink and map do not generate new stream graph nodes, the configuration will propagate and take effect on the next actual node in the graph.

Examples

The following example sets the batchSize parameter for the subscription for the 1-minute K-line computation node to 100, while all other subscriptions in the stream graph retain the default batchSize value.

,if (!existsCatalog("orca")) {
	createCatalog("orca")
}

use catalog orca

g = createStreamGraph("indicators")
sourceStreams = g.source("trade", 1024:0, `symbol`datetime`price`volume, [SYMBOL, TIMESTAMP,DOUBLE, INT])
    .fork(2)
stream_1min = sourceStreams[0]
    .setLocalConfigOnce({
      "subscription.batchSize": 100
    })
    .timeSeriesEngine(60*1000, 60*1000, <[first(price),max(price),min(price),last(price),sum(volume)]>, "datetime", false, "symbol")
    .reactiveStateEngine(<[datetime, first_price, max_price, min_price, last_price, sum_volume, mmax(max_price, 5), mavg(sum_volume, 5)]>, `symbol)
    .sink("output_1min")
stream_5min = sourceStreams[1]
    .timeSeriesEngine(5*60*1000, 5*60*1000, <[first(price),max(price),min(price),last(price),sum(volume)]>, "datetime", false, "symbol")
    .reactiveStateEngine(<[datetime, first_price, max_price, min_price, last_price, sum_volume, mmax(max_price, 5), mavg(sum_volume, 5)]>, `symbol)
    .sink("output_5min")