createStreamBroadcastEngine

Syntax

createStreamBroadcastEngine(name, dummyTable, outputTables)

Details

createStreamBroadcastEngine creates a stream broadcast engine that distributes the same data stream to multiple target tables. The function returns a table object; data ingested into that table is broadcast to every target table.

Use this engine when you need to process a single stream of data in multiple ways. For example, save one copy to disk while sending another copy to a computing engine for further processing.

Arguments

name is a string indicating the name of the engine. It is the unique identifier of the engine on a data or compute node. It can contain letters, digits, and underscores (_), and must start with a letter.

dummyTable is a table object whose schema must be the same as the subscribed stream table. Whether dummyTable contains data does not matter.

outputTables is a tuple of two or more table objects (which can be in-memory tables, DFS tables, or streaming engines). The schema of each table object must be the same as dummyTable.
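The way the three arguments fit together can be sketched with two plain in-memory tables as targets. This is a minimal illustration, not a recommended setup: the names src, copyA, copyB, and broadcastSketch are made up for this sketch, and it assumes no engine named broadcastSketch already exists on the node.

```dolphindb
// illustrative names only; all three tables share the same schema
src = table(1:0, `sym`price, [STRING,DOUBLE])
copyA = table(1:0, `sym`price, [STRING,DOUBLE])
copyB = table(1:0, `sym`price, [STRING,DOUBLE])

// src serves only as the schema template (dummyTable); its contents are irrelevant
engine = createStreamBroadcastEngine(name="broadcastSketch", dummyTable=src, outputTables=[copyA, copyB])

// data appended to the returned table object is forwarded to copyA and copyB
engine.append!(table(`A1`A2 as sym, 100.5 101.5 as price))
```

Because the engine name must be unique on the node, rerunning the sketch requires releasing the engine first, for example with dropStreamEngine("broadcastSketch").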

Examples

share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStream
share streamTable(1000:0, `sym`factor1, [STRING,DOUBLE]) as resultStream

t=table(100:0, `sym`price, [STRING,DOUBLE])

//define the output tables: a reactive state engine and a DFS table for storing data
rse = createReactiveStateEngine(name="reactiveDemo", metrics =<cumavg(price)>, dummyTable=tickStream, outputTable=resultStream, keyColumn="sym")
if(existsDatabase("dfs://database1")){
	dropDatabase("dfs://database1")
}
db=database("dfs://database1", VALUE, "A"+string(1..10))
pt=db.createPartitionedTable(t,`pt,`sym)

//create the stream broadcast engine
broadcastEngine=createStreamBroadcastEngine(name="broadcastEngine", dummyTable=tickStream, outputTables=[loadTable("dfs://database1", `pt),getStreamEngine("reactiveDemo")])

//subscribe to the tickStream stream table
subscribeTable(tableName=`tickStream, actionName="sub", handler=tableInsert{broadcastEngine}, msgAsTable = true)

//append data to tickStream; the subscription forwards it to the broadcast engine
n=100000
symbols=take(("A" + string(1..10)),n)
prices=100+rand(1.0,n)
t1=table(symbols as sym, prices as price)
tickStream.append!(t1)

//check the number of records in the DFS table (subscribed data is processed asynchronously, so allow a moment before querying)
select count(*) from loadTable("dfs://database1", `pt)
// output: 100,000

//check the status of the reactive state streaming engine
getStreamEngineStat().ReactiveStreamEngine

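| name | user | status | lastErrMsg | numGroups | numRows | numMetrics | memoryInUsed | snapshotDir | snapshotInterval | snapshotMsgId | snapshotTimestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|
| reactiveDemo | admin | OK | | 10 | 100,000 | 1 | 2,600 | | | -1 | |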
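Since engine names and subscription actions persist on the node, rerunning the example usually requires releasing them first. The following is a possible cleanup sketch, assuming the names used in the example above (action "sub", engines "broadcastEngine" and "reactiveDemo", shared tables tickStream and resultStream); adjust it if your names differ.

```dolphindb
// cancel the subscription before removing the engine it writes to
unsubscribeTable(tableName=`tickStream, actionName="sub")

// release the broadcast engine first, then the reactive state engine it feeds
dropStreamEngine("broadcastEngine")
dropStreamEngine("reactiveDemo")

// remove the shared stream tables created at the start of the example
undef(`tickStream, SHARED)
undef(`resultStream, SHARED)
```

The order matters in this sketch: the subscription is removed before the engines, and the shared stream tables are undefined last so that nothing is still writing to or reading from them.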