Inter-node Subscription Across Clusters

In an inter-node subscription across clusters, the publisher and the subscriber are located in different clusters, so a remote connection handle must be specified when subscribing. This section walks through cross-cluster stream subscription using trade tick data as an example.

  1. Create a shared stream table tglobal on the publisher.
    name = `ChannelNo`ApplSeqNum`MDStreamID`BidApplSeqNum`OfferApplSeqNum`SecurityID`SecurityIDSource`TradePrice`TradeQty`ExecType`TradeTime`LocalTime`SeqNo`DataStatus`TradeMoney`TradeBSFlag`BizIndex`OrderKind`Market
    type = [INT,LONG,SYMBOL,LONG,LONG,SYMBOL,SYMBOL,DOUBLE,INT,SYMBOL,TIMESTAMP,TIME,LONG,INT,DOUBLE,SYMBOL,LONG,SYMBOL,SYMBOL]
    share streamTable(100:0, name, type) as tglobal
  2. Create a DFS table tradeTB on the subscriber to store the subscribed data.
    if(existsDatabase("dfs://tradeDB"))
        dropDatabase("dfs://tradeDB")
    db1 = database(, VALUE, 2020.01.01..2021.01.01)
    db2 = database(, HASH, [SYMBOL, 50])
    db = database("dfs://tradeDB", COMPO, [db1, db2], , "TSDB")
    name = `ChannelNo`ApplSeqNum`MDStreamID`BidApplSeqNum`OfferApplSeqNum`SecurityID`SecurityIDSource`TradePrice`TradeQty`ExecType`TradeTime`LocalTime`SeqNo`DataStatus`TradeMoney`TradeBSFlag`BizIndex`OrderKind`Market
    type = [INT,LONG,SYMBOL,LONG,LONG,SYMBOL,SYMBOL,DOUBLE,INT,SYMBOL,TIMESTAMP,TIME,LONG,INT,DOUBLE,SYMBOL,LONG,SYMBOL,SYMBOL]
    t = db.createPartitionedTable(table=table(1:0, name, type), tableName="tradeTB", partitionColumns=`TradeTime`SecurityID, sortColumns=[`SecurityID, `TradeTime])
  3. Subscribe to tglobal from the subscriber node. Set the server parameter to a remote connection handle to the publisher node, and set handler to the table trade, into which the subscribed data is inserted. Set batchSize and throttle as needed: with the values below, the handler is triggered once 100,000 messages have accumulated or 60 seconds have elapsed, whichever comes first. For parameter details, refer to subscribeTable.
    trade = loadTable("dfs://tradeDB", "tradeTB")
    pubNodeHandler=xdb("YOUR_IP",8892)
    subscribeTable(server=pubNodeHandler, tableName="tglobal", actionName="insertDB", offset=0, handler=trade, msgAsTable=true, batchSize=100000, throttle=60)
  4. On the publisher, write simulated data to tglobal.
    for(i in 1..100){
        insertData = [rand(100,1),long(i),string(i),long(i),long(i),string(i),string(i),rand(1.0,1),rand(100,1),string(i),timestamp('2021.01.04T09:30:02.000'),time('09:30:02.000'),long(i),rand(100,1),rand(1.0,1),string(i),long(i),string(i),string(i)]
        insert into tglobal values(insertData)
    }
  5. On the subscriber, query the data written to tradeTB.
    select * from loadTable("dfs://tradeDB","tradeTB") limit 5
    
    /* output:
    ChannelNo	ApplSeqNum	MDStreamID	BidApplSeqNum	OfferApplSeqNum	SecurityID	SecurityIDSource	TradePrice	TradeQty	ExecType	TradeTime	LocalTime	SeqNo	DataStatus	TradeMoney	TradeBSFlag	BizIndex	OrderKind	Market
    60	43	43	43	43	43	43	0.7538	56	43	2021.01.04T09:30:02.000	09:30:02.000	43	92	0.6234	43	43	43	43
    56	59	59	59	59	59	59	0.1549	48	59	2021.01.04T09:30:02.000	09:30:02.000	59	94	0.428	59	59	59	59
    60	92	92	92	92	92	92	0.198	13	92	2021.01.04T09:30:02.000	09:30:02.000	92	87	0.2109	92	92	92	92
    57	41	41	41	41	41	41	0.0822	30	41	2021.01.04T09:30:02.000	09:30:02.000	41	32	0.2611	41	41	41	41
    7	61	61	61	61	61	61	0.7593	81	61	2021.01.04T09:30:02.000	09:30:02.000	61	15	0.7564	61	61	61	61
    */
  6. Once the subscription is no longer needed, unsubscribe on the subscriber and then undefine the stream table on the publisher. Note that a stream table can be undefined only after all subscriptions to it have been canceled.
    // Unsubscribe from the stream table (run on the subscriber; server must point to the publisher)
    unsubscribeTable(server=pubNodeHandler, tableName="tglobal", actionName="insertDB")
    // Undefine the stream table (run on the publisher)
    undef(`tglobal, SHARED)
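
Because the handler in step 3 is batched (batchSize=100000, throttle=60), the rows written in step 4 may not be visible in tradeTB immediately. The following is a minimal sketch for checking the subscription before querying in step 5. It assumes the built-in getStreamingStat function is available on both nodes and that the statements are run before the unsubscribe in step 6. The exact columns returned may vary by server version.

    // On the subscriber: status of subscription workers (e.g., queue depth, last error message)
    getStreamingStat().subWorkers

    // On the publisher: published tables and the subscribers connected to them
    getStreamingStat().pubTables

    // On the subscriber: after the throttle interval has elapsed, the count
    // should match the number of rows written in step 4
    select count(*) from loadTable("dfs://tradeDB", "tradeTB")

If the subscription does not appear on either side, the connection handle passed as server in step 3 is the first thing to check.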