Streaming SQL

1. Overview

Since version 3.00.4, DolphinDB provides streaming SQL to enable continuous queries and instant updates of real-time data. Users can declare shared tables as streaming SQL tables, on which register streaming SQL queries. The system only performs calculations on newly arrived data or modified data (rather than a full recalculation), and pushes only the incremental results to the subscriber, significantly reducing computational overhead and network transmission costs to achieve low latency and real-time responsiveness.

The key features of DolphinDB streaming SQL include:

  • Support for complex SQL queries, including SELECT, WHERE, JOIN, ORDER BY, etc.
  • Incremental updates to avoid full scans, reducing computation overhead and latency.
  • Subscription-based mechanism to transmit only incremental results, reducing network overhead.

2. Architecture and Principles

2.1 Streaming SQL Architecture

DolphinDB streaming SQL architecture involves three major mechanisms:

  • Job management: Each registered streaming SQL query is an independent job with centralized management of the queries' metadata and execution states.
  • Continuous stream processing: The SQL engine runs continuously, capturing data changes in real time and feeding incremental data into the query processing pipeline.
  • Incremental Update Push: The query results use an incremental update strategy. The system compares new and previous results and sends only the changes to the subscriber when there is a substantive difference. This mechanism significantly reduces network traffic and client load, improving overall system responsiveness.

2.2 Incremental Computing Process

After a streaming SQL query is submitted, the system creates an execution plan and initializes operator states. As new data arrives, operators compute increments and generate change logs, which are passed up the execution chain to update results. Only deltas are pushed to subscribers, minimizing network and compute overhead for efficient real-time updates.

2.3 Pub-Sub Mechanism

When the streaming SQL query is registered, the system generates a result change log stream table to record the incremental changes of the query results. Subscribers receive real-time changes by subscribing to this table.

When a subscriber unsubscribes, the system stops pushing incremental updates to it and releases related resources. If all subscribers unsubscribe, the change log stream table stops updating and job resources are reclaimed.

3. Considerations

The join operation in streaming SQL requires storing the table data and intermediate results in memory, thus consuming considerable memory resources. To ensure system stability, consider the following points:

  • A single node supports at most one concurrent query that involves a join operation.
  • To maintain performance, queries involving joins of up to three tables are currently supported. Complex queries with more than three tables are not supported at this time.
  • When join conditions change, the relevant computations are re-executed.

Other than join operations, other operators (such as filtering and sorting) can be processed in parallel, supporting multiple independent query pipelines.

4. Interface Description

Function Description
declareStreamingSQLTable Declare a specified table as an the input streaming SQL table. Only declared tables can be used to register streaming SQL queries. Declaration does not affect the table's use in regular SQL operations.
getStreamingSQLStatus Query the status of streaming SQL queries, supporting retrieval of a single query or all queries. The admin can view queries from all users.
listStreamingSQLTables List all streaming SQL tables declared by the current user. The admin can view declarations from all users. The returned table contains the table name, shared status, and list of declaring users.
registerStreamingSQL Register a streaming SQL query and return a query ID, and automatically create a result change log stream table. Supports keywords such as SELECT, WHERE, JOIN (only equality joins are supported, and only for types ej, lj, rj, fj), and ORDER BY.
revokeStreamingSQL Revoke the registered streaming SQL query.
revokeStreamingSQLTable Revoke the declared streaming SQL query table. Revoke all streaming SQL queries on the table before revoking the table. Only the table declared by the current user can be revoked. Only removes the streaming SQL feature; the table and its data remain.
subscribeStreamingSQL Subscribe to the results of a specified streaming SQL query. The subscriber executes the query and maintains a shared result table that is updated in real time.
unsubscribeStreamingSQL Unsubscribe from the results of a specified streaming SQL query. The subscriber's real-time result table stops updating.

5. Examples

Streaming SQL should be enabled before use. Set the streamingSQLExecutors parameter to an integer greater than 0. Adjust the maxStreamingSQLQueriesPerTable parameter as needed. Configure dolphindb.cfg in the standalone deployment, while configure cluster.cfg in the cluster deployment.

The following scripts compute the sum of value of the same id in two tables in real-time. As the data updates, the query results are automatically incrementally refreshed.

// define keyedTables
share keyedTable(`id, 1:0, `id`value, [INT, DOUBLE]) as leftTable;
share keyedTable(`id, 1:0, `id`value, [INT, DOUBLE]) as rightTable;
go;
// declare two shared keyed tables as streaming SQL input tables
declareStreamingSQLTable(leftTable); 
declareStreamingSQLTable(rightTable);

// register streaming SQL queries
queryId = registerStreamingSQL("select id, leftTable.value + rightTable.value from leftTable left join rightTable on leftTable.id=rightTable.id");

// get the status of registered streaming SQL queries
getStreamingSQLStatus()

// subscribe to the query registered above
table = subscribeStreamingSQL(,queryId)

// insert data into the left and right tables
t = table(1 2 3 4 5 as id, 0.1 0.2 0.3 0.4 0.5 as value);
leftTable.append!(t);
t = table(1 2 3 4 5 as id, 0.1 0.2 0.3 0.4 0.5 as value)
rightTable.append!(t)

// get the latest values in the subscription results table
sleep(20)
select * from table

// insert data into the left and right tables again
t = table(2 3 6 as id, 2.0 3.0 6.0 as value);
leftTable.append!(t);
t = table(6 as id, 6.0 as value);
rightTable.append!(t);

// get the latest values in the subscription results table
sleep(20)
select * from table

// clean up the environment
unsubscribeStreamingSQL(queryId=queryId)
revokeStreamingSQL(queryId)
revokeStreamingSQLTable("leftTable")
revokeStreamingSQLTable("rightTable")