Subscription Through Python API

This section walks through subscribing to a stream table through the Python API, using trade tick data as an example. For details on the Python API, refer to Python API.

  1. Create a shared stream table tglobal on the publisher in DolphinDB.
    name = `ChannelNo`ApplSeqNum`MDStreamID`BidApplSeqNum`OfferApplSeqNum`SecurityID`SecurityIDSource`TradePrice`TradeQty`ExecType`TradeTime`LocalTime`SeqNo`DataStatus`TradeMoney`TradeBSFlag`BizIndex`OrderKind`Market
    type = [INT,LONG,SYMBOL,LONG,LONG,SYMBOL,SYMBOL,DOUBLE,INT,SYMBOL,TIMESTAMP,TIME,LONG,INT,DOUBLE,SYMBOL,LONG,SYMBOL,SYMBOL]
    share streamTable(100:0, name, type) as tglobal
  2. Create a connection between the subscriber in Python and DolphinDB. Enable a port for data subscriptions to receive data from the DolphinDB server.
    import numpy as np
    import pandas as pd
    import dolphindb as ddb
    import time
    s = ddb.session()
    s.connect(host="localhost", port=8892, userid="admin", password="123456")
    s.enableStreaming(10020)
  3. Create a local subscription to tglobal on the Python subscriber. Set parameters host and port to the IP address and port number of the publisher node. Set parameter handler to a user-defined callback function to process the subscribed data, and set parameters batchSize and throttle according to specific needs. For parameter details, refer to Subscription Options.
    listTrade = []
    def handler(lst):
        listTrade.append(lst)
    s.subscribe(host="localhost", port=8892, handler=handler, tableName="tglobal", actionName="action", offset=-1, resub=False, filter=None, msgAsTable=False, batchSize=100000, throttle=60)
  4. Write simulated data to tglobal on the publisher in DolphinDB.
    for(i in 1..100){
        insertData = [rand(100,1),long(i),string(i),long(i),long(i),string(i),string(i),rand(1.0,1),rand(100,1),string(i),timestamp('2021.01.04T09:30:02.000'),time('09:30:02.000'),long(i),rand(100,1),rand(1.0,1),string(i),long(i),string(i),string(i)]
        insert into tglobal values(insertData)
    }
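  5. Query the data received by the subscriber in Python. Because the 100 simulated rows are fewer than batchSize, the handler is invoked only after the throttle interval (60 seconds in this example) has elapsed, so wait before querying.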
    print(listTrade[0][0])
    
    # output:
    # [12, 1, '1', 1, 1, '1', '1', 0.053802191046997905, 26, '1', numpy.datetime64('2021-01-04T09:30:02.000'), numpy.datetime64('1970-01-01T09:30:02.000'), 1, 34, 0.4466768535785377, '1', 1, '1', '1']
  6. Once the stream subscription is complete, unsubscribe and then undefine the stream table. Note that the stream table can only be undefined after all subscriptions to this table are canceled.
    // Unsubscribe from the stream table
    unsubscribeTable(tableName="tglobal", actionName="action")
    // Undefine the stream table
    undef(`tglobal, SHARED)
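
The subscriber-side steps above can be sketched in Python as follows. This is a minimal illustration and not part of the original example: `TradeCollector`, `COLUMNS`, and `stop_subscription` are hypothetical names. The sketch assumes that, with msgAsTable=False and batchSize set, each handler call receives a list of rows (consistent with the output in step 5), that the callback may run on a thread other than the main one, and that the session object exposes `unsubscribe(host, port, tableName, actionName)` for cancelling the subscription from the Python side.

```python
import threading

# Column names copied from the tglobal schema in step 1.
COLUMNS = [
    "ChannelNo", "ApplSeqNum", "MDStreamID", "BidApplSeqNum", "OfferApplSeqNum",
    "SecurityID", "SecurityIDSource", "TradePrice", "TradeQty", "ExecType",
    "TradeTime", "LocalTime", "SeqNo", "DataStatus", "TradeMoney",
    "TradeBSFlag", "BizIndex", "OrderKind", "Market",
]


class TradeCollector:
    """Hypothetical helper that stores the rows passed to the subscription handler."""

    def __init__(self, columns):
        self._columns = list(columns)
        self._rows = []
        # Assumption: the callback may be invoked from a background thread,
        # so access to the row buffer is guarded by a lock.
        self._lock = threading.Lock()

    def handler(self, batch):
        # Assumption: each call receives a list of rows, and each row is a
        # list of field values in table column order.
        with self._lock:
            self._rows.extend(batch)

    def records(self):
        # Return a copy of the received rows as dicts keyed by column name.
        with self._lock:
            return [dict(zip(self._columns, row)) for row in self._rows]


def stop_subscription(session, host, port, table_name, action_name):
    # Assumption: `session` is the dolphindb session that called subscribe()
    # and it provides unsubscribe(host, port, tableName, actionName).
    session.unsubscribe(host, port, table_name, action_name)
```

To try this, create `collector = TradeCollector(COLUMNS)` and pass `handler=collector.handler` to `s.subscribe` in step 3. Once data has arrived, `collector.records()[0]["TradePrice"]` reads a field by name rather than by position, and `stop_subscription(s, "localhost", 8892, "tglobal", "action")` cancels the subscription from the subscriber before the table is undefined on the server.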