流式 SQL

1. 概述

自 3.00.4 版本起,DolphinDB 提供了流式 SQL 功能,通过增量计算和订阅计算结果,实现对实时数据的持续查询和即时更新。用户可以将共享内存表声明为流式 SQL 表,在这些表上注册流式 SQL 查询。系统只会对新到达或发生变更的数据进行计算(而非全量重算),并且仅将增量结果推送给订阅端,从而显著降低计算开销与网络传输成本,实现低延迟、实时响应。

DolphinDB 流式 SQL 核心功能包括:

  • 支持复杂 SQL 查询,涵盖 SELECT、WHERE、JOIN 和 ORDER BY 等常用操作。
  • 通过增量更新避免全量扫描,降低计算开销和延迟。
  • 通过订阅机制仅传输增量结果,减少网络开销。

2. 系统架构与实现原理

2.1 架构

DolphinDB 流式 SQL 的架构围绕三大机制展开:

  • 作业管理:将每条注册的流式 SQL 查询视为一个独立作业(job)。统一管理其元数据和执行状态。
  • 持续流式处理:SQL 引擎持续运行,实时捕获数据变更,推动增量数据进入查询处理流水线。
  • 增量更新推送:查询结果采用增量更新策略,系统会对比新旧结果的差异,仅当结果发生实质性变化时,将变更部分(delta)发送给订阅客户端。此机制显著减少网络传输和客户端负载,提高整体系统响应效率。

2.2 增量计算流程

用户提交流式 SQL 查询后,系统生成相应的执行计划,并初始化各算子的中间状态。当新的增量数据到达时,算子基于这些数据执行增量计算,生成相应的变更日志。

这些变更日志沿查询执行链路自下而上传递,逐步更新最终的查询结果。系统仅将实际发生变化的结果差异(delta)推送给客户端订阅者,有效减少网络传输和计算资源消耗,实现高效且实时的结果更新。

2.3 订阅发布机制

流式 SQL 查询注册时,系统会为该查询自动生成一个变更日志流表,用于记录该查询结果的增量变化。订阅端通过订阅该变更日志流表,获取实时的结果变更。

当客户端取消订阅时,系统自动停止向该订阅端推送增量变更,释放相关资源。若所有订阅端取消订阅,则变更日志流表会停止更新,作业资源得到回收。

3. 使用限制

流式 SQL 中的连接操作需要在内存中保存参与连接的表数据和中间结果,因此会占用较多内存资源。为了保证系统稳定性,目前存在以下限制:

  • 单个节点最多支持同时处理一个包含连接操作的查询。
  • 为保证性能,目前仅支持最多连接三张表的查询,超过三表连接的复杂查询暂不支持。
  • 当连接条件发生变化时,相关计算会重新执行。

除了连接操作,其他算子(如过滤和排序)可以并行处理,支持多个独立的查询流水线。

4. 接口函数说明

函数名 函数介绍
declareStreamingSQLTable 声明指定表为流式 SQL 输入表,只有被声明的表才能注册流式 SQL 查询。声明不会影响该表在普通 SQL 中的使用。
getStreamingSQLStatus 查询流式 SQL 查询状态,支持查询单条或所有查询。管理员可查看所有用户查询。
listStreamingSQLTables 列举当前用户声明的所有流式 SQL 表,管理员可查看所有用户声明。返回表包含表名、共享状态及声明用户列表。
registerStreamingSQL 注册流式 SQL 查询,返回查询 ID,并自动生成结果变更日志流表。支持 SELECT、WHERE、JOIN(仅支持等值连接,且仅支持 ej、lj、rj、fj 类型)、ORDER BY 等关键字。
revokeStreamingSQL 注销已注册的流式 SQL 查询。
revokeStreamingSQLTable 注销之前声明的流式 SQL 表。注销前须先取消该表上的所有流式 SQL 查询订阅。只能注销当前用户声明的表。注销仅移除流式 SQL 功能,不删除表或数据。
subscribeStreamingSQL 订阅指定流式 SQL 查询结果,订阅端执行查询并维护实时更新的共享结果表。
unsubscribeStreamingSQL 取消订阅指定流式 SQL 查询结果,订阅端停止更新结果表。

5. 使用示例

使用流式 SQL 前,需先启用该功能。在配置文件中,将 streamingSQLExecutors 设置为大于 0 的整数。根据实际情况调整 maxStreamingSQLQueriesPerTable 参数。单节点环境修改 dolphindb.cfg,集群环境修改 cluster.cfg。

以下代码实时计算两个表中同一 id 的 value 值之和,随着数据更新,查询结果自动增量刷新。

// define keyedTables
share keyedTable(`id, 1:0, `id`value, [INT, DOUBLE]) as leftTable;
share keyedTable(`id, 1:0, `id`value, [INT, DOUBLE]) as rightTable;
go;
// 将两个共享键值内存表声明为流式 SQL 输入表。
declareStreamingSQLTable(leftTable); 
declareStreamingSQLTable(rightTable);

// 注册流式 SQL 查询
queryId = registerStreamingSQL("select id, leftTable.value + rightTable.value from leftTable left join rightTable on leftTable.id=rightTable.id");

// 查看已注册的流式 SQL 查询运行状态
getStreamingSQLStatus()

// 订阅上述注册的 SQL 查询
table = subscribeStreamingSQL(,queryId)

// 向左、右表中插入数据
t = table(1 2 3 4 5 as id, 0.1 0.2 0.3 0.4 0.5 as value);
leftTable.append!(t);
t = table(1 2 3 4 5 as id, 0.1 0.2 0.3 0.4 0.5 as value)
rightTable.append!(t)

// 查看订阅结果表的最新值
sleep(20)
select * from table

// 再次向左、右表中插入数据
t = table(2 3 6 as id, 2.0 3.0 6.0 as value);
leftTable.append!(t);
t = table(6 as id, 6.0 as value);
rightTable.append!(t);

// 查看订阅结果表的最新值
sleep(20)
select * from table

// 清理环境
unsubscribeStreamingSQL(queryId=queryId)
revokeStreamingSQL(queryId)
revokeStreamingSQLTable("leftTable")
revokeStreamingSQLTable("rightTable")

6. 未来规划

为了持续提升流式 SQL 的性能与功能,DolphinDB 将重点推进以下工作:

  • 优化查询执行计划,提升整体计算效率。
  • 增强执行引擎的调度能力,实现更高效的流数据处理。
  • 改进各类算子的增量计算逻辑。
  • 支持和加速流式子查询的执行。
  • 实现算子资源共享,减少重复计算。