PollingClient
PollingClient
returns a message queue upon subscription, from which
users can retrieve and process the messages.
This section presents a comprehensive overview of the PollingClient
,
covering its constructor, subscription and unsubscription mechanisms, data retrieval
methods, and provides a practical usage example.
Constructing a PollingClient
The constructor declaration is as follows:
PollingClient(int listeningPort)
- listeningPort: The subscription port on the client to subscribe to the data sent from the server.
DolphinDB server version 2.00.9 and later enable the publisher to push data through the existing connection requested by the subscriber. Therefore, the subscriber does not need to specify a specific port (filled in 0). If listeningPort is specified, it will be ignored by the API.
Creating Subscription
PollingClient
offers the subscribe
function to
establish subscriptions to DolphinDB stream tables. The declaration for the
subscribe
function is as follows:
MessageQueueSP subscribe(
string host,
int port,
string tableName,
string actionName,
int64_t offset = -1,
bool resub = true,
const VectorSP &filter = nullptr,
bool msgAsTable = false,
bool allowExists = false,
string userName="",
string password="",
const StreamDeserializerSP &blobDeserializer = nullptr,
const std::vector<std::string>& backupSites = std::vector<std::string>(),
int resubscribeInterval = 100,
bool subOnce = false,
int resubscribeTimeout = 0
)
Arguments
- host: The IP address of the publisher node.
- port: The port number of the publisher node.
- tableName: The name of the subscribed stream table.
- actionName: The name of the subscription task. It can contain letters, digits, and underlines.
- offset: The position of the first message where the subscription begins. A message is a row of the stream table. Offset is relative to the first row of the subscribed stream table when it was created. If some rows were cleared from memory due to cache size limit, they are still considered in determining where the subscription starts. Note that if offset is unspecified, negative numbers, or exceeds the row count, the subscription starts with the next new message.
- resub: Whether to re-subscribe after a network disconnection.
- filter: The filtering conditions. Only the messages with the specified filtering column values will be published.
- msgAsTable: true means the subscribed data is ingested into handler as a Table. false means the subscribed data is ingested into handler as an AnyVector.
- allowExists: true means multiple handlers can be specified. false means only a single handler can be specified.
- userName:The username used to connect to the server.
- password: The password used to connect to the server.
- blobDeserializer: The deserializer for the subscribed heterogeneous stream table.
- backupSites: A backup node list with each specified in
host:port format, e.g., ["192.168.0.1:8848",
"192.168.0.2:8849"].
- If specified, failover mechanism is automatically activated. If the subscribed node (i.e., primary node) becomes unavailable due to a disconnection or failover, the API attempts reconnection repeatedly across backup nodes until the subscription is successful.
- The stream table definition, including the name, schema, and data entries, must be identical across the primary node and all configured backup nodes to prevent potential data inconsistencies during failover.
- If the subscribed table is a high-availability stream table, the connection will be established based on the node set of backupSites.
- To cancel a subscription, specify the host and port of the primary node.
- resubscribeInterval: A non-negative integer indicating the wait time (in milliseconds) between resubscription attempts when a disconnection is detected. The default value is 100.
- subOnce: Whether to include the subscribed node in subsequent reconnection attempts following the node switch. Note that this parameter does not take effect for a single subscribed node when resub = true.
- resubscribeTimeout: An integer specifying the timeout (in milliseconds) allowed for resubscription attempts. The default value is 0, which means no time limit. Recommended to use with resub=true to prevent excessive resource consumption from prolonged resubscription attempts.
Return Values
A pointer to the message queue.
Retrieving Data
MessageQueue
class implements a queue data structure to store
Message
class instances. A MessageQueue
object
is returned when a PollingClient
object creates a subscription. The
following methods are supported for retrieving data from the queue:
poll
The method retrieves a message from a MessageQueue
object. If
the queue is currently empty, it will wait for up to milliSeconds.
bool poll(Message &item, int milliSeconds)
-
item: Messages to be retrieved.
-
milliSeconds: The maximum waiting time in milliseconds.
It returns a boolean value: true means the retrieval is successful; false means there are no available messages in the queue within the waiting time.
pop
The method retrieves a message from a MessageQueue
object. If
the queue is currently empty, it will wait indefinitely until a message is
available.
void pop(Message &item)
-
item: Messages to be retrieved.
Canceling Subscription
PollingClient
offers the unsubscribe
function to
cancel an existing subscription. The function declaration for
unsubscribe
is as follows:
void unsubscribe(
string host,
int port,
string tableName,
string actionName
)
Note: The arguments passed to the unsubscribe
function must
match the arguments provided when creating the subscription through the
subscribe
call.
Usage Example
The following example uses the DolphinDB server version 2.00.10.
DolphinDB script: Create a shared stream table “shared_stream_table“.
rt = streamTable(`XOM`GS`AAPL as id, 102.1 33.4 73.6 as x)
share rt as shared_stream_table
C++ Code: Create a message queue for subscription to table “shared_stream_table“. The subscription starts from the first record.
int main(int argc, const char **argv)
{
PollingClient client(0);
MessageQueueSP queue = client.subscribe("127.0.0.1", 8848, "shared_stream_table", "action3", 0);
ThreadSP t = new Thread(new Executor([&client](){
Util::sleep(1000);
client.unsubscribe("127.0.0.1", 8848, "shared_stream_table", "action3");
}));
t->start();
Message msg;
while (queue->poll(msg, 1000) && !msg.isNull()) {
std::cout << msg->getString() << std::endl;
}
}