Inter-node Subscription Across Clusters
In an inter-node subscription across clusters, the publisher and the subscriber are located in different clusters, so the subscriber must specify a remote connection handle to the publisher when subscribing. This section walks through cross-cluster stream subscription using trade tick data as an example.
- Create a shared stream table tglobal on the publisher.

    name = `ChannelNo`ApplSeqNum`MDStreamID`BidApplSeqNum`OfferApplSeqNum`SecurityID`SecurityIDSource`TradePrice`TradeQty`ExecType`TradeTime`LocalTime`SeqNo`DataStatus`TradeMoney`TradeBSFlag`BizIndex`OrderKind`Market
    type = [INT,LONG,SYMBOL,LONG,LONG,SYMBOL,SYMBOL,DOUBLE,INT,SYMBOL,TIMESTAMP,TIME,LONG,INT,DOUBLE,SYMBOL,LONG,SYMBOL,SYMBOL]
    share streamTable(100:0, name, type) as tglobal
- Create a DFS table tradeTB on the subscriber to store the subscribed data.

    if(existsDatabase("dfs://tradeDB"))
        dropDatabase("dfs://tradeDB")
    // Composite partitioning: VALUE on date, HASH on security ID
    db1 = database(, VALUE, 2020.01.01..2021.01.01)
    db2 = database(, HASH, [SYMBOL, 50])
    db = database("dfs://tradeDB", COMPO, [db1, db2], , "TSDB")
    name = `ChannelNo`ApplSeqNum`MDStreamID`BidApplSeqNum`OfferApplSeqNum`SecurityID`SecurityIDSource`TradePrice`TradeQty`ExecType`TradeTime`LocalTime`SeqNo`DataStatus`TradeMoney`TradeBSFlag`BizIndex`OrderKind`Market
    type = [INT,LONG,SYMBOL,LONG,LONG,SYMBOL,SYMBOL,DOUBLE,INT,SYMBOL,TIMESTAMP,TIME,LONG,INT,DOUBLE,SYMBOL,LONG,SYMBOL,SYMBOL]
    t = db.createPartitionedTable(table=table(1:0, name, type), tableName="tradeTB", partitionColumns=`TradeTime`SecurityID, sortColumns=[`SecurityID, `TradeTime])
- Subscribe to tglobal from the local server (the subscriber). Set the server parameter to the remote connection handle of the publisher, set handler to the table trade into which the subscribed data is inserted, and set batchSize and throttle according to your needs. For parameter details, refer to subscribeTable.

    trade = loadTable("dfs://tradeDB", "tradeTB")
    // Replace YOUR_IP and 8892 with the host and port of the publisher node
    pubNodeHandler = xdb("YOUR_IP", 8892)
    subscribeTable(server=pubNodeHandler, tableName="tglobal", actionName="insertDB", offset=0, handler=trade, msgAsTable=true, batchSize=100000, throttle=60)
- Write simulated data to tglobal.

    for(i in 1..100){
        insertData = [rand(100,1),long(i),string(i),long(i),long(i),string(i),string(i),rand(1.0,1),rand(100,1),string(i),timestamp('2021.01.04T09:30:02.000'),time('09:30:02.000'),long(i),rand(100,1),rand(1.0,1),string(i),long(i),string(i),string(i)]
        insert into tglobal values(insertData)
    }
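- Query the data written to tradeTB. Because only 100 rows are written, far fewer than batchSize, the handler is triggered by the throttle interval, so the data may take up to 60 seconds to appear.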

    select * from loadTable("dfs://tradeDB","tradeTB") limit 5
    /* output:
    ChannelNo ApplSeqNum MDStreamID BidApplSeqNum OfferApplSeqNum SecurityID SecurityIDSource TradePrice TradeQty ExecType TradeTime               LocalTime    SeqNo DataStatus TradeMoney TradeBSFlag BizIndex OrderKind Market
    60        43         43         43            43              43         43               0.7538     56       43       2021.01.04T09:30:02.000 09:30:02.000 43    92         0.6234     43          43       43        43
    56        59         59         59            59              59         59               0.1549     48       59       2021.01.04T09:30:02.000 09:30:02.000 59    94         0.428      59          59       59        59
    60        92         92         92            92              92         92               0.198      13       92       2021.01.04T09:30:02.000 09:30:02.000 92    87         0.2109     92          92       92        92
    57        41         41         41            41              41         41               0.0822     30       41       2021.01.04T09:30:02.000 09:30:02.000 41    32         0.2611     41          41       41        41
    7         61         61         61            61              61         61               0.7593     81       61       2021.01.04T09:30:02.000 09:30:02.000 61    15         0.7564     61          61       61        61
    */
- Once the stream subscription is complete, unsubscribe and then undefine the stream table. Note that the stream table can only be undefined after all subscriptions to it are canceled.

    // On the subscriber: unsubscribe from the remote stream table.
    // The server parameter must point to the publisher, as in subscribeTable.
    unsubscribeTable(server=pubNodeHandler, tableName="tglobal", actionName="insertDB")

    // On the publisher: undefine the shared stream table
    undef(`tglobal, SHARED)
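
Before canceling the subscription, it can be useful to confirm that it is actually active on both sides. The following is a minimal sketch, assuming the built-in function getStreamingStat is available on both nodes and that the two statements are run separately, the first on the subscriber and the second on the publisher. The variable names subStat and pubStat are illustrative only.

```
// Run on the subscriber node:
// lists the subscription workers and the topics they are processing
subStat = getStreamingStat().subWorkers
select * from subStat

// Run on the publisher node:
// lists the published stream tables together with their subscribers
pubStat = getStreamingStat().pubTables
select * from pubStat
```

If the subscription to tglobal with the action name insertDB does not appear in either result, check that the remote connection handle points to the correct publisher host and port.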