StreamingSQLClient
The Java API provides StreamingSQLClient for operating on Streaming SQL.Users can use it to declare Streaming SQL tables and register Streaming SQL queries to obtain result tables.StreamingSQLClient subscribes to the change logs and maintains a locally updated result table based on these incremental logs.
StreamingSQLClient
Create a client for operating on Streaming
SQL.
public StreamingSQLClient(String host, int port, String userName, String password)
Parameters
- host: A string indicating the IP address of the node with Streaming SQL enabled.
- port: An integer indicating the port number of the node with Streaming SQL enabled.
- userName: A string indicating the username for logging into the node with Streaming SQL enabled.
- password: A string indicating the password for logging into the node with Streaming SQL enabled.
declareStreamingSQLTable
Declare the specified table as a Streaming SQL input table, indicating that Streaming
SQL queries can be registered and executed on
it.
public void declareStreamingSQLTable(String tableName)
Parameters
- tableName: A string indicating the name of the Streaming SQL table. Currently, only shared in-memory tables are supported.
revokeStreamingSQLTable
Revoke a Streaming SQL table that was declared with
declareStreamingSQLTable
.
- Only the table declared by the current user can be revoked.
- Before revoking a table, all streaming SQL queries registered on it must be revoked first.
- This function only removes the streaming SQL feature; the table and its data remain.
public void revokeStreamingSQLTable(String tableName)
Parameters
- tableName: A string indicating the name of the declared Streaming SQL table.
listStreamingSQLTables
List all streaming SQL tables declared via
declareStreamingSQLTable
.
Return streaming SQL tables of all users in the system if called by the
admin.public BasicTable listStreamingSQLTables()
Return a table containing the following columns: tableName, shared, users.
registerStreamingSQL
Register a streaming SQL query and return a unique queryId. Then the system generates
a shared stream table with the same name as queryId to write result change
logs.
public String registerStreamingSQL(String sqlQuery, String queryId, int logTableCacheSize)
Parameters
- sqlQuery A string representing the query for registering streaming SQL.
- queryId (optional) A string representing the ID name for query.
Must follow variable naming rules: letters, numbers, or underscores only, and
start with a letter.
- If the specified queryId already exists, the system automatically appends a timestamp to create a unique ID.
- If not specified, the system automatically generates a unique ID.
- logTableCacheSize (optional) A positive integer indicating the maximum number of result change logs cached in memory. All logs are cached by default. Data in the cache that has not been published is retained as long as there are subscriptions.
revokeStreamingSQL
Revoke a streaming SQL query registered via
registerStreamingSQL
.
Only for the queries registered by the current
user.public void revokeStreamingSQL(String queryId)
Parameters
- queryId (optional) A string representing the ID name for the registered streaming SQL query.
getStreamingSQLStatus
Get the status of streaming SQL
query.
public BasicTable getStreamingSQLStatus(String queryId)
Return a table containing the following columns:
- queryId: The ID name for the registered streaming SQL query.
- user: Name of the user who register the query.
- registerTime: The time of query registration.
- status: The status of the current query, which can be:
- SQL_REGISTERED: Registered but not running.
- SQL_RUNNING: Running normally and results are updated in real time.
- SQL_STOPPED: Stopped.
- INTERNAL_ERROR: Error.
- sqlQuery: The statement of streaming SQL query.
- involvedTables: Tables involved in the query.
- lastErrorMessage: The last error message, if any.
Parameters
- queryId (optional) A STRING scalar representing the ID name for the registered streaming SQL query.
subscribeStreamingSQL
Subscribe to the results of a specified streaming SQL query. The subscriber receives
incremental logs and uses them to maintain a shared result table that is updated in
real time, ensuring the query results stay continuously refreshed as data
changes.
public BasicTable subscribeStreamingSQL(String queryId, int batchSize, float throttle)
Prarmeters
- queryId A string representing the ID name for the streaming SQL query to subscribe.
- batchSize (optional) An integer.
- Positive integer: Process incremental logs only when the unprocessed number reaches batchSize.
- Non-positive or unspecified: Process each batch of incremental logs as they arrive.
- throttle (optional) A floating specifying the maximum interval after the
last log processing. If batchSize is not met within this period, logs are
processed again.
- If batchSize is not specified, throttle has no effect even if set.
- To set throttle to less than 1 second, modify the configuration parameter subThrottle first.
unsubscribeStreamingSQL
Unsubscribe from the results of a specified streaming SQL query. After unsubscribing,
the subscriber's real-time result table stops
updating.
public void unsubscribeStreamingSQL(String queryId)
Parameters
- queryId A string representing the ID name for the streaming SQL query to unsubscribe.
Examples
Note: To use this feature, enable Streaming SQL on the server first. The related
configuration parameters are streamingSQLExecutors and
maxStreamingSQLQueriesPerTable. For details, refer to the User
Manual.
DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
// Define keyed tables
String script = "share keyedTable(`id, 1:0, `id`time`value, [SYMBOL, TIMESTAMP, DOUBLE[]]) as t1;\n" +
"share keyedTable(`id, 1:0, `id`time`value, [SYMBOL, TIMESTAMP, DOUBLE[]]) as t2;";
conn.run(script);
// Create a StreamingSQLClient
StreamingSQLClient streamingSQLClient = new StreamingSQLClient(HOST, PORT, "admin","123456");
// Declare t1 and t2 as streaming SQL tables
streamingSQLClient.declareStreamingSQLTable("t1");
streamingSQLClient.declareStreamingSQLTable("t2");
// List declared streaming SQL tables
BasicTable streamingSQLTables = streamingSQLClient.listStreamingSQLTables();
// Register streaming SQL
String sqlStr1 = "SELECT id, time,t1.value, rowSum(value) FROM t1 FULL JOIN t2 ON t1.id=t2.id\n" ;
String queryId = streamingSQLClient.registerStreamingSQL(sqlStr1);
// List the status of a specified declared streaming SQL
BasicTable streamingSQLStatus = streamingSQLClient.getStreamingSQLStatus(queryId);
// Subscribe to the streaming SQL
BasicTable bt = streamingSQLClient.subscribeStreamingSQL(queryId);
// Write data to declared streaming SQL tables
conn.run("n=100;\n" +
"data = table(take(\"A\"+string(1..30), n) as id, timestamp(2025.08.26T12:36:23.438+1..n) as timestamp, take([take(1..10,10),take(1..10,10),take(1..10,10)], n) as value)\n" +
"t1.append!(data)\n");
conn.run("n=100;\n" +
"data = table(take(\"A\"+string(1..30), n) as id, timestamp(2025.08.26T12:36:23.438+1..n) as timestamp, take([take(1..10,10),take(1..10,10),take(1..10,10)], n) as value)\n" +
"t2.append!(data)\n");
Thread.sleep(5000);
// Perform business operations on the local result table bt as needed
System.out.println(bt.getString());
// Unsubscribe the streaming SQL
streamingSQLClient.unsubscribeStreamingSQL(queryId);