Stream Subscription

The Java API supports stream subscription, which lets a Java client receive updates whenever the streaming data changes. It provides three subscription clients: ThreadedClient, ThreadPooledClient, and PollingClient.

Note: Since 2.00.15, DolphinDB has enhanced login security, restricting stream subscription functions to authenticated users only. Please ensure proper login before using these functions.

Constructor

Note:
  • As of version 2.00.9, the publisher pushes streaming data to the subscriber over the connection established by the subscriber. The subscriber no longer needs to provide a subscribePort (default 0).

  • For versions earlier than 2.00.9, the publisher must establish a new TCP connection for data transfer after the subscriber submits the subscription request.

ThreadedClient

Each subscription creates a new data processing thread, and the user needs to set a callback function. Whenever new data comes, the corresponding subscription thread executes the callback function with the received data.

Constructor:

public ThreadedClient()
public ThreadedClient(String subscribeHost, int subscribePort)

Parameters

  • subscribeHost (optional, default ""): A String representing the subscription host IP. If the Java client machine has multiple IPs, any of them can be specified.
  • subscribePort (optional, default 8849): An int representing the subscription port.
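
The thread-per-subscription model described above can be illustrated with plain Java threads. This is a minimal sketch that does not use the DolphinDB API: the class ThreadPerSubscriptionSketch and its methods are hypothetical names, and the message lists stand in for data pushed by a publisher.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for the thread-per-subscription model; not part of the DolphinDB API.
final class ThreadPerSubscriptionSketch {

    // Starts a dedicated thread that invokes the callback once per message, in order.
    static Thread startSubscription(String name, List<String> messages, Consumer<String> callback) {
        Thread worker = new Thread(() -> {
            for (String msg : messages) {
                callback.accept(msg);
            }
        }, "subscription-" + name);
        worker.start();
        return worker;
    }

    // Runs two subscriptions, each on its own thread, and returns what each callback received.
    static List<List<String>> demo() {
        List<String> receivedA = new CopyOnWriteArrayList<>();
        List<String> receivedB = new CopyOnWriteArrayList<>();
        Thread a = startSubscription("A", Arrays.asList("a1", "a2"), receivedA::add);
        Thread b = startSubscription("B", Arrays.asList("b1"), receivedB::add);
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Arrays.asList(receivedA, receivedB);
    }
}
```

Because every subscription owns its thread, a slow callback in one subscription does not delay the others, at the cost of one thread per subscription.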

ThreadPooledClient

Multiple subscriptions share a single thread pool.

Constructor:

public ThreadPooledClient()
public ThreadPooledClient(String subscribeHost, int subscribePort, int threadCount)

Parameters

  • subscribeHost (optional, default ""): A String representing the subscription host IP.
  • subscribePort (optional, default 8849): An int representing the subscription port.
  • threadCount (optional): An int representing the number of subscription threads. It defaults to the number of available processors, i.e., Runtime.getRuntime().availableProcessors().
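
To illustrate what sharing a single thread pool means, the following sketch dispatches the handler calls of several subscriptions onto one fixed-size pool. It uses only java.util.concurrent and is not the client's actual implementation; SharedPoolSketch, dispatch, and defaultThreadCount are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of several subscriptions sharing one pool; not part of the DolphinDB API.
final class SharedPoolSketch {

    // Mirrors the documented default for threadCount.
    static int defaultThreadCount() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Submits one handler call per message of every subscription to a single shared pool
    // and returns the number of handler calls that completed.
    static int dispatch(int subscriptions, int messagesPerSubscription, int threadCount) {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        AtomicInteger handled = new AtomicInteger();
        for (int s = 0; s < subscriptions; s++) {
            for (int m = 0; m < messagesPerSubscription; m++) {
                pool.execute(handled::incrementAndGet); // stands in for the user's handler
            }
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handlers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled.get();
    }
}
```

With a shared pool, messages from one subscription may be handled concurrently on different threads, so a handler used this way should be thread-safe.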

PollingClient

The subscribe method returns a queue (a TopicPoller), which the user then polls to retrieve data.

Constructor:

public PollingClient()
public PollingClient(String subscribeHost, int subscribePort)

Parameters

  • subscribeHost (optional, default ""): A String representing the subscription host IP.
  • subscribePort (optional): An int representing the subscription port.

subscribe Method

All three clients use the subscribe method for stream subscription, with the same parameters.

Syntax

subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, IVector filter, int batchSize, float throttle, StreamDeserializer deserializer, String user, String password, List<String> backupSites, int resubscribeInterval, boolean subOnce)

Parameters

  • host: IP address of the publisher node.
  • port: Port number of the publisher node.
  • tableName: Name of the publisher stream table.
  • actionName (optional, default "javaStreamingApi"): Name of the subscription task.
  • handler: A user-defined function to process the subscribed data.
  • offset (optional, default -1): An integer indicating the position of the first message where the subscription begins. A message is a row of the stream table. If offset is unspecified, negative, or greater than the number of rows in the stream table, the subscription starts with the next new message. offset is relative to the first row of the stream table when it was created. Rows that have been cleared from memory due to the cache size limit still count when determining where the subscription starts.
  • reconnect (optional, default false): Whether to enable reconnection.
  • filter (optional, default null): A vector of filtering values. Only rows whose filtering column value appears in this vector are published to the subscriber.
  • batchSize (optional): The number of unprocessed messages to trigger the handler. If it is positive, the handler does not process messages until the number of unprocessed messages reaches batchSize. If it is unspecified or non-positive, the handler processes incoming messages as soon as they come in.
  • throttle (optional, default 1): A float indicating the maximum waiting time (in seconds) before the handler processes the incoming messages.

    Note that batchSize and throttle must be specified together.

  • deserializer (optional, default null): Deserializer for the subscribed heterogeneous stream table.
  • user (optional, default ""): Username used to connect to the server.
  • password (optional, default ""): Password.
  • backupSites (optional): A String list representing the backup publisher nodes, each specified in host:port format, e.g., ["192.168.0.1:8848", "192.168.0.2:8849"].
    • If specified, the failover mechanism is activated automatically. If the subscribed node (i.e., the primary node) becomes unavailable due to a disconnection or failover, the API repeatedly attempts to reconnect across the backup nodes until the subscription succeeds.
    • The stream table definition, including the name, schema, and data entries, must be identical across the primary node and all configured backup nodes to prevent potential data inconsistencies during failover.
    • If the subscribed table is a high-availability stream table, the connection will be established based on the node set of backupSites.
    • To cancel a subscription, specify the host and port of the primary node.
  • resubscribeInterval (optional, default 100): A non-negative integer indicating how long (in milliseconds) to wait before attempting to resubscribe if the API detects a disconnection.
    Note: Starting from version 3.00.2.3, the parameter resubTimeout has been renamed to resubscribeInterval.
  • subOnce (optional, default false): Whether to include the subscribed node in subsequent reconnection attempts following the node switch. Note that this parameter does not take effect for a single subscribed node when reconnect = true.
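
The rule for offset can be written out as a small function. This is a sketch of the rule as stated in the parameter description, not the server's code: OffsetSketch, resolveStart, and totalRows are hypothetical names, and totalRows is assumed to count every row appended since the table was created, including rows already cleared from memory.

```java
// Hypothetical helper that restates the documented offset rule; not part of the DolphinDB API.
final class OffsetSketch {

    // Returns the row index of the first message the subscription would deliver.
    // totalRows counts all rows appended since table creation (assumption: cleared rows included).
    static long resolveStart(long offset, long totalRows) {
        if (offset < 0 || offset > totalRows) {
            return totalRows; // negative or too large: start with the next new message
        }
        return offset; // otherwise start at the requested row
    }
}
```

For example, with 100 rows in the table, offsets of -1 and 250 both resolve to 100 (the next new message), while an offset of 40 starts at row 40.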

Subscription Options

Reconnection

If the reconnect parameter is set to true, the system will automatically resubscribe after an unexpected disconnection, depending on the cause of the disconnection:

  • If both the publisher and subscriber are in normal states but the network is interrupted, the subscriber will automatically resubscribe when the network is restored.
  • If the publisher crashes, the subscriber will keep trying to resubscribe after the publisher restarts.
    • If the publisher has enabled persistence for the stream table, the subscriber can successfully resubscribe once the publisher reads data from the disk up to the point of interruption.
    • If the publisher has not enabled persistence for the stream table, the re-subscription will fail.
  • If the subscriber crashes, it will not automatically resubscribe after restarting and users need to re-execute the subscribe function.

Examples

@Test
public void testPollingClient() throws IOException {
    PollingClient client = new PollingClient(9002);
    TopicPoller poller1 = client.subscribe("192.168.0.21", 9002, "Trades", -1, true);
}
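
The resubscription behavior, combined with the resubscribeInterval parameter, amounts to a retry loop with a fixed wait between attempts. The sketch below models that loop in plain Java under stated assumptions: ResubscribeSketch and resubscribe are hypothetical names, the attempt callback stands in for a real resubscription request, and maxAttempts is an illustrative bound that exists only to keep the example finite.

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of a fixed-interval resubscription loop; not part of the DolphinDB API.
final class ResubscribeSketch {

    // Calls attempt until it succeeds or maxAttempts is reached, sleeping intervalMillis
    // between failed attempts. Returns the number of attempts used, or -1 on failure.
    static int resubscribe(BooleanSupplier attempt, long intervalMillis, int maxAttempts) {
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return i;
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis); // plays the role of resubscribeInterval
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

A caller would pass a lambda that tries to subscribe and reports success, for example one that returns false while the publisher is still restarting.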

Filtering

Use the filter parameter to restrict which rows are published to the subscriber. The filtering column is set on the server with the function setStreamTableFilterColumn; only messages whose filtering column value appears in filter are published.

Examples

@Test
public void testEnableFilter() throws IOException {
    BasicIntVector filter = new BasicIntVector(2);
    filter.setInt(0, 1);
    filter.setInt(1, 2);
    PollingClient client = new PollingClient(9002);
    TopicPoller poller1 = client.subscribe("192.168.0.21", 9002, "Trades1", "subTread1", -1, filter);
    ArrayList<IMessage> msg1 = poller1.poll(500, 10000);
}
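
The effect of a filter such as the vector {1, 2} above can be modeled without a server. In this sketch, which is an illustration rather than the publisher's implementation, rows are plain int arrays and FilterSketch, publish, and filterColumn are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of publisher-side filtering; not part of the DolphinDB API.
final class FilterSketch {

    // Returns only the rows whose value in filterColumn is contained in filter,
    // preserving the original row order.
    static List<int[]> publish(List<int[]> rows, int filterColumn, Set<Integer> filter) {
        List<int[]> published = new ArrayList<>();
        for (int[] row : rows) {
            if (filter.contains(row[filterColumn])) {
                published.add(row);
            }
        }
        return published;
    }
}
```

Given rows whose first column holds 1, 3, 2, and 5, a filter of {1, 2} on that column publishes only the first and third rows.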

Canceling Subscription

Use the unsubscribe method to cancel an existing subscription.

public void unsubscribe(String host, int port, String tableName, String actionName)

Examples

client.unsubscribe("192.168.0.21", 9002, "outTables", "mutiSchema");

Usage Example

First, define a handler that implements the com.xxdb.streaming.client.MessageHandler interface. The handler is passed to subscribe when creating a subscription.

public static MessageHandler handler = new MessageHandler() {
    @Override
    public void doEvent(IMessage msg) {
        try {
            // conn is assumed to be an already connected DBConnection defined elsewhere.
            String script = String.format("insert into Receive values(%d,%s,%f)",
                    Integer.parseInt(msg.getEntity(0).getString()),
                    msg.getEntity(1).getString(),
                    Double.valueOf(msg.getEntity(2).toString()));
            conn.run(script);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
};

For single-threaded callback using ThreadedClient:

@Test
public void testThreadedClient() throws IOException {
    // For versions before 2.00.9, specify the subscription port (8000 here):
    // ThreadedClient client = new ThreadedClient(8000);
    // For version 2.00.9 and later, no subscription port is needed:
    ThreadedClient client = new ThreadedClient();
    client.subscribe("192.168.0.21", 8848, "Trades", handler, -1);
}

Incoming data is passed to the handler's doEvent method through the msg parameter and processed there.

For multi-threaded callback using ThreadPooledClient:

@Test
public void testThreadPooledClient() throws IOException {
    // For versions before 2.00.9, specify the subscription port (8000 here):
    // ThreadPooledClient client = new ThreadPooledClient(8000);
    // For version 2.00.9 and later, no subscription port is needed:
    ThreadPooledClient client = new ThreadPooledClient();
    client.subscribe("192.168.0.21", 9002, "Trades", handler, -1);
}

For PollingClient, where the Java client periodically polls for new data from the publisher table:

@Test
public void testPollingClient() throws IOException {
    // For versions before 2.00.9, specify the subscription port (8000 here):
    // PollingClient client = new PollingClient(8000);
    // For version 2.00.9 and later, no subscription port is needed:
    PollingClient client = new PollingClient();
    TopicPoller poller1 = client.subscribe("192.168.0.21", 9002, "Trades", -1);

    while (true) {
        ArrayList<IMessage> msgs = poller1.poll(1000);
        if (msgs.size() > 0) {
            BasicInt value = (BasicInt) msgs.get(0).getEntity(2);
        }
    }
}

When the stream table receives new data, poller1 pulls it. When no new data has been published, the Java program blocks in the poller1.poll call, waiting for data.
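
The blocking behavior of polling can be reproduced with a standard BlockingQueue. This sketch is a model only: PollingSketch, publish, and poll are hypothetical names rather than TopicPoller's implementation, and returning an empty list on timeout is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical queue-based model of polling; not part of the DolphinDB API.
final class PollingSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Stands in for the publisher pushing a message to the subscriber.
    void publish(String msg) {
        queue.add(msg);
    }

    // Blocks for up to timeoutMillis waiting for the first message, then also returns
    // everything else already queued. Returns an empty list if nothing arrives in time.
    List<String> poll(long timeoutMillis) {
        List<String> batch = new ArrayList<>();
        try {
            String first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch;
    }
}
```

The calling thread controls when data is processed, which suits applications that prefer a pull loop over callbacks.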