流式 SQL
1. 概述
自 3.00.4 版本起,DolphinDB 提供了流式 SQL 功能,通过增量计算和订阅计算结果,实现对实时数据的持续查询和即时更新。用户可以将共享内存表声明为流式 SQL 表,在这些表上注册流式 SQL 查询。系统只会对新到达或发生变更的数据进行计算(而非全量重算),并且仅将增量结果推送给订阅端,从而显著降低计算开销与网络传输成本,实现低延迟、实时响应。
DolphinDB 流式 SQL 核心功能包括:
- 支持复杂 SQL 查询,涵盖 SELECT、WHERE、JOIN 和 ORDER BY 等常用操作。
- 通过增量更新避免全量扫描,降低计算开销和延迟。
- 通过订阅机制仅传输增量结果,减少网络开销。
2. 系统架构与实现原理
2.1 架构
DolphinDB 流式 SQL 的架构围绕三大机制展开:
- 作业管理:将每条注册的流式 SQL 查询视为一个独立作业(job)。统一管理其元数据和执行状态。
- 持续流式处理:SQL 引擎持续运行,实时捕获数据变更,推动增量数据进入查询处理流水线。
- 增量更新推送:查询结果采用增量更新策略,系统会对比新旧结果的差异,仅当结果发生实质性变化时,将变更部分(delta)发送给订阅客户端。此机制显著减少网络传输和客户端负载,提高整体系统响应效率。
2.2 增量计算流程
用户提交流式 SQL 查询后,系统生成相应的执行计划,并初始化各算子的中间状态。当新的增量数据到达时,算子基于这些数据执行增量计算,生成相应的变更日志。
这些变更日志沿查询执行链路自下而上传递,逐步更新最终的查询结果。系统仅将实际发生变化的结果差异(delta)推送给客户端订阅者,有效减少网络传输和计算资源消耗,实现高效且实时的结果更新。
2.3 订阅发布机制
流式 SQL 查询注册时,系统会为该查询自动生成一个变更日志流表,用于记录该查询结果的增量变化。订阅端通过订阅该变更日志流表,获取实时的结果变更。
当客户端取消订阅时,系统自动停止向该订阅端推送增量变更,释放相关资源。若所有订阅端取消订阅,则变更日志流表会停止更新,作业资源得到回收。
3. 使用限制
流式 SQL 中的连接操作需要在内存中保存参与连接的表数据和中间结果,因此会占用较多内存资源。为了保证系统稳定性,目前存在以下限制:
- 单个节点最多支持同时处理一个包含连接操作的查询。
- 为保证性能,目前仅支持最多连接三张表的查询,超过三表连接的复杂查询暂不支持。
- 当连接条件发生变化时,相关计算会重新执行。
除了连接操作,其他算子(如过滤和排序)可以并行处理,支持多个独立的查询流水线。
4. 接口函数说明
函数名 | 函数介绍 |
---|---|
declareStreamingSQLTable | 声明指定表为流式 SQL 输入表,只有被声明的表才能注册流式 SQL 查询。声明不会影响该表在普通 SQL 中的使用。 |
getStreamingSQLStatus | 查询流式 SQL 查询状态,支持查询单条或所有查询。管理员可查看所有用户查询。 |
listStreamingSQLTables | 列举当前用户声明的所有流式 SQL 表,管理员可查看所有用户声明。返回表包含表名、共享状态及声明用户列表。 |
registerStreamingSQL | 注册流式 SQL 查询,返回查询 ID,并自动生成结果变更日志流表。支持 SELECT、WHERE、JOIN(仅支持等值连接,且仅支持 ej、lj、rj、fj 类型)、ORDER BY 等关键字。 |
revokeStreamingSQL | 注销已注册的流式 SQL 查询。 |
revokeStreamingSQLTable | 注销之前声明的流式 SQL 表。注销前须先取消该表上的所有流式 SQL 查询订阅。只能注销当前用户声明的表。注销仅移除流式 SQL 功能,不删除表或数据。 |
subscribeStreamingSQL | 订阅指定流式 SQL 查询结果,订阅端执行查询并维护实时更新的共享结果表。 |
unsubscribeStreamingSQL | 取消订阅指定流式 SQL 查询结果,订阅端停止更新结果表。 |
5. 使用示例
使用流式 SQL 前,需先启用该功能。在配置文件中,将 streamingSQLExecutors 设置为大于 0 的整数。根据实际情况调整 maxStreamingSQLQueriesPerTable 参数。单节点环境修改 dolphindb.cfg,集群环境修改 cluster.cfg。
以下代码实时计算两个表中同一 id 的 value 值之和,随着数据更新,查询结果自动增量刷新。
// define keyedTables
share keyedTable(`id, 1:0, `id`value, [INT, DOUBLE]) as leftTable;
share keyedTable(`id, 1:0, `id`value, [INT, DOUBLE]) as rightTable;
go;
// 将两个共享键值内存表声明为流式 SQL 输入表。
declareStreamingSQLTable(leftTable);
declareStreamingSQLTable(rightTable);
// 注册流式 SQL 查询
queryId = registerStreamingSQL("select id, leftTable.value + rightTable.value from leftTable left join rightTable on leftTable.id=rightTable.id");
// 查看已注册的流式 SQL 查询运行状态
getStreamingSQLStatus()
// 订阅上述注册的 SQL 查询
table = subscribeStreamingSQL(,queryId)
// 向左、右表中插入数据
t = table(1 2 3 4 5 as id, 0.1 0.2 0.3 0.4 0.5 as value);
leftTable.append!(t);
t = table(1 2 3 4 5 as id, 0.1 0.2 0.3 0.4 0.5 as value)
rightTable.append!(t)
// 查看订阅结果表的最新值
sleep(20)
select * from table
// 再次向左、右表中插入数据
t = table(2 3 6 as id, 2.0 3.0 6.0 as value);
leftTable.append!(t);
t = table(6 as id, 6.0 as value);
rightTable.append!(t);
// 查看订阅结果表的最新值
sleep(20)
select * from table
// 清理环境
unsubscribeStreamingSQL(queryId=queryId)
revokeStreamingSQL(queryId)
revokeStreamingSQLTable("leftTable")
revokeStreamingSQLTable("rightTable")
6. 未来规划
为了持续提升流式 SQL 的性能与功能,DolphinDB 将重点推进以下工作:
- 优化查询执行计划,提升整体计算效率。
- 增强执行引擎的调度能力,实现更高效的流数据处理。
- 改进各类算子的增量计算逻辑。
- 支持和加速流式子查询的执行。
- 实现算子资源共享,减少重复计算。