Stream Subscription
The Java API supports stream subscription, allowing the Java client to receive updates
whenever the streaming data changes. The Java API provides three stream subscription
clients: ThreadedClient, ThreadPooledClient, and PollingClient.
Constructor
- As of version 2.00.9, the publisher pushes streaming data to the subscriber over the connection established by the subscriber. The subscriber no longer needs to provide a subscribePort (default 0).
- For versions earlier than 2.00.9, the publisher must re-establish a TCP connection for data transfer after the subscriber submits the subscription request.
ThreadedClient
Each subscription creates a new data processing thread, and the user needs to set a callback function. Whenever new data comes, the corresponding subscription thread executes the callback function with the received data.
Constructor:
public ThreadedClient()
public ThreadedClient(String subscribeHost, int subscribePort)
Parameters
- subscribeHost (optional, default ""): A String representing the subscription host IP. If the Java client machine has multiple IPs, any of them can be specified.
- subscribePort (optional, default 8849): An int representing the subscription port.
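The thread-per-subscription model described above can be pictured with a small stand-alone sketch. It uses only the Java standard library and is not the ThreadedClient implementation: the class PerSubscriptionThreadSketch, its publish and close methods, and the use of String as the message type are all hypothetical names chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical model, not part of the Java API: one worker thread per subscription.
class PerSubscriptionThreadSketch {
    // Sentinel object that tells the worker thread to stop.
    private static final Object STOP = new Object();

    private final BlockingQueue<Object> inbox = new LinkedBlockingQueue<>();
    private final Thread worker;

    PerSubscriptionThreadSketch(Consumer<String> callback) {
        // The dedicated thread blocks on the queue and runs the callback for each message.
        worker = new Thread(() -> {
            try {
                while (true) {
                    Object item = inbox.take();
                    if (item == STOP) {
                        return;
                    }
                    callback.accept((String) item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
    }

    // Stands in for data arriving from the publisher.
    void publish(String msg) {
        inbox.add(msg);
    }

    // Lets the worker finish the queued messages, then waits for it to exit.
    void close() {
        inbox.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this sketch, messages of one subscription are handled in arrival order because a single thread owns the queue.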
ThreadPooledClient
Multiple subscriptions share a single thread pool.
Constructor:
public ThreadPooledClient()
public ThreadPooledClient(String subscribeHost, int subscribePort, int threadCount)
Parameters
- subscribeHost (optional, default ""): A String representing the subscription host IP.
- subscribePort (optional, default 8849): An int representing the subscription port.
- threadCount (optional): An int representing the number of subscription threads. The default is the number of available processors, as returned by Runtime.getRuntime().availableProcessors().
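As a rough illustration of several subscriptions sharing one pool, the following stand-alone sketch uses java.util.concurrent only. It is an assumption about the general shape of such a design, not the ThreadPooledClient implementation; SharedPoolSketch, dispatch, and shutdownAndWait are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not part of the Java API: all subscriptions share one fixed pool.
class SharedPoolSketch {
    private final ExecutorService pool;

    // Mirrors the documented default: one thread per available processor.
    SharedPoolSketch() {
        this(Runtime.getRuntime().availableProcessors());
    }

    SharedPoolSketch(int threadCount) {
        pool = Executors.newFixedThreadPool(threadCount);
    }

    // Any subscription hands its handler invocation to the same pool.
    void dispatch(Runnable handlerCall) {
        pool.execute(handlerCall);
    }

    // Stops accepting work and waits for queued calls; returns true if all finished in time.
    boolean shutdownAndWait(long timeoutMs) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this sketch, handler calls may run concurrently on different pool threads, so a handler used this way would need to be thread-safe.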
PollingClient
The subscribe method returns a queue, which the client then polls to get data.
Constructor:
public PollingClient()
public PollingClient(String subscribeHost, int subscribePort)
Parameters
- subscribeHost (optional, default ""): A String representing the subscription host IP.
- subscribePort (optional): An int representing the subscription port.
subscribe Method
The subscribe method is used for stream subscription by all three clients, with the same parameters.
Syntax
subscribe(String host, int port, String tableName, String actionName,
          MessageHandler handler, long offset, boolean reconnect, IVector filter,
          int batchSize, float throttle, StreamDeserializer deserializer,
          String user, String password, List<String> backupSites,
          int resubTimeout, boolean subOnce)
Parameters
- host: IP address of the publisher node.
- port: Port number of the publisher node.
- tableName: Name of the publisher stream table.
- actionName (optional, default "javaStreamingApi"): Name of the subscription task.
- handler: A user-defined function to process the subscribed data.
- offset (optional, default -1): An integer indicating the position of the first message where the subscription begins. A message is a row of the stream table. If offset is unspecified, negative or exceeding the number of rows in the stream table, the subscription starts with the next new message. offset is relative to the first row of the stream table when it is created. If some rows were cleared from memory due to cache size limit, they are still considered in determining where the subscription starts.
- reconnect (optional, default false): Whether to enable reconnection.
- filter (optional, default null): A vector of filtering values. Only the rows whose filtering column value appears in this vector are published to the subscriber.
- batchSize (optional): The number of unprocessed messages to trigger the handler. If it is positive, the handler does not process messages until the number of unprocessed messages reaches batchSize. If it is unspecified or non-positive, the handler processes incoming messages as soon as they come in.
- throttle (optional, default 1): A float indicating the maximum waiting time (in seconds) before the handler processes the incoming messages.
Note that batchSize and throttle must be specified together.
- deserializer (optional, default null): Deserializer for the subscribed heterogeneous stream table.
- user (optional, default ""): Username used to connect to the server.
- password (optional, default ""): Password used to connect to the server.
- backupSites (optional): A String list representing the backup publisher nodes, each specified in host:port format, e.g., ["192.168.0.1:8848", "192.168.0.2:8849"].
  - If specified, the failover mechanism is automatically activated. If the subscribed node (i.e., the primary node) becomes unavailable due to a disconnection or failover, the API repeatedly attempts to reconnect across the backup nodes until the subscription succeeds.
  - The stream table definition, including the name, schema, and data entries, must be identical across the primary node and all configured backup nodes to prevent potential data inconsistencies during failover.
  - If the subscribed table is a high-availability stream table, the connection is established based on the node set of backupSites.
  - To cancel a subscription, specify the host and port of the primary node.
- resubTimeout (optional, default 100): A non-negative integer indicating how long (in milliseconds) to wait before attempting to resubscribe if the API detects a disconnection.
- subOnce (optional, default false): Whether to include the subscribed node in subsequent reconnection attempts following the node switch.
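The rule for the offset parameter can be written out as a small function. This is only a sketch of the rule as stated in the parameter description, not code from the Java API; OffsetRuleSketch, resolveStart, and totalRows are hypothetical names.

```java
// Hypothetical helper, not part of the Java API: mirrors the documented offset rule.
class OffsetRuleSketch {
    /**
     * @param offset    requested offset (-1 when unspecified)
     * @param totalRows rows appended since the stream table was created,
     *                  including rows already cleared from memory
     * @return position of the first message the subscription starts from
     */
    static long resolveStart(long offset, long totalRows) {
        // Negative or too-large offsets mean "start with the next new message".
        if (offset < 0 || offset > totalRows) {
            return totalRows;
        }
        return offset;
    }
}
```

For example, with 100 rows published so far, resolveStart(-1, 100) and resolveStart(250, 100) both give 100, while resolveStart(40, 100) gives 40.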
Subscription Options
Reconnection
If the reconnect parameter is set to true, the system automatically resubscribes after an unexpected disconnection, depending on the cause of the disconnection:
- If both the publisher and subscriber are in normal states but the network is interrupted, the subscriber automatically resubscribes when the network is restored.
- If the publisher crashes, the subscriber keeps trying to resubscribe after the publisher restarts.
  - If the publisher has enabled persistence for the stream table, the subscriber can successfully resubscribe once the publisher has read data from the disk up to the point of interruption.
  - If the publisher has not enabled persistence for the stream table, the resubscription fails.
- If the subscriber crashes, it does not automatically resubscribe after restarting, and users need to call the subscribe function again.
Examples
@Test
public void testPollingClient() throws IOException {
    PollingClient client = new PollingClient(9002);
    // The last argument (true) enables automatic reconnection.
    TopicPoller poller1 = client.subscribe("192.168.0.21", 9002, "Trades", -1, true);
}
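The resubscription behavior, together with the backupSites and resubTimeout parameters of subscribe, can be pictured as a retry loop. The sketch below is an illustration under assumptions, not the API's actual logic: ResubscribeLoopSketch, resubscribe, and the trySubscribe callback are hypothetical, and the maxAttempts limit exists only so that the example terminates (the documentation describes repeated attempts until the subscription succeeds).

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model, not part of the Java API: cycle through sites until one accepts.
class ResubscribeLoopSketch {
    /**
     * Tries each site in turn, waiting resubTimeoutMs after every failed attempt.
     *
     * @return the site that accepted the subscription, or null if none did
     *         within maxAttempts attempts
     */
    static String resubscribe(List<String> sites, Predicate<String> trySubscribe,
                              long resubTimeoutMs, int maxAttempts) {
        if (sites.isEmpty()) {
            return null;
        }
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            // Round-robin over the configured sites.
            String site = sites.get(attempt % sites.size());
            if (trySubscribe.test(site)) {
                return site;
            }
            try {
                Thread.sleep(resubTimeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null;
    }
}
```

A caller would pass the primary node followed by the backup nodes as sites, with trySubscribe wrapping whatever call actually opens the subscription.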
Filtering
Filtering conditions can be set with the filter parameter to filter the data published to the subscriber. The filtering column is set with the server function setStreamTableFilterColumn. Only the messages with the specified filtering column values are published.
Examples
@Test
public void testEnableFilter() throws IOException {
    // Only rows whose filtering column value is 1 or 2 are published.
    BasicIntVector filter = new BasicIntVector(2);
    filter.setInt(0, 1);
    filter.setInt(1, 2);
    PollingClient client = new PollingClient(9002);
    TopicPoller poller1 = client.subscribe("192.168.0.21", 9002, "Trades1", "subTread1", -1, filter);
    ArrayList<IMessage> msg1 = poller1.poll(500, 10000);
}
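The effect of the filter on published rows can be sketched without a server. The example below is a plain-Java illustration of the rule "publish only rows whose filtering column value is in the filter", not the server's implementation; FilterSketch, publishFiltered, and the int[] row layout are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model, not part of the Java API or the server: value-based row filtering.
class FilterSketch {
    // Keeps the rows whose value in the filtering column appears in `filter`.
    static List<int[]> publishFiltered(List<int[]> rows, int filterColumn, Set<Integer> filter) {
        List<int[]> published = new ArrayList<>();
        for (int[] row : rows) {
            if (filter.contains(row[filterColumn])) {
                published.add(row);
            }
        }
        return published;
    }
}
```

With a filter of {1, 2} on column 0, a row starting with 3 is dropped while rows starting with 1 or 2 pass through in their original order.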
Canceling Subscription
Use the unsubscribe method to cancel an existing subscription.
public void unsubscribe(String host, int port, String tableName, String actionName)
Examples
client.unsubscribe("192.168.0.21", 9002, "outTables", "mutiSchema");
Usage Example
First, define a handler that implements the com.xxdb.streaming.client.MessageHandler interface, to be passed when constructing a subscription.
// conn is assumed to be an already connected DBConnection defined elsewhere.
public static MessageHandler handler = new MessageHandler() {
    @Override
    public void doEvent(IMessage msg) {
        try {
            // Build an insert statement from the three columns of the message.
            String script = String.format("insert into Receive values(%d,%s,%f)",
                    Integer.parseInt(msg.getEntity(0).getString()),
                    msg.getEntity(1).getString(),
                    Double.valueOf(msg.getEntity(2).toString()));
            conn.run(script);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
};
For single-threaded callback using ThreadedClient:
@Test
public void testThreadedClient() throws IOException {
    // For versions before 2.00.9, start the subscription with the specified port 8000:
    // ThreadedClient client = new ThreadedClient(8000);
    // For version 2.00.9 and later, start the subscription without specifying a port:
    ThreadedClient client = new ThreadedClient();
    client.subscribe("192.168.0.21", 8848, "Trades", handler, -1);
}
Incoming data is passed through the msg parameter of doEvent and processed by the handler defined above.
For multi-threaded callback using ThreadPooledClient:
@Test
public void testThreadPooledClient() throws IOException {
    // For versions before 2.00.9, start the subscription with the specified port 8000:
    // ThreadPooledClient client = new ThreadPooledClient(8000);
    // For version 2.00.9 and later, start the subscription without specifying a port:
    ThreadPooledClient client = new ThreadPooledClient();
    client.subscribe("192.168.0.21", 9002, "Trades", handler, -1);
}
For PollingClient, with which the Java client regularly polls for new data from the publisher table:
@Test
public void testPollingClient() throws IOException {
    // For versions before 2.00.9, start the subscription with the specified port 8000:
    // PollingClient client = new PollingClient(8000);
    // For version 2.00.9 and later, start the subscription without specifying a port:
    PollingClient client = new PollingClient();
    TopicPoller poller1 = client.subscribe("192.168.0.21", 9002, "Trades", -1);
    while (true) {
        ArrayList<IMessage> msgs = poller1.poll(1000);
        if (msgs.size() > 0) {
            // Read the third column of the first message.
            BasicInt value = (BasicInt) msgs.get(0).getEntity(2);
        }
    }
}
When poller1 detects that the stream table has received new data, it pulls the new data. When no new data is published, the Java program blocks in the poller1.poll method while waiting.
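The blocking behavior of polling can be illustrated with a standard BlockingQueue. This is a sketch of the general queue-polling pattern, assuming a timeout in milliseconds; it is not the TopicPoller implementation, and PollerSketch, offer, and the String message type are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not part of the Java API: a queue that the client polls for messages.
class PollerSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Stands in for the receiving side enqueuing a message from the publisher.
    void offer(String msg) {
        queue.add(msg);
    }

    // Blocks up to timeoutMs for the first message, then drains whatever else is queued.
    List<String> poll(long timeoutMs) {
        List<String> batch = new ArrayList<>();
        try {
            String first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch;
    }
}
```

In this sketch, a call to poll on an empty queue returns an empty list once the timeout elapses, which is why the polling loop checks the size of the result before reading it.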