SQL Execution Plan
DolphinDB provides a feature for viewing the execution plan of SQL queries to help optimize query performance. An execution plan details how a SQL statement is executed within the system, including the number of partitions accessed and the execution time of each step.
This feature was introduced in versions V1.30.16 and V2.00.4.
Note: Retrieving an execution plan runs the full SQL statement. For long-running queries or those involving many partitions, consider analyzing each clause separately. Alternatively, use the sqlDS function to identify affected partitions and check the execution plan for each individually.
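For instance, a minimal sketch of the sqlDS approach (assuming the dfs://valuedb table built in Section 2; sqlDS only constructs the data sources and does not run the query):
// Count the partitions a query would touch without executing it
pt = loadTable("dfs://valuedb", `pt)
ds = sqlDS(<select * from pt>)  // one data source per affected partition
size(ds)                        // e.g. 204 for the table in Section 2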
1. Retrieving the Execution Plan
To obtain the execution plan of a SQL query, insert [HINT_EXPLAIN] immediately after the SELECT or EXEC keyword, followed by the query fields.
Example:
select [HINT_EXPLAIN] * from pt;
Note: Execution plans are currently not supported for UPDATE or DELETE statements.
2. Understanding the Execution Plan
DolphinDB returns execution plans in JSON format. Unlike some other databases, DolphinDB's execution plans follow these conventions:
- At the same level, steps are listed in execution order from top to bottom.
- Indentation indicates subclauses or subprocesses of the enclosing step.
Let’s use a simple example of querying all data from a table to walk through an execution plan.
// Create the database and table
if(existsDatabase("dfs://valuedb")) dropDatabase("dfs://valuedb")
n = 1000000
month = take(2000.01M..2016.12M, n)
x = rand(1.0, n)
id = rand(1..9, n)
t = table(month, x, id)
db = database("dfs://valuedb", VALUE, 2000.01M..2016.12M)
pt = db.createPartitionedTable(t, `pt, `month)
pt.append!(t)
// Query
select [HINT_EXPLAIN] * from loadTable("dfs://valuedb", `pt);
This query yields the following execution plan:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 13
    },
    "map": {
      "partitions": {
        "local": 0,
        "remote": 204
      },
      "cost": 22260,
      "detail": {
        "most": {
          "sql": "select [114699] month,x,id from pt [partition = /valuedb/200009M/1kE]",
          "explain": {
            "rows": 4902,
            "cost": 9962
          }
        },
        "least": {
          "sql": "select [114699] month,x,id from pt [partition = /valuedb/200701M/1kE]",
          "explain": {
            "rows": 4902,
            "cost": 64
          }
        }
      }
    },
    "merge": {
      "cost": 6163,
      "rows": 1000000,
      "detail": {
        "most": {
          "sql": "select [114691] month,x,id from pt [partition = /valuedb/201604M/1kE]",
          "explain": {
            "from": {
              "cost": 4
            },
            "rows": 4902,
            "cost": 157
          }
        },
        "least": {
          "sql": "select [114699] month,x,id from pt [partition = /valuedb/201605M/1kE]",
          "explain": {
            "rows": 4901,
            "cost": 81
          }
        }
      }
    },
    "rows": 1000000,
    "cost": 30988
  }
}
To summarize the execution flow, from top to bottom:
- Retrieve metadata for the pt table: 13μs.
- Execute parallel queries on the partitions: 0 partitions on local nodes and 204 partitions on remote nodes, taking 22,260μs in total. The most time-consuming partition, /200009M/1kE, took 9,962μs and returned 4,902 rows.
- Merge the results from all partitions: 6,163μs. The merged result contains 1 million rows; the partition returning the most rows, /201604M/1kE, contributed 4,902 rows.
- Total rows returned: 1,000,000; total execution time: 30,988μs.
2.1 Structure Overview
Execution details are generally contained within the explain sections. If the SQL includes subqueries, multiple nested explain sections may appear.
"measurement": "microsecond" indicates that all time values are measured in microseconds.
In this example, the explain section includes four parts: from, map, merge, and the overall rows and cost.
2.2 The from Section
"from": {
  "cost": 13
}
The from section contains a cost field, which represents the time spent retrieving metadata for the source table, measured in microseconds. In this case, loading the table pt with loadTable() took 13μs.
2.3 The map Section
DolphinDB uses a distributed file system (DFS) to store data by partitions. When a SQL query involves multiple partitions, DolphinDB first performs partition pruning, then distributes the query to the relevant partitions for parallel execution, and finally aggregates the results. The map section provides details on how the query tasks are dispatched to the corresponding partitions.
"map": {
  "partitions": {
    "local": 0,
    "remote": 204
  },
  "cost": 22260,
  "detail": {
    "most": {
      "sql": "select [114699] month,x,id from pt [partition = /valuedb/200009M/1kE]",
      "explain": {
        "rows": 4902,
        "cost": 9962
      }
    },
    "least": {
      "sql": "select [114699] month,x,id from pt [partition = /valuedb/200701M/1kE]",
      "explain": {
        "rows": 4902,
        "cost": 64
      }
    }
  }
}
- partitions: The partitions involved in the SQL query, including local (the number of partitions on the local data node) and remote (the number of partitions on remote data nodes). By comparing the number of involved partitions with the total number of partitions, you can evaluate whether query optimization techniques, such as partition pruning, have been applied (see the snippet after this list).
- cost: The total execution time, from dispatching the query to each partition to receiving the results.
- detail: The detailed execution plans of the sub-queries on each partition. Since a query may involve many partitions, only two are shown: most, the partition with the longest execution time, and least, the partition with the shortest. For each, the sql field displays the query executed and the partition involved, and a nested explain section presents that partition's execution plan; least has the same structure and content as most.
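One way to obtain the total partition count for this comparison is to count the database's chunks, using the same query that appears in the example in Section 3.3.1:
select count(*) from pnodeRun(getAllChunks) where dfsPath like '%valuedb/%' and type != 0 group by dfsPath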
2.4 The merge Section
After individual partitions complete execution, results are sent to the coordinating node for aggregation. This process is described in the merge section:
"merge": {
  "cost": 6163,
  "rows": 1000000,
  "detail": {
    "most": {
      "sql": "select [114691] month,x,id from pt [partition = /valuedb/201604M/1kE]",
      "explain": {
        "from": {
          "cost": 4
        },
        "rows": 4902,
        "cost": 157
      }
    },
    "least": {
      "sql": "select [114699] month,x,id from pt [partition = /valuedb/201605M/1kE]",
      "explain": {
        "rows": 4901,
        "cost": 81
      }
    }
  }
}
- cost: The total time taken to perform the merge.
- rows: The total number of rows after aggregation.
- detail: Similar to the map section, except that most indicates the partition that returned the highest number of rows and least the partition that returned the fewest. Unlike the map section, these are selected by row count rather than execution time.
2.5 Final Statistics
The final statistics typically include rows (the total number of records retrieved) and cost (the total query time, measured in microseconds). In addition, rows and cost are also reported within the execution plan to reflect performance at specific stages of the query.
It's important to note that the total query time shown at the end may exceed the sum of individual stage times. This discrepancy is due to additional overhead—such as data format conversions or network transmission—that occurs between stages but is not captured in the execution plan.
3. Execution Plan Details and Optimization
3.1 The from Section
The data sources in the from section can include in-memory tables, stream tables, DFS tables, the results of table joins, or nested SQL queries.
Note: Nested SQL queries cannot include [HINT_EXPLAIN].
3.1.1 Table Joins
When the query involves table joins, the execution plan will include relevant join information.
if(existsDatabase("dfs://valuedb")) dropDatabase("dfs://valuedb");
if(existsDatabase("dfs://valuedb1")) dropDatabase("dfs://valuedb1");
// Create valuedb database
n = 1000000
month = take(2000.01M..2016.12M, n)
x = rand(1.0, n)
id = rand(1..9, n)
t1 = table(month, x, id)
db1 = database("dfs://valuedb", VALUE, 2000.01M..2016.12M)
pt1 = db1.createPartitionedTable(t1, `pt, `month).append!(t1)
// Create valuedb1 database
id = rand(1..10, n)
times = now() + 1..n
vals = take(1..20, n) + rand(1.0, n)
t2 = table(id, times, vals)
db2 = database("dfs://valuedb1", VALUE, 1..10)
pt2 = db2.createPartitionedTable(t2, `pt, `id).append!(t2)
// Join query
select [HINT_EXPLAIN] times, vals, month from lsj(loadTable("dfs://valuedb", `pt), loadTable("dfs://valuedb1", `pt), `id)
Execution plan:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 29,
      "detail": "materialize for JOIN"
    },
    "map": {
      "reshuffle": {
        "cost": 9293
      },
      "partitions": {
        "local": 0,
        "remote": 204
      },
      "cost": 1961931,
      "detail": {
        "most": {
          "sql": "select [98305] times,vals,month from lsj(DataSource< select [65541] month,id from pt where hashBucket(id, 204) == 1 as pt >,DataSource< select [65541] vals,times,id from pt where hashBucket(id, 204) == 1 as pt >,\"id\",\"id\")",
          "explain": {
            "from": {
              "cost": 55134,
              "detail": "materialize for JOIN"
            },
            "join": {
              "cost": 1701
            },
            "rows": 110842,
            "cost": 57877
          }
        },
        "least": {...}
      }
    },
    "merge": {
      "cost": 12358,
      "rows": 1000000,
      "detail": {
        "most": {
          "sql": "select [98305] times,vals,month from lsj(DataSource< select [65541] month,id from pt where hashBucket(id, 204) == 3 as pt >,DataSource< select [65541] vals,times,id from pt where hashBucket(id, 204) == 3 as pt >,\"id\",\"id\")",
          "explain": {
            "from": {
              "cost": 29406,
              "detail": "materialize for JOIN"
            },
            "join": {
              "cost": 2535
            },
            "rows": 111496,
            "cost": 34450
          }
        },
        "least": {...}
      }
    },
    "rows": 1000000,
    "cost": 1979077
  }
}
Compared to the example in Section 2, the execution plan in this case includes an additional "detail": "materialize for JOIN" in the from section. Note that the time shown in the from section does not represent the time taken by the actual table join, but rather the time spent preparing the two data sources for the join. The join itself is carried out during the map phase.
According to the statistics in the map section, the partition with the longest execution time shows a join duration of 1,701μs. Based on the merge section statistics, the partition returning the largest volume of data shows a join duration of 2,535μs.
The reshuffle time shown in the map section refers to the time spent rearranging data contiguously in memory by the join column in order to perform the join. It covers only this preparation phase and does not include the time taken to load the data into memory. If the tables being joined belong to the same database and the join columns are also partitioning columns, the reshuffle step may be omitted.
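As a sketch of that co-located case, suppose both tables live in a single hypothetical database dfs://colodemo and are partitioned on the join column id; the resulting plan should then contain no reshuffle step:
// Both tables share one database and are partitioned on the join column id
if(existsDatabase("dfs://colodemo")) dropDatabase("dfs://colodemo")
db = database("dfs://colodemo", VALUE, 1..10)
n = 1000000
t1 = table(rand(1..10, n) as id, rand(1.0, n) as x)
t2 = table(rand(1..10, n) as id, now() + 1..n as times)
pt1 = db.createPartitionedTable(t1, `pt1, `id).append!(t1)
pt2 = db.createPartitionedTable(t2, `pt2, `id).append!(t2)
select [HINT_EXPLAIN] * from lsj(loadTable("dfs://colodemo", `pt1), loadTable("dfs://colodemo", `pt2), `id)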
3.1.2 Subqueries
If the SQL FROM clause contains a nested SQL query, the execution plan will display a nested breakdown of that subquery’s execution.
Example: This query groups the table by month, calculates the maximum value of x in each group, and then keeps only the groups whose maximum is below a threshold (maxx < 0.9994).
select [HINT_EXPLAIN] * from (select max(x) as maxx from loadTable("dfs://valuedb",`pt) group by month ) where maxx < 0.9994
In the execution plan, the from section includes much more detailed information than in the previous examples, especially within detail. sql shows the SQL statement of the subquery, and the explain that follows provides the execution plan for that subquery. The total execution time of the subquery is 33,571μs.
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 33571,
      "detail": {
        "sql": "select [98304] max(x) as maxx from loadTable(\"dfs://valuedb\", \"pt\") group by month",
        "explain": {
          "from": {
            "cost": 16
          },
          "map": {
            "partitions": {
              "local": 0,
              "remote": 204
            },
            "cost": 29744,
            "detail": {
              "most": {
                "sql": "select [114715] first(month) as month,max(x) as maxx from pt [partition = /valuedb/200412M]",
                "explain": {
                  "from": {
                    "cost": 4
                  },
                  "rows": 1,
                  "cost": 8224
                }
              },
              "least": {
                "sql": "select [114715] first(month) as month,max(x) as maxx from pt [partition = /valuedb/200112M]",
                "explain": {
                  "rows": 1,
                  "cost": 31
                }
              }
            }
          },
          "merge": {
            "cost": 1192,
            "rows": 204,
            "detail": {
              "most": {
                "sql": "select [114715] first(month) as month,max(x) as maxx from pt [partition = /valuedb/201612M]",
                "explain": {
                  "rows": 1,
                  "cost": 65
                }
              },
              "least": {
                "sql": "select [114707] first(month) as month,max(x) as maxx from pt [partition = /valuedb/200001M]",
                "explain": {
                  "from": {
                    "cost": 5
                  },
                  "rows": 1,
                  "cost": 93
                }
              }
            }
          },
          "rows": 204,
          "cost": 33571
        }
      }
    },
    "where": {
      "rows": 9,
      "cost": 13
    },
    "rows": 9,
    "cost": 33621
  }
}
3.2 The where Section
The where section typically includes only two metrics:
- cost: the time spent filtering the data
- rows: the number of rows remaining after filtering
For example, in the above subquery case, only 9 rows matched the filter condition, and the filtering operation took 13μs.
"where": {
  "rows": 9,
  "cost": 13
}
3.3 The map Section
In the map phase of a distributed query, tasks are distributed across different nodes for execution. The execution plan displays information such as the number of partitions involved, the number of rows retrieved, the execution time, and the specific SQL statements used. If the SQL has been optimized, the map section will include an optimize section or may be replaced by it entirely.
3.3.1 Partition Pruning
For distributed queries, you can check whether partition pruning has been applied by looking at the number of partitions involved in the map section. If the number of partitions listed in the execution plan matches the total number of partitions, no partition pruning has occurred: the query scanned all partitions. In this case, refer to the user manual's section on partition pruning to optimize your SQL.
It is recommended to estimate the number of partitions your SQL should involve before running the query, and compare that with the number shown in the execution plan. This helps you determine whether the query is scanning only the necessary data.
In the following example, a table is partitioned on the month column, resulting in a total of 204 partitions. A query is then executed to retrieve data from November to December 2016. Ideally, this should involve only 2 partitions.
// Create database
if( existsDatabase("dfs://valuedb") ) dropDatabase("dfs://valuedb");
// Generate 1 million rows with columns: x and month; partition the data by the 'month' column.
n=1000000
month=take(2000.01M..2016.12M, n)
x=rand(1.0, n)
t=table(month, x)
db=database("dfs://valuedb", VALUE, 2000.01M..2016.12M)
pt = db.createPartitionedTable(t, `pt, `month)
pt.append!(t)
// Check how many partitions were created. There should be 204.
select count(*) from pnodeRun(getAllChunks) where dfsPath like '%valuedb/%' and type != 0 group by dfsPath
select [HINT_EXPLAIN] * from pt where 2016.11M <= month <= 2016.12M
Execution plan:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 2
    },
    "map": {
      "partitions": {
        "local": 0,
        "remote": 204
      },
      "cost": 12966,
      "detail": {
        "most": {
          "sql": "select [114699] month,x from pt where 2016.11M <= month <= 2016.12M [partition = /valuedb/200107M/2JCB]",
          "explain": {
            ......[omitted]
As shown in the execution plan, all 204 partitions were scanned. This indicates that the condition 2016.11M <= month <= 2016.12M does not trigger partition pruning. To enable pruning, rewrite the condition with the BETWEEN keyword:
select [HINT_EXPLAIN] * from pt where month between 2016.11M : 2016.12M
Execution plan:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 3
    },
    "map": {
      "partitions": {
        "local": 0,
        "remote": 2
      },
      "cost": 912,
      "detail": {
        "most": {
          ......[omitted]
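You can also verify the expected partition count before executing the full query. A sketch using sqlDS (assuming it applies the same pruning rules as the query engine):
pt = loadTable("dfs://valuedb", `pt)
size(sqlDS(<select * from pt where 2016.11M <= month <= 2016.12M>))     // expect 204: no pruning
size(sqlDS(<select * from pt where month between 2016.11M : 2016.12M>)) // expect 2: pruned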
3.3.2 Using Partition Columns to Reduce Query Time
When writing SQL queries, it's recommended to include partitioning columns in WHERE conditions. This helps the system limit the number of partitions it needs to scan, thereby reducing query time.
In this example, the table has two columns, datea and dateb, which contain identical values. However, only dateb is defined as the partitioning column. When querying data for a specific day using the non-partitioning column datea:
if( existsDatabase("dfs://valuedb2") ) dropDatabase("dfs://valuedb2");
n=1000000
datea=take(2000.01.01..2000.01.02, n)
dateb=take(2000.01.01..2000.01.02, n)
x=rand(1.0, n)
t=table(datea,dateb, x)
db=database("dfs://valuedb2", VALUE, 2000.01.01..2000.01.02)
pt = db.createPartitionedTable(t, `pt, `dateb)
pt.append!(t)
select [HINT_EXPLAIN] * from pt where datea = 2000.01.01;
The execution plan is as follows:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 1
    },
    "map": {
      "partitions": {
        "local": 0,
        "remote": 2
      },
      "cost": 10010,
      "detail": {
        "most": {
          "sql": "select [114699] datea,dateb,x from pt where datea == 2000.01.01 [partition = /valuedb2/20000101/OD]",
          "explain": {
            "where": {
              "rows": 500000,
              "cost": 3973
            },
            "rows": 500000,
            "cost": 9783
          }
        },
        "least": {
          "sql": "select [114695] datea,dateb,x from pt where datea == 2000.01.01 [partition = /valuedb2/20000102/OD]",
          "explain": {
            "from": {
              "cost": 8
            },
            "where": {
              "rows": 0,
              "cost": 2392
            },
            "rows": 0,
            "cost": 2516
          }
        }
      }
    },
    "merge": {
      "cost": 2079,
      "rows": 500000,
      "detail": {
        "most": {
          "sql": "select [114699] datea,dateb,x from pt where datea == 2000.01.01 [partition = /valuedb2/20000101/OD]",
          "explain": {
            "where": {
              "rows": 500000,
              "cost": 3973
            },
            "rows": 500000,
            "cost": 9783
          }
        },
        "least": {
          "sql": "select [114695] datea,dateb,x from pt where datea == 2000.01.01 [partition = /valuedb2/20000102/OD]",
          "explain": {
            "from": {
              "cost": 8
            },
            "where": {
              "rows": 0,
              "cost": 2392
            },
            "rows": 0,
            "cost": 2516
          }
        }
      }
    },
    "rows": 500000,
    "cost": 13019
  }
}
Now, compare that to querying with the partitioning column dateb:
select [HINT_EXPLAIN] * from pt where dateb = 2000.01.01;
The execution plan is as follows:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 1
    },
    "map": {
      "partitions": {
        "local": 0,
        "remote": 1
      },
      "cost": 1275,
      "optimize": {
        "cost": 27,
        "field": "single partition query",
        "sql": "select [98307] datea,dateb,x from pt [partition = /valuedb2/20000101/OD]",
        "explain": {
          "rows": 500000,
          "cost": 1248
        }
      }
    },
    "rows": 500000,
    "cost": 2214
  }
}
When a non-partitioning column is used in the filter condition, DolphinDB cannot optimize the query and must scan all partitions. In contrast, using a partitioning column allows DolphinDB to directly access the relevant partitions, significantly improving efficiency.
Additionally, the TSDB storage engine provides a sortColumns option when creating tables. This option acts like an index, allowing for faster access to data. It's highly recommended to include both partitioning columns and sort columns in your filtering whenever possible. This can greatly improve query performance.
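To illustrate, here is a minimal sketch with a hypothetical TSDB database dfs://tsdbDemo, where the query filters on both the partitioning column dt and the sort column sym:
// dt is the partitioning column; sym and dt are the sortColumns
if(existsDatabase("dfs://tsdbDemo")) dropDatabase("dfs://tsdbDemo")
db = database("dfs://tsdbDemo", VALUE, 2023.01.01..2023.01.02, , "TSDB")
n = 1000000
t = table(take(2023.01.01..2023.01.02, n) as dt, rand(`A`B`C`D, n) as sym, rand(1.0, n) as val)
db.createPartitionedTable(t, `pt, `dt, , `sym`dt).append!(t)
// Filtering on dt prunes partitions; filtering on sym leverages the sort columns
select [HINT_EXPLAIN] * from loadTable("dfs://tsdbDemo", `pt) where dt == 2023.01.01 and sym == `A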
3.3.3 Optimization with optimize
DolphinDB has implemented SQL optimizations for certain common use cases. For instance, in IoT scenarios where it's often necessary to retrieve the most recent data for specific devices, DolphinDB uses a new pathfinding algorithm to locate the data directly. This avoids the need to dispatch query tasks to every partition, significantly improving performance.
In the execution plan, the map section shows the details of these optimizations within optimize (introduced in version 2.00.4). For example, to retrieve the latest data for all metrics from two specific devices:
// Simulate data for 9 devices, 4 metrics, and 2 days; generation takes about 1m23s
login(`admin, `123456)
devices = 1..9
metrics = 'a'..'d'
days = 2020.09.01 + 0..1
// Number of records per device, per metric, per day
n = 1000000
if(existsDatabase("dfs://svmDemo")) dropDatabase("dfs://svmDemo")
dbName = "dfs://svmDemo"
tableName = "sensors"
tableSchema = table(1:0, `devId`metId`times`vals, [INT,CHAR,TIMESTAMP,FLOAT])
db1 = database("", VALUE, 2020.09.01+0..1)
db2 = database("", HASH, [INT,3])
db = database(dbName, COMPO, [db1,db2], , "TSDB")
dfsTable = db.createPartitionedTable(tableSchema, tableName, `times`devId, , `devId`metId`times)
go
t = table(1:0, `devId`metId`times`vals, [INT,CHAR,TIMESTAMP,FLOAT])
for(day in days){
    t.clear!()
    for(device in devices){
        for(metric in metrics){
            tmp = table(take(device,n) as devId, take(metric,n) as metId,
                        (day.timestamp()+(1..1000000)*80) as times, rand(1.0, n) as vals)
            t.append!(tmp)
        }
    }
    loadTable(dbName, tableName).append!(t)
}
go
select [HINT_EXPLAIN] * from loadTable("dfs://svmDemo","sensors") where devId in [5,9] context by devId csort times limit -1;
The execution plan output shows the optimization in the map section:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 16
    },
    "map": {
      "optimize": {
        "cost": 3,
        "field": "optimize for CONTEXT BY + LIMIT: serial execution."
      },
      "cost": 6063225
    },
    "rows": 2,
    "cost": 6064123
  }
}
Here, in the optimize section, cost indicates how long the optimization took, and field describes the specific improvement. In this case, it shows the optimization applied to the combination of CONTEXT BY + CSORT + LIMIT.
DolphinDB also includes other optimizations for certain types of aggregation:
sortKey
Only tables created with the TSDB storage engine include a sortKey, which may be applied in CONTEXT BY or GROUP BY clauses. A value of true means that the system used the sortColumns during aggregation.
algo
algo appears in GROUP BY operations, indicating the algorithm used. DolphinDB currently supports the "hash", "vectorize", and "sort" algorithms. The database engine automatically selects the most suitable one based on context.
Here's an example using stock quote data stored with a composite partition: a 40-bucket HASH partition on symbol combined with a VALUE partition on the month of "date". Quote data for June and July is imported, giving 40 partitions per month, 80 in total.
The following query groups the table by stock symbol. Since the stock symbol is a sort column, the execution plan shows the sortKey value as true, indicating that the symbol column was used for sorted aggregation.
// Import stock data partitioned by month; CSV file format shown in the appendix
login(`admin,`123456);
dbpath = "dfs://stocks";if(existsDatabase(dbpath)) dropDatabase(dbpath);
db1=database(,HASH,[SYMBOL,40]);
db2=database(,VALUE,2020.01M..2022.01M);
db=database(dbpath,COMPO,[db1, db2],,"TSDB")
schema=extractTextSchema("YOURDIR/20200601.csv")
t=table(1:0,schema.name,schema.type)
db.createPartitionedTable(t,`quotes, `symbol`date,,`symbol`date`time)
def transDate(mutable t, diff){
return t.replaceColumn!(`date,t.date+diff);
}
diffs = [20,60];
for(diff in diffs){
loadTextEx(dbHandle=db,tableName=`quotes,partitionColumns=`symbol`date,filename="YOURDIR/20200601.csv",transform=transDate{,diff})
}
select [HINT_EXPLAIN] symbol from loadTable("dfs://stocks","quotes") group by symbol;
Execution plan:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 19
    },
    "map": {
      "partitions": {
        "local": 0,
        "remote": 80
      },
      "cost": 196081,
      "detail": {
        "most": {
          "sql": "select [114699] symbol from quotes group by symbol [partition = /stocks/Key7/202007M/gK]",
          "explain": {
            "from": {
              "cost": 6
            },
            "groupBy": {
              "sortKey": true,
              "cost": 8727
            },
            "rows": 276,
            "cost": 20661
          }
        },
        "least": {
          "sql": "select [114699] symbol from quotes group by symbol [partition = /stocks/Key8/202007M/gK]",
          "explain": {
            "groupBy": {
              "sortKey": true,
              "cost": 5247
            },
            "rows": 250,
            "cost": 10605
          }
        }
      }
    },
    "merge": {
      "cost": 2951,
      "rows": 23058,
      "detail": {
        "most": {
          "sql": "select [114699] symbol from quotes group by symbol [partition = /stocks/Key39/202007M/gK]",
          "explain": {
            "from": {
              "cost": 7
            },
            "groupBy": {
              "sortKey": true,
              "cost": 6889
            },
            "rows": 327,
            "cost": 13987
          }
        },
        "least": {
          "sql": "select [114699] symbol from quotes group by symbol [partition = /stocks/Key8/202006M/gK]",
          "explain": {
            "groupBy": {
              "sortKey": true,
              "cost": 5123
            },
            "rows": 250,
            "cost": 12908
          }
        }
      }
    },
    "reduce": {
      "sql": "select [98307] symbol from 105c5e0300000000 group by symbol",
      "explain": {
        "groupBy": {
          "sortKey": false,
          "algo": "sort",
          "cost": 21147
        },
        "rows": 11529,
        "cost": 22139
      }
    },
    "rows": 11529,
    "cost": 232551
  }
}
3.4 The reduce Section
Not all SQL queries include a reduce phase. This phase typically appears when the merged results from previous steps require further processing. For example:
select [HINT_EXPLAIN] max(vals) from loadTable("dfs://svmDemo", `sensors) where devId = 1;
After the merge phase aggregates intermediate results, DolphinDB may need an additional step—the reduce phase—for final computation. The execution plan for this query is as follows:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 13
    },
    "map": {
      "partitions": {
        "local": 0,
        "remote": 2
      },
      "cost": 1397982,
      "detail": {
        "most": {...},
        "least": {...}
      }
    },
    "merge": {
      "cost": 35,
      "rows": 2,
      "detail": {
        "most": {
          "sql": "select [114699] max(vals) as col_0_ from sensors where devId == 1 [partition = /svmDemo/20200902/Key1/gz]",
          "explain": {
            "rows": 1,
            "cost": 1395804
          }
        },
        "least": {...}
      }
    },
    "reduce": {
      "sql": "select [98307] ::max(col_0_) as max_vals from 105c5e0300000000",
      "explain": {
        "rows": 1,
        "cost": 32
      }
    },
    "rows": 1,
    "cost": 1398888
  }
}
In this example, after the intermediate results (col_0_) from the different partitions are merged, the reduce phase computes the final maximum max(col_0_). The output contains one row and takes 32μs to compute.
The reduce section in an execution plan helps analyze the time consumed by specific functions, which is useful for identifying performance bottlenecks and optimizing queries accordingly. For instance, applying filters earlier, during the map phase, reduces the amount of data passed to the reduce phase and thus minimizes computation time.
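For instance, reusing the sensors table from the example above, adding a condition on the sort column metId (a sketch; actual costs will vary) lets each map task return fewer rows to the reduce phase:
select [HINT_EXPLAIN] max(vals) from loadTable("dfs://svmDemo", `sensors) where devId == 1 and metId == 'a'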
Take the following query as another example:
// Continuing with previously imported stock data
select [HINT_EXPLAIN] last(askPrice1) \ first(askPrice1) - 1
from loadTable("dfs://stocks", "quotes")
where date >= 2020.06.21
group by symbol, segment(askLevel, false)
Execution plan:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 17
    },
    "map": {
      "partitions": {
        "local": 0,
        "remote": 80
      },
      "cost": 1528495,
      "detail": {
        "most": {...},
        "least": {...}
      }
    },
    "merge": {
      "cost": 18100,
      "rows": 128206,
      "detail": {
        "most": {
          "sql": "select [114691] first(askPrice1) as col_1_,last(askPrice1) as col_0_ from quotes group by symbol,segment(askLevel, 0) as segment_askLevel [partition = /stocks/Key31/202007M/gK]",
          "explain": {
            "from": {
              "cost": 8
            },
            "groupBy": {
              "sortKey": true,
              "cost": 100960
            },
            "rows": 2686,
            "cost": 117555
          }
        },
        "least": {...}
      }
    },
    "reduce": {
      "sql": "select [98307] ::last(col_0_) \ ::first(col_1_) - 1 as last_askPrice1_ratio from 105c5e0300000000 group by symbol,segment_askLevel",
      "explain": {
        "groupBy": {
          "sortKey": false,
          "algo": "hash",
          "cost": 47979
        },
        "rows": 64103,
        "cost": 48091
      }
    },
    "rows": 64103,
    "cost": 1602107
  }
}
3.5 Execution Plans for DolphinDB-Specific Features
DolphinDB offers several innovative features that are unique to its SQL engine,
such as CGROUP BY, CONTEXT BY, PIVOT BY, and the interval
function. In most cases, the execution plan will separately display the time
consumed by these operations, making it easier to analyze and compare
performance. For example:
t = table(`A`A`A`A`B`B`B`B as sym,
09:30:06 09:30:28 09:31:46 09:31:59 09:30:19 09:30:43 09:31:23 09:31:56 as time,
10 20 10 30 20 40 30 30 as volume,
10.05 10.06 10.07 10.05 20.12 20.13 20.14 20.15 as price)
select [HINT_EXPLAIN] wavg(price, volume) as wvap from t
group by sym
cgroup by minute(time) as minute
order by sym, minute
The execution plan shows that the CGROUP BY operation took 328μs:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 0
    },
    "cgroupBy": {
      "cost": 328
    },
    "rows": 4,
    "cost": 378
  }
}
Example with PIVOT BY:
select [HINT_EXPLAIN] rowSum(ffill(last(preClose))) from loadTable("dfs://stocks", `quotes) pivot by time, symbol
The execution plan is as follows:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 20
    },
    "map": {...},
    "merge": {...},
    "reduce": {
      "sql": "select [98307] rowSum(ffill(::last(col_0_))) as rowSum from 105c5e0300000000 pivot by time,symbol",
      "explain": {
        "pivotBy": {
          "cost": 4086121
        },
        "rows": 15182,
        "cost": 4086176
      }
    },
    "rows": 15182,
    "cost": 22514617
  }
}
Example with interval:
When a SQL clause includes interval, the execution plan displays a fill section, indicating the time spent filling missing intervals.
N = 3653
t = table(2011.11.01..2021.10.31 as date,
take(`AAPL, N) as code,
rand([0.0573, -0.0231, 0.0765, 0.0174, -0.0025, 0.0267, 0.0304, -0.0143, -0.0256, 0.0412, 0.0810, -0.0159, 0.0058, -0.0107, -0.0090, 0.0209, -0.0053, 0.0317, -0.0117, 0.0123], N) as rate)
select [HINT_EXPLAIN] std(rate) from t group by code, interval(date, 30d, "none")
Execution plan:
{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 1
    },
    "groupBy": {
      "sortKey": false,
      "algo": "sort",
      "fill": {
        "cost": 1
      },
      "cost": 1662
    },
    "rows": 123,
    "cost": 1736
  }
}