Intra-node Subscription

For intra-node subscriptions, both the publisher and subscriber are on the same node, eliminating the need to specify the server where the publisher is located. This section introduces the process of intra-node stream subscription using trade tick data as an example.

  1. Create a shared stream table tglobal on the publisher.
    name = `ChannelNo`ApplSeqNum`MDStreamID`BidApplSeqNum`OfferApplSeqNum`SecurityID`SecurityIDSource`TradePrice`TradeQty`ExecType`TradeTime`LocalTime`SeqNo`DataStatus`TradeMoney`TradeBSFlag`BizIndex`OrderKind`Market
    type = [INT,LONG,SYMBOL,LONG,LONG,SYMBOL,SYMBOL,DOUBLE,INT,SYMBOL,TIMESTAMP,TIME,LONG,INT,DOUBLE,SYMBOL,LONG,SYMBOL,SYMBOL]
    share streamTable(100:0, name, type) as tglobal
  2. Create a DFS table tradeTB on the subscriber to store the subscribed data.
    if(existsDatabase("dfs://tradeDB"))
        dropDatabase("dfs://tradeDB")
    db1 = database(, VALUE, 2020.01.01..2021.01.01)
    db2 = database(, HASH, [SYMBOL, 50])
    db = database("dfs://tradeDB", COMPO, [db1, db2], , "TSDB")
    name = `ChannelNo`ApplSeqNum`MDStreamID`BidApplSeqNum`OfferApplSeqNum`SecurityID`SecurityIDSource`TradePrice`TradeQty`ExecType`TradeTime`LocalTime`SeqNo`DataStatus`TradeMoney`TradeBSFlag`BizIndex`OrderKind`Market
    type = [INT,LONG,SYMBOL,LONG,LONG,SYMBOL,SYMBOL,DOUBLE,INT,SYMBOL,TIMESTAMP,TIME,LONG,INT,DOUBLE,SYMBOL,LONG,SYMBOL,SYMBOL]
    t = db.createPartitionedTable(table=table(1:0, name, type), tableName="tradeTB", partitionColumns=`TradeTime`SecurityID, sortColumns=[`SecurityID, `TradeTime])
  3. Subscribe to tglobal on a local server. Set parameter handler to table trade where the subscribed data is inserted, and set parameters batchSize and throttle according to specific needs. For parameter details, refer to subscribeTable.
    trade = loadTable("dfs://tradeDB", "tradeTB")
    subscribeTable(tableName="tglobal", actionName="insertDB", offset=-1, handler=trade, msgAsTable=true, batchSize=100000, throttle=60)
  4. Write simulated data to tglobal.
    for(i in 1..100){
        insertData = [rand(100,1),long(i),string(i),long(i),long(i),string(i),string(i),rand(1.0,1),rand(100,1),string(i),timestamp('2021.01.04T09:30:02.000'),time('09:30:02.000'),long(i),rand(100,1),rand(1.0,1),string(i),long(i),string(i),string(i)]
        insert into tglobal values(insertData)
    }
  5. Query the data written to tradeTB.
    select * from loadTable("dfs://tradeDB","tradeTB") limit 5
    
    /* output:
    ChannelNo	ApplSeqNum	MDStreamID	BidApplSeqNum	OfferApplSeqNum	SecurityID	SecurityIDSource	TradePrice	TradeQty	ExecType	TradeTime	LocalTime	SeqNo	DataStatus	TradeMoney	TradeBSFlag	BizIndex	OrderKind	Market
    60	43	43	43	43	43	43	0.7538	56	43	2021.01.04T09:30:02.000	09:30:02.000	43	92	0.6234	43	43	43	43
    56	59	59	59	59	59	59	0.1549	48	59	2021.01.04T09:30:02.000	09:30:02.000	59	94	0.428	59	59	59	59
    60	92	92	92	92	92	92	0.198	13	92	2021.01.04T09:30:02.000	09:30:02.000	92	87	0.2109	92	92	92	92
    57	41	41	41	41	41	41	0.0822	30	41	2021.01.04T09:30:02.000	09:30:02.000	41	32	0.2611	41	41	41	41
    7	61	61	61	61	61	61	0.7593	81	61	2021.01.04T09:30:02.000	09:30:02.000	61	15	0.7564	61	61	61	61
    */
  6. Once the stream subscription is complete, unsubscribe and then undefine the stream table. Note that the stream table can only be undefined after all subscriptions to this table are canceled.
    // Unsubscribe to the stream table
    unsubscribeTable(tableName="tglobal", actionName="insertDB")
    // Undefine the stream table
    undef(`tglobal, SHARED)