Dstream::buffer
语法
DStream::buffer(name, [asyncWrite=true], [compress=true], [cacheSize],
[retentionMinutes=1440], [flushMode=0], [preCache], [cachePurgeTimeColumn],
[cachePurgeInterval], [cacheRetentionTime])
详情
创建一个持久化共享流数据表,用于存储流计算中间结果。参考 enableTableShareAndPersistence。
参数
name 表示流表的名称。字符串标量,可以传入完整的流表全限定名(如 trading.orca_table.trades);也可以仅提供流表名(如 trades),系统会根据当前的 catalog 设置自动补全为对应的全限定名。
asyncWrite 可选参数,是一个布尔值,表示是否异步持久化数据到磁盘。默认值为 true,流数据写入内存即为写入成功,持久化到磁盘的操作将会由另一个线程执行。
注:
持久化数据到磁盘包含两个步骤:
-
写内存数据到操作系统缓存
-
写缓存数据到磁盘(是否开启同步刷盘由参数 flushMode 决定)
返回值
返回一个 DStream 对象。
例子
if (!existsCatalog("orca")) {
createCatalog("orca")
}
go
use catalog orca
g = createStreamGraph("indicators")
g.source("trade", 1:0, `time`symbol`price`volume, [DATETIME,SYMBOL,DOUBLE,LONG])
.timeSeriesEngine(windowSize=60, step=60, metrics=[<first(price) as open>, <max(price) as high>, <min(price) as low>, <last(price) as close>, <sum(volume) as volume>], timeColumn=`time, keyColumn=`symbol)
.buffer("bufferTable")
