ThreadedClient

ThreadedClient creates a dedicated thread for each subscription. Whenever new data is generated on the publisher, the corresponding subscription thread retrieves the data and invokes the designated callback function (handler).

This section describes the ThreadedClient constructors, the functions for creating and canceling subscriptions, and a usage example.

Constructing a ThreadedClient

The constructor declaration is as follows:

ThreadedClient(int listeningPort = 0)
  • listeningPort: The port on the client that receives the data sent from the server.

With DolphinDB server version 2.00.9 and later, the publisher pushes data through the existing connection requested by the subscriber. The subscriber therefore does not need to specify a port and can leave listeningPort at its default of 0. If listeningPort is specified, the API ignores it.

Starting from version 3.00.2.1, a new constructor for threaded clients is available:

ThreadedClient(StreamingClientConfig config);

where StreamingClientConfig is the configuration for streaming subscription. The default setup is defined as follows:

enum class TransportationProtocol {
    TCP, UDP,
};
struct StreamingClientConfig {
    TransportationProtocol protocol{TransportationProtocol::TCP};
};
  • protocol: Defines the transport protocol for the client.
    • TCP: Functions identically to previous versions.
    • UDP: Enables UDP multicast communication with the DolphinDB server via the Aeron library.
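
As a minimal sketch of selecting the protocol, the snippet below repeats the two declarations from the listing above so that it compiles on its own; in a real program they come from the API headers. The helper makeConfig is a hypothetical name introduced only for illustration and is not part of the API.

```cpp
// Declarations copied from the listing above so the sketch is self-contained.
// In a real program they are provided by the API headers instead.
enum class TransportationProtocol {
    TCP, UDP,
};
struct StreamingClientConfig {
    TransportationProtocol protocol{TransportationProtocol::TCP};
};

// Hypothetical helper (not part of the API): build a config for the chosen transport.
StreamingClientConfig makeConfig(bool useUdp) {
    StreamingClientConfig config;  // default member initializer selects TCP
    if (useUdp) {
        config.protocol = TransportationProtocol::UDP;
    }
    return config;
}
```

The resulting object would then be passed to the second constructor, for example ThreadedClient client(makeConfig(true)). A default-constructed StreamingClientConfig keeps TCP.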

Notes on using the UDP protocol:

  • The current UDP communication method supports only the first subscribe interface (which processes each message as it arrives) and does not support the parameters resub, filter, backupSites, resubscribeTimeout, and subOnce.
  • The C++ API requires the Aeron library (of any version) for compilation on Linux only.
  • The API operates with an embedded Aeron Media Driver, so users do not need to run a separate Aeron thread.
  • When using UDP for streaming subscriptions, the C++ API creates a folder named dolphindb_udp_<pid> in the /dev/shm directory for Aeron’s use. This folder is automatically deleted when the subscription ends. If the client process terminates unexpectedly, delete the relevant folder manually.
  • The DolphinDB server must run on Linux and be version 3.00.0 or later.
  • The maximum message size that the server can publish at one time is 2 MB. If data is not received during subscription, check if this limit has been exceeded. Adjusting the maxMsgNumPerBlock configuration parameter may resolve this issue.

Creating Subscription

ThreadedClient offers the subscribe function to establish subscriptions to DolphinDB stream tables. Two interfaces are provided, which differ in how incoming messages are handled.

(1) Processing each message individually as it arrives. All access to the Message object must be completed within the handler.

using MessageHandler = std::function<void(Message)>;
ThreadSP subscribe(
    string host, int port,
    const MessageHandler &handler,
    string tableName,
    string actionName = DEFAULT_ACTION_NAME,
    int64_t offset = -1,
    bool resub = true,
    const VectorSP &filter = nullptr,
    bool msgAsTable = false,
    bool allowExists = false, 
    string userName = "",
    string password = "",
    const StreamDeserializerSP &blobDeserializer = nullptr,
    const std::vector<std::string>& backupSites = std::vector<std::string>(),
    int resubscribeInterval = 100,
    bool subOnce = false,    
    int resubscribeTimeout = 0
)

(2) Processing messages in batches using a caching mechanism. The handler is triggered only when the specified conditions (batchSize and throttle) are met.

using MessageBatchHandler = std::function<void(vector<Message>)>;
ThreadSP subscribe(
    string host, int port,
    const MessageBatchHandler &handler,
    string tableName,
    string actionName = DEFAULT_ACTION_NAME,
    int64_t offset = -1,
    bool resub = true,
    const VectorSP &filter = nullptr,
    bool allowExists = false,
    int batchSize = 1,
    double throttle = 1,
    bool msgAsTable = false,
    string userName = "",
    string password = "",
    const StreamDeserializerSP &blobDeserializer = nullptr,
    const std::vector<std::string>& backupSites = std::vector<std::string>(),
    int resubscribeInterval = 100,
    bool subOnce = false,    
    int resubscribeTimeout = 0
)
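
The two subscribe overloads differ in the handler type, so the parameter type of the callable that is passed decides which interface is used. The sketch below illustrates this with stand-ins: the Message struct, the SubscribeKind enum, and the whichInterface functions are hypothetical and only mimic the shape of the declarations above; they are not the API's own types or functions.

```cpp
#include <functional>
#include <string>
#include <vector>

// Stand-in for the API's Message type (assumption, for illustration only).
struct Message { std::string text; };

// Same aliases as in the declarations above, built on the stand-in type.
using MessageHandler = std::function<void(Message)>;
using MessageBatchHandler = std::function<void(std::vector<Message>)>;

enum class SubscribeKind { PerMessage, Batch };

// Hypothetical stand-ins for the two subscribe overloads. They only report
// which overload the compiler selected for a given handler.
SubscribeKind whichInterface(const MessageHandler &) { return SubscribeKind::PerMessage; }
SubscribeKind whichInterface(const MessageBatchHandler &) { return SubscribeKind::Batch; }

// A lambda taking one Message can only be converted to MessageHandler.
SubscribeKind perMessageChoice() {
    return whichInterface([](Message) {});
}

// A lambda taking a vector of Message can only be converted to MessageBatchHandler.
SubscribeKind batchChoice() {
    return whichInterface([](std::vector<Message>) {});
}
```

Because each lambda is convertible to only one of the two std::function types, the call is unambiguous. Note that the arguments after handler are ordered differently in the two overloads (for example, msgAsTable and allowExists), so positional arguments should be checked against the overload that is actually selected.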

Arguments

  • host: The IP address of the publisher node.
  • port: The port number of the publisher node.
  • handler: A user-defined function to process the received data.
  • tableName: The name of the subscribed stream table.
  • actionName: The name of the subscription task. It can contain letters, digits, and underscores.
  • offset: The position of the first message where the subscription begins. A message is a row of the stream table. The offset is relative to the first row of the subscribed stream table when it was created. If some rows were cleared from memory due to the cache size limit, they are still counted when determining where the subscription starts. Note that if offset is unspecified, negative, or greater than the row count, the subscription starts with the next new message.
  • resub: Whether to re-subscribe after a network disconnection.
  • filter: The filtering conditions. Only the messages with the specified filtering column values will be published.
  • msgAsTable: true means the subscribed data is ingested into handler as a Table. false means the subscribed data is ingested into handler as an AnyVector. msgAsTable only takes effect when batchSize is specified.
  • allowExists: true means multiple handlers can be specified. false means only a single handler can be specified.
  • batchSize: The number of unprocessed messages to trigger the handler.
    • If batchSize is a positive integer, the handler does not process messages until the number of unprocessed messages reaches batchSize. The handler processes batchSize messages at a time.
    • If it is unspecified or non-positive, the handler processes incoming messages one by one as soon as they come in.
  • throttle: The maximum waiting time (in seconds) before the handler processes the incoming messages. throttle has no effect if batchSize is not specified.
  • userName: The username used to connect to the server.
  • password: The password used to connect to the server.
  • blobDeserializer: The deserializer for the subscribed heterogeneous stream table.
  • backupSites: A backup node list with each specified in host:port format, e.g., ["192.168.0.1:8848", "192.168.0.2:8849"].
    • If specified, the failover mechanism is activated automatically. If the subscribed node (i.e., the primary node) becomes unavailable due to a disconnection or failover, the API repeatedly attempts to reconnect across the backup nodes until the subscription succeeds.
    • The stream table definition, including the name, schema, and data entries, must be identical across the primary node and all configured backup nodes to prevent potential data inconsistencies during failover.
    • If the subscribed table is a high-availability stream table, the connection will be established based on the node set of backupSites.
    • To cancel a subscription, specify the host and port of the primary node.
  • resubscribeInterval: A non-negative integer indicating the wait time (in milliseconds) between resubscription attempts when a disconnection is detected. The default value is 100.
  • subOnce: Whether to include the subscribed node in subsequent reconnection attempts following the node switch. Note that this parameter does not take effect for a single subscribed node when resub = true.
  • resubscribeTimeout: An integer specifying the timeout (in milliseconds) allowed for resubscription attempts. The default value is 0, which means no time limit. It is recommended to set this parameter together with resub = true to prevent excessive resource consumption from prolonged resubscription attempts.
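
The interaction of batchSize and throttle described above can be sketched as a small decision rule. This is one reading of the documented behavior, not the API's code: the BatchTrigger struct and its member names are hypothetical, and measuring the waiting time from the oldest unprocessed message is an assumption.

```cpp
#include <cstddef>

// Hypothetical model of the documented trigger rule; not the API's code.
struct BatchTrigger {
    int batchSize;           // value passed to subscribe
    double throttleSeconds;  // value passed to subscribe, in seconds

    // pending: number of unprocessed messages currently cached.
    // waitedSeconds: assumed to be how long the oldest pending message has waited.
    bool shouldInvokeHandler(std::size_t pending, double waitedSeconds) const {
        if (pending == 0) {
            return false;  // nothing to deliver
        }
        if (batchSize <= 0) {
            return true;   // non-positive batchSize: deliver messages one by one
        }
        if (pending >= static_cast<std::size_t>(batchSize)) {
            return true;   // enough unprocessed messages have accumulated
        }
        return waitedSeconds >= throttleSeconds;  // maximum waiting time reached
    }
};
```

Under this reading, with batchSize = 3 and throttle = 1.0, two cached messages are held back until either a third message arrives or one second has passed.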

Return Values

A pointer to the thread that continuously invokes handler. This thread stops calling handler when the unsubscribe function is called on the same topic.

Canceling Subscription

ThreadedClient offers the unsubscribe function to cancel an existing subscription. The function declaration for unsubscribe is as follows:

void unsubscribe(
    string host,
    int port,
    string tableName,
    string actionName
)

Note: The arguments passed to the unsubscribe function must match the arguments provided when creating the subscription through the subscribe call.
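
One way to picture why the arguments must match is to treat host, port, tableName, and actionName as a key that identifies the subscription. The sketch below models that idea with a hypothetical SubscriptionRegistry class. It is not the API's internal bookkeeping, and how the real unsubscribe reacts to mismatched arguments is not specified here.

```cpp
#include <set>
#include <string>
#include <tuple>

// Hypothetical registry keyed by the four values that identify a subscription.
class SubscriptionRegistry {
public:
    using Key = std::tuple<std::string, int, std::string, std::string>;

    void add(const std::string &host, int port,
             const std::string &tableName, const std::string &actionName) {
        active_.insert(Key(host, port, tableName, actionName));
    }

    // Returns true only if an identical key was registered earlier.
    bool remove(const std::string &host, int port,
                const std::string &tableName, const std::string &actionName) {
        return active_.erase(Key(host, port, tableName, actionName)) == 1;
    }

private:
    std::set<Key> active_;
};

// A different actionName forms a different key, so nothing is removed.
bool demoMismatch() {
    SubscriptionRegistry registry;
    registry.add("127.0.0.1", 8848, "shared_stream_table", "action1");
    return registry.remove("127.0.0.1", 8848, "shared_stream_table", "action2");
}

// Identical arguments form the same key, so the entry is removed.
bool demoMatch() {
    SubscriptionRegistry registry;
    registry.add("127.0.0.1", 8848, "shared_stream_table", "action1");
    return registry.remove("127.0.0.1", 8848, "shared_stream_table", "action1");
}
```

In this model, demoMismatch() returns false and demoMatch() returns true, which mirrors the requirement that unsubscribe receive the same host, port, tableName, and actionName as the original subscribe call.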

Usage Example

The following example uses DolphinDB server version 2.00.10.

DolphinDB script: Create a shared stream table "shared_stream_table".

rt = streamTable(`XOM`GS`AAPL as id, 102.1 33.4 73.6 as x)
share rt as shared_stream_table

C++ code: Create a subscription to the table "shared_stream_table", starting from the first record. Cancel this subscription after 10 seconds. During this 10-second window, the subscriber will receive any data appended to "shared_stream_table".

#include <iostream>
#include <vector>
#include <unistd.h>  // sleep()
#include "Streaming.h"
using namespace dolphindb;

int main(int argc, const char **argv)
{
    auto batchHandler = [](std::vector<Message> msgs){
        std::cout << "receive " << msgs.size() << " msgs\n";
        for(auto& msg : msgs){
            std::cout << msg->getString() << std::endl;
        }
    };
    ThreadedClient client(0);
    ThreadSP t = client.subscribe("127.0.0.1", 8848, batchHandler, "shared_stream_table", "action1", 0, true, nullptr, false, 3, 1.0);
    sleep(10);
    client.unsubscribe("127.0.0.1", 8848, "shared_stream_table", "action1");
    return 0;
}