Inter-node Subscription Within a Cluster
For inter-node subscriptions within a cluster, the publisher and subscriber are on different nodes within the same cluster. The publisher node must be specified during subscriptions. This section introduces the process of inter-node stream subscription within a cluster using trade tick data as an example.
- Create a shared stream table tglobal on the publisher.
name = `ChannelNo`ApplSeqNum`MDStreamID`BidApplSeqNum`OfferApplSeqNum`SecurityID`SecurityIDSource`TradePrice`TradeQty`ExecType`TradeTime`LocalTime`SeqNo`DataStatus`TradeMoney`TradeBSFlag`BizIndex`OrderKind`Market
type = [INT,LONG,SYMBOL,LONG,LONG,SYMBOL,SYMBOL,DOUBLE,INT,SYMBOL,TIMESTAMP,TIME,LONG,INT,DOUBLE,SYMBOL,LONG,SYMBOL,SYMBOL]
share streamTable(100:0, name, type) as tglobal
- Create a DFS table tradeTB on the subscriber to store the subscribed data.
if(existsDatabase("dfs://tradeDB")) dropDatabase("dfs://tradeDB")
db1 = database(, VALUE, 2020.01.01..2021.01.01)
db2 = database(, HASH, [SYMBOL, 50])
db = database("dfs://tradeDB", COMPO, [db1, db2], , "TSDB")
name = `ChannelNo`ApplSeqNum`MDStreamID`BidApplSeqNum`OfferApplSeqNum`SecurityID`SecurityIDSource`TradePrice`TradeQty`ExecType`TradeTime`LocalTime`SeqNo`DataStatus`TradeMoney`TradeBSFlag`BizIndex`OrderKind`Market
type = [INT,LONG,SYMBOL,LONG,LONG,SYMBOL,SYMBOL,DOUBLE,INT,SYMBOL,TIMESTAMP,TIME,LONG,INT,DOUBLE,SYMBOL,LONG,SYMBOL,SYMBOL]
t = db.createPartitionedTable(table=table(1:0, name, type), tableName="tradeTB", partitionColumns=`TradeTime`SecurityID, sortColumns=[`SecurityID, `TradeTime])
- On the subscriber, subscribe to tglobal. Set the server parameter to the publisher's node alias (e.g., dnode1) and the handler parameter to the table trade, into which the subscribed data is inserted. Set batchSize and throttle according to your needs. For parameter details, refer to subscribeTable.
trade = loadTable("dfs://tradeDB", "tradeTB")
subscribeTable(server="dnode1", tableName="tglobal", actionName="insertDB", offset=0, handler=trade, msgAsTable=true, batchSize=100000, throttle=60)
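To confirm that the subscription was registered, one option is to inspect the streaming status on the subscriber. The following is a minimal sketch, assuming the built-in getStreamingStat function is available on the node and that its subWorkers table holds one row per subscription topic; the exact columns may differ between server versions.

```dolphindb
// Run on the subscriber node.
// getStreamingStat() returns a dictionary of status tables;
// subWorkers describes the subscription workers on this node.
stat = getStreamingStat()
stat.subWorkers
```

A row whose topic mentions both tglobal and insertDB suggests that the subscription is active.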
- Write simulated data to tglobal.
for(i in 1..100){
	insertData = [rand(100,1),long(i),string(i),long(i),long(i),string(i),string(i),rand(1.0,1),rand(100,1),string(i),timestamp('2021.01.04T09:30:02.000'),time('09:30:02.000'),long(i),rand(100,1),rand(1.0,1),string(i),long(i),string(i),string(i)]
	insert into tglobal values(insertData)
}
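Because the subscription is created with batchSize=100000 and throttle=60, the 100 simulated rows never reach batchSize, so the handler is expected to run only after the throttle interval has elapsed. The sketch below waits before counting the rows that have arrived. It assumes that throttle is measured in seconds, as described for subscribeTable, and that an extra margin of 5 seconds (a hypothetical value) is enough for the write to finish.

```dolphindb
// Run on the subscriber node.
// Wait slightly longer than throttle (60 s); sleep takes milliseconds.
sleep(65000)
// Count the rows that have reached the DFS table so far.
select count(*) from loadTable("dfs://tradeDB", "tradeTB")
```

If the count is still lower than the number of rows written, waiting longer or lowering batchSize and throttle for the test may help.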
- Query the data written to tradeTB.
select * from loadTable("dfs://tradeDB","tradeTB") limit 5
/* output:
ChannelNo ApplSeqNum MDStreamID BidApplSeqNum OfferApplSeqNum SecurityID SecurityIDSource TradePrice TradeQty ExecType TradeTime LocalTime SeqNo DataStatus TradeMoney TradeBSFlag BizIndex OrderKind Market
60 43 43 43 43 43 43 0.7538 56 43 2021.01.04T09:30:02.000 09:30:02.000 43 92 0.6234 43 43 43 43
56 59 59 59 59 59 59 0.1549 48 59 2021.01.04T09:30:02.000 09:30:02.000 59 94 0.428 59 59 59 59
60 92 92 92 92 92 92 0.198 13 92 2021.01.04T09:30:02.000 09:30:02.000 92 87 0.2109 92 92 92 92
57 41 41 41 41 41 41 0.0822 30 41 2021.01.04T09:30:02.000 09:30:02.000 41 32 0.2611 41 41 41 41
7 61 61 61 61 61 61 0.7593 81 61 2021.01.04T09:30:02.000 09:30:02.000 61 15 0.7564 61 61 61 61
*/
- Once the stream subscription is complete, unsubscribe and then undefine the stream table. Note that the stream table can only be undefined after all subscriptions to this table are canceled.
// Unsubscribe from the stream table (run on the subscriber); server must match the value used in subscribeTable
unsubscribeTable(server="dnode1", tableName="tglobal", actionName="insertDB")
// Undefine the stream table (run on the publisher)
undef(`tglobal, SHARED)
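Before calling undef, it can help to check on the publisher whether any subscriptions to the table remain. This is a sketch only, assuming that the pubTables table returned by getStreamingStat has a tableName column and holds one row per remaining subscription.

```dolphindb
// Run on the publisher node, after unsubscribing and before undef.
pubs = getStreamingStat().pubTables
// An empty result suggests that no subscriber is still attached to tglobal.
select * from pubs where tableName = "tglobal"
```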