Subscription Through Python API
This section introduces the process of stream subscription through Python API using trade tick data as an example. For details on Python API, refer to Python API.
- Create a shared stream table tglobal on the publisher in
DolphinDB.
name = `ChannelNo`ApplSeqNum`MDStreamID`BidApplSeqNum`OfferApplSeqNum`SecurityID`SecurityIDSource`TradePrice`TradeQty`ExecType`TradeTime`LocalTime`SeqNo`DataStatus`TradeMoney`TradeBSFlag`BizIndex`OrderKind`Market type = [INT,LONG,SYMBOL,LONG,LONG,SYMBOL,SYMBOL,DOUBLE,INT,SYMBOL,TIMESTAMP,TIME,LONG,INT,DOUBLE,SYMBOL,LONG,SYMBOL,SYMBOL] share streamTable(100:0, name, type) as tglobal
- Create a connection between the subscriber in Python and DolphinDB. Enable a port
for data subscriptions to receive data from the DolphinDB
server.
import numpy as np import pandas as pd import dolphindb as ddb import time s = ddb.session() s.connect(host="localhost", port=8892, userid="admin", password="123456") s.enableStreaming(10020)
- Create a local subscription to tglobal on the Python subscriber. Set
parameters host and port to the IP address and port number of the
publisher node. Set parameter handler to a user-defined callback function to
process the subscribed data, and set parameters batchSize and throttle
according to specific needs. For parameter details, refer to Subscription
Options.
listTrade = [] def handler(lst): listTrade.append(lst) s.subscribe(host="localhost", port=8892, handler=handler, tableName="tglobal", actionName="action", offset=-1, resub=False, filter=None, msgAsTable=False, batchSize=100000, throttle=60)
- Write simulated data to tglobal on the publisher in
DolphinDB.
for(i in 1..100){ insertData = [rand(100,1),long(i),string(i),long(i),long(i),string(i),string(i),rand(1.0,1),rand(100,1),string(i),timestamp('2021.01.04T09:30:02.000'),time('09:30:02.000'),long(i),rand(100,1),rand(1.0,1),string(i),long(i),string(i),string(i)] insert into tglobal values(insertData) }
- Query the data received by the subscriber in
Python.
print(listTrade[0][0]) /* output: [12, 1, '1', 1, 1, '1', '1', 0.053802191046997905, 26, '1', numpy.datetime64('2021-01-04T09:30:02.000'), numpy.datetime64('1970-01-01T09:30:02.000'), 1, 34, 0.4466768535785377, '1', 1, '1', '1'] */
- Once the stream subscription is complete, unsubscribe and then undefine the stream
table. Note that the stream table can only be undefined after all subscriptions to
this table are
canceled.
// Unsubscribe to the stream table unsubscribeTable(tableName="tglobal", actionName="action") // Undefine the stream table undef(`tglobal, SHARED)