SQL Execution Plan

DolphinDB provides a feature for viewing the execution plan of SQL queries to help optimize query performance. This tool shows how SQL statements are executed within the system.

An execution plan details how a SQL statement is processed, including the number of partitions accessed and the execution time for each step.

This feature was introduced in versions V1.30.16 and V2.00.4.

Note: Retrieving an execution plan runs the full SQL statement. For long-running queries or those involving many partitions, consider analyzing each clause separately. Alternatively, use the sqlDS function to identify affected partitions and check the execution plan for each individually.

1. Retrieving the Execution Plan

To obtain the execution plan of a SQL query, insert [HINT_EXPLAIN] immediately after SELECT or EXEC, followed by the required query fields.

Example:

select [HINT_EXPLAIN] * from pt;

Note: Execution plans are currently not supported for UPDATE or DELETE statements.

2. Understanding the Execution Plan

DolphinDB returns execution plans in JSON format. Unlike some other databases, DolphinDB’s execution plan follows the following rules:

  • Execution order is represented from top to bottom at the same level.
  • Indentation indicates the execution order of subclauses or subprocesses.

Let’s use a simple example of querying all data from a table to walk through an execution plan.

// Create database
n = 1000000
month = take(2000.01M..2016.12M, n)
x = rand(1.0, n)
id = rand(1..9, n)
t = table(month, x, id)

db = database("dfs://valuedb", VALUE, 2000.01M..2016.12M)
pt = db.createPartitionedTable(t, `pt, `month)
pt.append!(t)

// Query
select [HINT_EXPLAIN] * from loadTable("dfs://valuedb", `pt);

This query yields the following execution plan:

{
    "measurement": "microsecond",
    "explain": {
        "from": {
            "cost": 13
        },
        "map": {
            "partitions": {
                "local": 0,
                "remote": 204
            },
            "cost": 22260,
            "detail": {
                "most": {
                    "sql": "select [114699] month,x,id from pt [partition = /valuedb/200009M/1kE]",
                    "explain": {
                        "rows": 4902,
                        "cost": 9962
                    }
                },
                "least": {
                    "sql": "select [114699] month,x,id from pt [partition = /valuedb/200701M/1kE]",
                    "explain": {
                        "rows": 4902,
                        "cost": 64
                    }
                }
            }
        },
        "merge": {
            "cost": 6163,
            "rows": 1000000,
            "detail": {
                "most": {
                    "sql": "select [114691] month,x,id from pt [partition = /valuedb/201604M/1kE]",
                    "explain": {
                        "from": {
                            "cost": 4
                        },
                        "rows": 4902,
                        "cost": 157
                    }
                },
                "least": {
                    "sql": "select [114699] month,x,id from pt [partition = /valuedb/201605M/1kE]",
                    "explain": {
                        "rows": 4901,
                        "cost": 81
                    }
                }
            }
        },
        "rows": 1000000,
        "cost": 30988
    }
}

To summarize the execution flow, from top to bottom:

  • Retrieve metadata for the pt table – took 13μs.
  • Execute parallel queries on partitions:
    • 0 partitions on local nodes and 204 partitions on remote nodes - took 22,260μs in total.
    • The most time-consuming partition: /200009M/1kE - took 9,962μs, returned 4,902 rows.
  • Merge results from all partitions – took 6,163μs. The total result contained 1 million rows. The partition with the most rows returned: /201604M/1kE, 4,902 rows.
  • Total rows returned: 1,000,000; Total execution time: 30,988μs.

2.1 Structure Overview

Execution details are generally contained within the explain sections. If the SQL includes subqueries, multiple nested explain sections may appear.

"measurement": "microsecond" indicates all time units are in microseconds.

In this example, the explain section includes four parts: from, map, merge, and total rows and cost.

2.2 The from Section

"from": {
  "cost": 13
}

The from section contains a cost field, which represents the time spent retrieving metadata for the source table, measured in microseconds. In this case, loading the table pt using loadTable() took 13μs.

2.3 The map Section

DolphinDB uses a distributed file system (DFS) to store data by partitions. When an SQL query involves multiple partitions, DolphinDB first performs partition pruning. It then distributes the query to the relevant partitions for parallel execution, and finally aggregates the results. The map section provides details on how the query tasks are dispatched to the corresponding partitions.

"map": {
    "partitions": {
        "local": 0,
        "remote": 204
    },
    "cost": 22260,
    "detail": {
        "most": {
            "sql": "select [114699] month,x,id from pt [partition = /valuedb/200009M/1kE]",
            "explain": {
                "rows": 4902,
                "cost": 9962
            }
        },
        "least": {
            "sql": "select [114699] month,x,id from pt [partition = /valuedb/200701M/1kE]",
            "explain": {
                "rows": 4902,
                "cost": 64
            }
        }
    }
  }

partitions: The partitions involved in the SQL query, including:

  • local: Number of partitions on the local data node.
  • remote: Number of partitions on remote data nodes.

By comparing the number of involved partitions to the total number of partitions, you can evaluate whether query optimization techniques, such as partition pruning, have been applied.

cost: The total execution time—from dispatching the query to each partition to receiving the results.

detail: The detailed execution plans for sub-queries on each partition.

Since a query may involve many partitions, only the partitions with the longest and shortest execution times are shown.

  • most: Refers to the partition with the longest execution time.
    • The SQL field displays the query used and the partition(s) involved.
    • A nested explain section presents the query plan for each partition.
  • least: Refers to the partition with the shortest execution time.
    • The structure and content are the same as in most.

2.4 The merge Section

After individual partitions complete execution, results are sent to the coordinating node for aggregation. This process is described in the merge section:

        "merge": {
            "cost": 6163,
            "rows": 1000000,
            "detail": {
                "most": {
                    "sql": "select [114691] month,x,id from pt [partition = /valuedb/201604M/1kE]",
                    "explain": {
                        "from": {
                            "cost": 4
                        },
                        "rows": 4902,
                        "cost": 157
                    }
                },
                "least": {
                    "sql": "select [114699] month,x,id from pt [partition = /valuedb/201605M/1kE]",
                    "explain": {
                        "rows": 4901,
                        "cost": 81
                    }
                }
            }
        },
  • cost: Total time taken to perform the merge.
  • rows: Total number of rows after aggregation.
  • detail: Similar to the map section, but here:
    • most indicates the partition that returned the highest number of rows.
    • least indicates the partition that returned the fewest rows.

      Unlike the map section, these are based on row count rather than execution time.

2.5 Final Statistics

The final statistics typically include rows (the total number of records retrieved) and cost (the total query time, measured in microseconds).

In addition, rows and cost are also reported within the execution plan to reflect performance metrics at specific stages of the query.

It's important to note that the total query time shown at the end may exceed the sum of individual stage times. This discrepancy is due to additional overhead—such as data format conversions or network transmission—that occurs between stages but is not captured in the execution plan.

3. Execution Plan Details and Optimization

3.1 The from Section

The data sources in the from section can include in-memory tables, stream tables, DFS tables, the result of table joins, or nested SQL queries.

Note: Nested SQL queries cannot include [HINT_EXPLAIN].

3.1.1 Table Joins

When the query involves table joins, the execution plan will include relevant join information.

if(existsDatabase("dfs://valuedb")) dropDatabase("dfs://valuedb");
if(existsDatabase("dfs://valuedb1")) dropDatabase("dfs://valuedb1");

// Create valuedb database
n = 1000000
month = take(2000.01M..2016.12M, n)
x = rand(1.0, n)
id = rand(1..9, n)
t1 = table(month, x, id)
db1 = database("dfs://valuedb", VALUE, 2000.01M..2016.12M)
pt1 = db1.createPartitionedTable(t1, `pt, `month).append!(t1)

// Create valuedb1 database
id = rand(1..10, n)
times = now() + 1..n
vals = take(1..20, n) + rand(1.0, n)
t2 = table(id, times, vals)
db2 = database("dfs://valuedb1", VALUE, 1..10)
pt2 = db2.createPartitionedTable(t2, `pt, `id).append!(t2)

// Join query
select [HINT_EXPLAIN] times, vals, month from lsj(loadTable("dfs://valuedb", `pt), loadTable("dfs://valuedb1", `pt), `id)

Execution plan:

{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 29,
      "detail": "materialize for JOIN"
    },
    "map": {
      "reshuffle": {
        "cost": 9293
      },
      "partitions": {
        "local": 0,
        "remote": 204
      },
      "cost": 1961931,
      "detail": {
        "most": {
          "sql": "select [98305] times,vals,month from lsj(DataSource< select [65541] month,id from pt where hashBucket(id, 204) == 1 as pt >,DataSource< select [65541] vals,times,id from pt where hashBucket(id, 204) == 1 as pt >,\"id\",\"id\")",
          "explain": {
            "from": {
              "cost": 55134,
              "detail": "materialize for JOIN"
            },
            "join": {
              "cost": 1701
            },
            "rows": 110842,
            "cost": 57877
          }
        },
        "least": {...}
      }
    },
    "merge": {
      "cost": 12358,
      "rows": 1000000,
      "detail": {
        "most": {
          "sql": "select [98305] times,vals,month from lsj(DataSource< select [65541] month,id from pt where hashBucket(id, 204) == 3 as pt >,DataSource< select [65541] vals,times,id from pt where hashBucket(id, 204) == 3 as pt >,\"id\",\"id\")",
          "explain": {
            "from": {
              "cost": 29406,
              "detail": "materialize for JOIN"
            },
            "join": {
              "cost": 2535
            },
            "rows": 111496,
            "cost": 34450
          }
        },
        "least": {...}
      }
    },
    "rows": 1000000,
    "cost": 1979077
  }
}

Compared to the example in Section 2, the execution plan in this case includes an additional "detail": "materialize for JOIN" in the from section. Note that the time shown in the from section does not represent the time taken by the actual table join operation, but rather the time spent preparing the two data sources for the join. The table join itself is carried out during the map phase.

According to the statistics in the map section, the partition with the longest execution time shows a join duration of 1701μs. Based on the merge section statistics, the partition returning the largest volume of data has a join duration of 2535μs.

The reshuffle time shown in the map section refers to the time spent preparing data to be stored contiguously in memory based on the join column, in order to perform the table join. This time reflects only the preparation phase and does not include the time taken to load the data into memory. If multiple tables under the same database are joined and the join columns are also partitioning columns, the reshuffle step may be omitted.

3.1.2. Subqueries

If the SQL FROM clause contains a nested SQL query, the execution plan will display a nested breakdown of that subquery’s execution.

Example: This query groups the table by month, calculates the maximum value of x in each group, and then filters out rows where the max value is below a threshold.

select [HINT_EXPLAIN] * from (select max(x) as maxx from loadTable("dfs://valuedb",`pt) group by month ) where maxx < 0.9994

In the execution plan, we can see that the from section includes much more detailed information than previous examples, especially within detail.

sql shows the SQL statement of a subquery, and the following explain provides the execution plan for that subquery. The total execution time of the subquery is 33,571 microseconds (μs).

{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 33571,
      "detail": {
        "sql": "select [98304] max(x) as maxx from loadTable(\"dfs://valuedb\", \"pt\") group by month",
        "explain": {
          "from": {
            "cost": 16
          },
          "map": {
            "partitions": {
              "local": 0,
              "remote": 204
            },
            "cost": 29744,
            "detail": {
              "most": {
                "sql": "select [114715] first(month) as month,max(x) as maxx from pt [partition = /valuedb/200412M]",
                "explain": {
                  "from": {
                    "cost": 4
                  },
                  "rows": 1,
                  "cost": 8224
                }
              },
              "least": {
                "sql": "select [114715] first(month) as month,max(x) as maxx from pt [partition = /valuedb/200112M]",
                "explain": {
                  "rows": 1,
                  "cost": 31
                }
              }
            }
          },
          "merge": {
            "cost": 1192,
            "rows": 204,
            "detail": {
              "most": {
                "sql": "select [114715] first(month) as month,max(x) as maxx from pt [partition = /valuedb/201612M]",
                "explain": {
                  "rows": 1,
                  "cost": 65
                }
              },
              "least": {
                "sql": "select [114707] first(month) as month,max(x) as maxx from pt [partition = /valuedb/200001M]",
                "explain": {
                  "from": {
                    "cost": 5
                  },
                  "rows": 1,
                  "cost": 93
                }
              }
            }
          },
          "rows": 204,
          "cost": 33571
        }
      }
    },
    "where": {
      "rows": 9,
      "cost": 13
    },
    "rows": 9,
    "cost": 33621
  }
}

3.2 The where Section

The where section typically includes only two metrics:

  • cost: the time spent on filtering the data
  • rows: the number of rows that remained after filtering

For example, in the above subquery case, only 9 rows matched the filter condition, and the filtering operation took 13μs.

    "where": {
      "rows": 9,
      "cost": 13
    },

3.3 The map Section

In the map phase of a distributed query, tasks are distributed across different nodes for execution. The execution plan displays information such as the number of partitions involved, the number of rows retrieved, the execution time, and the specific SQL statements used. If the SQL has been optimized, the map section will include an optimize section or may be entirely replaced by it.

3.3.1. Partition Pruning

For distributed queries, you can check whether partition pruning has been applied by looking at the number of partitions involved in the map section. If the number of partitions listed in the execution plan matches the total number of partitions, it means no partition pruning has occurred — the query scanned all partitions. In this case, refer to the user manual's section on partition pruning to optimize your SQL.

It is recommended to estimate the number of partitions your SQL should involve before running the query, and compare that with the number shown in the execution plan. This helps you determine whether the query is scanning only the necessary data.

In the following example, a table is partitioned on the month column, resulting in a total of 204 partitions. A query is then executed to retrieve data from November to December 2016. Ideally, this should involve only 2 partitions.

// Create database
if( existsDatabase("dfs://valuedb") ) dropDatabase("dfs://valuedb");
// Generate 1 million rows with columns: x and month; partition the data by the 'month' column.
n=1000000
month=take(2000.01M..2016.12M, n)
x=rand(1.0, n)
t=table(month, x)

db=database("dfs://valuedb", VALUE, 2000.01M..2016.12M)

pt = db.createPartitionedTable(t, `pt, `month)
pt.append!(t)

// Check how many partitions were created. There should be 204.
select count(*) from pnodeRun(getAllChunks) where dfsPath like '%valuedb/%' and type != 0 group by dfsPath

select [HINT_EXPLAIN] * from pt where 2016.11M <= month<= 2016.12M

Execution plan:

{
    "measurement": "microsecond",
    "explain": {
        "from": {
            "cost": 2
        },
        "map": {
            "partitions": {
                "local": 0,
                "remote": 204
            },
            "cost": 12966,
            "detail": {
                "most": {
                    "sql": "select [114699] month,x from pt where 2016.11M <= month <= 2016.12M [partition = /valuedb/200107M/2JCB]",
                    "explain": {
        ......[omitted]

As shown in the execution plan, all 204 partitions were scanned. This indicates that the condition 2016.11M <= month <= 2016.12M does not trigger partition pruning. To enable pruning, rewrite the condition using the BETWEEN keyword:

select [HINT_EXPLAIN] * from pt where month between 2016.11M : 2016.12M

Execution plan:

{
    "measurement": "microsecond",
    "explain": {
        "from": {
            "cost": 3
        },
        "map": {
            "partitions": {
                "local": 0,
                "remote": 2
            },
            "cost": 912,
            "detail": {
                "most": {
    ......[omitted]

3.3.2 Using Partition Columns to Reduce Query Time

When writing SQL queries, it's recommended to include partitioning columns in WHERE conditions. This helps the system limit the number of partitions it needs to scan, thereby reducing query time.

In this example, the table has two columns, datea and dateb, which contain identical values. However, only dateb is defined as the partitioning column. When querying data for a specific day using the non-partitioning column datea:

if( existsDatabase("dfs://valuedb2") ) dropDatabase("dfs://valuedb2");
n=1000000
datea=take(2000.01.01..2000.01.02, n)
dateb=take(2000.01.01..2000.01.02, n)
x=rand(1.0, n)
t=table(datea,dateb, x)
db=database("dfs://valuedb2", VALUE, 2000.01.01..2000.01.02)
pt = db.createPartitionedTable(t, `pt, `dateb)
pt.append!(t)

select [HINT_EXPLAIN] * from pt where datea = 2000.01.01;

The execution plan is as follows:

{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 1
    },
    "map": {
      "partitions": {
        "local": 0,
        "remote": 2
      },
      "cost": 10010,
      "detail": {
        "most": {
          "sql": "select [114699] datea,dateb,x from pt where datea == 2000.01.01 [partition = /valuedb2/20000101/OD]",
          "explain": {
            "where": {
              "rows": 500000,
              "cost": 3973
            },
            "rows": 500000,
            "cost": 9783
          }
        },
        "least": {
          "sql": "select [114695] datea,dateb,x from pt where datea == 2000.01.01 [partition = /valuedb2/20000102/OD]",
          "explain": {
            "from": {
              "cost": 8
            },
            "where": {
              "rows": 0,
              "cost": 2392
            },
            "rows": 0,
            "cost": 2516
          }
        }
      }
    },
    "merge": {
      "cost": 2079,
      "rows": 500000,
      "detail": {
        "most": {
          "sql": "select [114699] datea,dateb,x from pt where datea == 2000.01.01 [partition = /valuedb2/20000101/OD]",
          "explain": {
            "where": {
              "rows": 500000,
              "cost": 3973
            },
            "rows": 500000,
            "cost": 9783
          }
        },
        "least": {
          "sql": "select [114695] datea,dateb,x from pt where datea == 2000.01.01 [partition = /valuedb2/20000102/OD]",
          "explain": {
            "from": {
              "cost": 8
            },
            "where": {
              "rows": 0,
              "cost": 2392
            },
            "rows": 0,
            "cost": 2516
          }
        }
      }
    },
    "rows": 500000,
    "cost": 13019
  }
}

Now, compare that to querying with the partitioning column dateb:

select [HINT_EXPLAIN] * from pt where dateb = 2000.01.01;

The execution plan is as follows:

{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 1
    },
    "map": {
      "partitions": {
        "local": 0,
        "remote": 1
      },
      "cost": 1275,
      "optimize": {
        "cost": 27,
        "field": "single partition query",
        "sql": "select [98307] datea,dateb,x from pt [partition = /valuedb2/20000101/OD]",
        "explain": {
          "rows": 500000,
          "cost": 1248
        }
      }
    },
    "rows": 500000,
    "cost": 2214
  }
}

When a non-partitioning column is used in the filter condition, DolphinDB cannot optimize the query and must scan all partitions. In contrast, using a partitioning column allows DolphinDB to directly access the relevant partitions, significantly improving efficiency.

Additionally, the TSDB storage engine provides a sortColumns option when creating tables. This option acts like an index, allowing for faster access to data. It's highly recommended to include both partitioning columns and sort columns in your filtering whenever possible. This can greatly improve query performance.

3.3.3 Optimization with optimize

DolphinDB has implemented SQL optimizations for certain common use cases. For instance, in IoT scenarios where it's often necessary to retrieve the most recent data for specific devices, DolphinDB uses a new pathfinding algorithm to locate the data directly. This avoids the need to dispatch query tasks to every partition, significantly improving performance.

In the execution plan, the map section shows the details of these optimizations within optimize (introduced in version 2.00.4). For example, to retrieve the latest data for all metrics from two specific devices:

// Device/Metric/Date — Takes about 1m 23s 23ms
login(`admin,`123456);devices = 1..9;metrics = 'a'..'d';days = 2020.09.01+0..1;
// Number of records per device, per metric, per day
n = 1000000;

if(existsDatabase("dfs://svmDemo"))	dropDatabase("dfs://svmDemo")
dbName="dfs://svmDemo";tableName="sensors";
tableSchema = table(1:0,`devId`metId`times`vals,[INT,CHAR,TIMESTAMP,FLOAT]);
db1 = database("", VALUE, 2020.09.01+0..1)
db2 = database("", HASH, [INT,3])
db = database(dbName,COMPO,[db1,db2],,'TSDB')
dfsTable = db.createPartitionedTable(tableSchema,tableName,`times`devId,,`devId`metId`times)
go;
t = table(1:0,`devId`metId`times`vals,[INT,CHAR,TIMESTAMP,FLOAT]);
for(day in days){
	t.clear!();	
	for (device in devices){
		for (metric in metrics){
			take(device,n)
			tmp = table(take(device,n) as devId, take(metric,n) as metId, 
				(day.timestamp()+(1..1000000)*80) as times, rand(1.0, n) as vals);
			t.append!(tmp)
		}
	}
	loadTable(dbName,tableName).append!(t);
};go;

select [HINT_EXPLAIN] * from loadTable("dfs://svmDemo","sensors") where devId in [5,9] context by devId csort times limit -1;

The execution plan output shows the optimization in the map section:

{
    "measurement": "microsecond",
    "explain": {
        "from": {
            "cost": 16
        },
        "map": {
            "optimize": {
                "cost": 3,
                "field": "optimize for CONTEXT BY + LIMIT: serial execution."
            },
            "cost": 6063225
        },
        "rows": 2,
        "cost": 6064123
    }
} 

Here, in the optimize section, cost indicates how long the optimization took, and field describes the specific improvement. In this case, it shows optimization for the combination of CONTEXT BY + CSORT + LIMIT.

DolphinDB also includes other optimizations for certain types of aggregation:

sortKey

Only tables created with the TSDB storage engine include a sortKey, which may be applied in CONTEXT BY or GROUP BY clauses. A value of true means that the system uses the sortColumns during aggregation.

algo

algo appears in GROUP BY operations, indicating the algorithm used. DolphinDB currently supports these algorithms: "hash", "vectorize", "sort". The database engine automatically selects the most suitable algorithm based on context.

Here’s an example using a table partitioned by month. The partitioning column is “date“. The table imports stock quote data from June and July (with 40 partitions for each month, totaling 80).

Calculate the volume for each stock by stock symbol. Since the stock symbol is a sort column, the execution plan shows the sortKey value as true, indicating that the stock symbol column was used for sorting.

// Import stock data partitioned by month; CSV file format shown in the appendix
login(`admin,`123456);
dbpath = "dfs://stocks";if(existsDatabase(dbpath)) dropDatabase(dbpath);

db1=database(,HASH,[SYMBOL,40]);
db2=database(,VALUE,2020.01M..2022.01M);
db=database(dbpath,COMPO,[db1, db2],,"TSDB")

schema=extractTextSchema("YOURDIR/20200601.csv")
t=table(1:0,schema.name,schema.type)
db.createPartitionedTable(t,`quotes, `symbol`date,,`symbol`date`time)

def transDate(mutable t, diff){
	return t.replaceColumn!(`date,t.date+diff);
}
diffs = [20,60];
for(diff in diffs){
	loadTextEx(dbHandle=db,tableName=`quotes,partitionColumns=`symbol`date,filename="YOURDIR/20200601.csv",transform=transDate{,diff})
}

select [HINT_EXPLAIN] symbol from loadTable("dfs://stocks","quotes") group by symbol;

Execution plan:

{
    "measurement": "microsecond",
    "explain":{
        "from":{
            "cost":19
        },
        "map":{
            "partitions":{
                "local":0,
                "remote":80
            },
            "cost":196081,
            "detail":{
                "most":{
                    "sql":"select [114699] symbol from quotes group by symbol [partition = /stocks/Key7/202007M/gK]",
                    "explain":{
                        "from":{
                            "cost":6
                        },
                        "groupBy":{
                            "sortKey":true,
                            "cost":8727
                        },
                        "rows":276,
                        "cost":20661
                    }
                },
                "least":{
                    "sql":"select [114699] symbol from quotes group by symbol [partition = /stocks/Key8/202007M/gK]",
                    "explain":{
                        "groupBy":{
                            "sortKey":true,
                            "cost":5247
                        },
                        "rows":250,
                        "cost":10605
                    }
                }
            }
        },
        "merge":{
            "cost":2951,
            "rows":23058,
            "detail":{
                "most":{
                    "sql":"select [114699] symbol from quotes group by symbol [partition = /stocks/Key39/202007M/gK]",
                    "explain":{
                        "from":{
                            "cost":7
                        },
                        "groupBy":{
                            "sortKey":true,
                            "cost":6889
                        },
                        "rows":327,
                        "cost":13987
                    }
                },
                "least":{
                    "sql":"select [114699] symbol from quotes group by symbol [partition = /stocks/Key8/202006M/gK]",
                    "explain":{
                        "groupBy":{
                            "sortKey":true,
                            "cost":5123
                        },
                        "rows":250,
                        "cost":12908
                    }
                }
            }
        },
        "reduce":{
            "sql":"select [98307] symbol from 105c5e0300000000 group by symbol",
            "explain":{
                "groupBy":{
                    "sortKey":false,
                    "algo":"sort",
                    "cost":21147
                },
                "rows":11529,
                "cost":22139
            }
        },
        "rows":11529,
        "cost":232551
    }
}

3.4 The reduce Section

Not all SQL queries include a reduce phase. This phase typically appears when the merged results from previous steps require further processing. For example:

select [HINT_EXPLAIN] max(vals) from loadTable("dfs://svmDemo", `sensors) where devId = 1 ;

After the merge phase aggregates intermediate results, DolphinDB may need an additional step—the reduce phase—for final computation. The execution plan for this query is as follows:

{
  "measurement": "microsecond",
  "explain": {
    "from": {
      "cost": 13
    },
    "map": {
      "partitions": {
        "local": 0,
        "remote": 2
      },
      "cost": 1397982,
      "detail": {
        "most": {...},
        "least": {...}
      }
    },
    "merge": {
      "cost": 35,
      "rows": 2,
      "detail": {
        "most": {
          "sql": "select [114699] max(vals) as col_0_ from sensors where devId == 1 [partition = /svmDemo/20200902/Key1/gz]",
          "explain": {
            "rows": 1,
            "cost": 1395804
          }
        },
        "least": {...}
      }
    },
    "reduce": {
      "sql": "select [98307] ::max(col_0_) as max_vals from 105c5e0300000000",
      "explain": {
        "rows": 1,
        "cost": 32
      }
    },
    "rows": 1,
    "cost": 1398888
  }
}

In this example, after intermediate results (col_0) from different partitions are merged, the reduce phase calculates the final maximum value max(col_0). The output contains one row and takes 32 microseconds to compute.

The reduce section in an execution plan helps analyze the time consumed by specific functions. This can be useful for identifying performance bottlenecks and optimizing queries accordingly. For instance, applying filters earlier during the map phase can reduce the amount of data passed to the reduce phase, thus minimizing computation time.

Take the following query as another example:

// Continuing with previously imported stock data
select [HINT_EXPLAIN] last(askPrice1) \ first(askPrice1) - 1 
from loadTable("dfs://stocks", "quotes")  
where date >= 2020.06.21
group by symbol, segment(askLevel, false)

Execution plan:

{
    "measurement": "microsecond",
    "explain": {
        "from": {
            "cost": 17
        },
        "map": {
            "partitions": {
                "local": 0,
                "remote": 80
            },
            "cost": 1528495,
            "detail": {
                "most": {...},
                "least": {...}
            }
        },
        "merge": {
            "cost": 18100,
            "rows": 128206,
            "detail": {
                "most": {
                    "sql": "select [114691] first(askPrice1) as col_1_,last(askPrice1) as col_0_ from quotes group by symbol,segment(askLevel, 0) as segment_askLevel [partition = /stocks/Key31/202007M/gK]",
                    "explain": {
                        "from": {
                            "cost": 8
                        },
                        "groupBy": {
                            "sortKey": true,
                            "cost": 100960
                        },
                        "rows": 2686,
                        "cost": 117555
                    }
                },
                "least": {...}
            }
        },
        "reduce": {
            "sql": "select [98307] ::last(col_0_) \ ::first(col_1_) - 1 as last_askPrice1_ratio from 105c5e0300000000 group by symbol,segment_askLevel",
            "explain": {
                "groupBy": {
                    "sortKey": false,
                    "algo": "hash",
                    "cost": 47979
                },
                "rows": 64103,
                "cost": 48091
            }
        },
        "rows": 64103,
        "cost": 1602107
    }
}

3.5 Execution Plans for DolphinDB-Specific Features

DolphinDB offers several innovative features that are unique to its SQL engine, such as CGROUP BY, CONTEXT BY, PIVOT BY, and the interval function. In most cases, the execution plan will separately display the time consumed by these operations, making it easier to analyze and compare performance. For example:

t = table(`A`A`A`A`B`B`B`B as sym, 
          09:30:06 09:30:28 09:31:46 09:31:59 09:30:19 09:30:43 09:31:23 09:31:56 as time, 
          10 20 10 30 20 40 30 30 as volume, 
          10.05 10.06 10.07 10.05 20.12 20.13 20.14 20.15 as price)

select [HINT_EXPLAIN] wavg(price, volume) as wvap from t 
					 group by sym 
					 cgroup by minute(time) as minute 
					 order by sym, minute

The execution plan shows that the CGROUP BY operation took 328μs:

{
    "measurement": "microsecond",
    "explain": {
        "from": {
            "cost": 0
        },
        "cgroupBy": {
            "cost": 328
        },
        "rows": 4,
        "cost": 378
    }
}

Example with PIVOT BY:

select [HINT_EXPLAIN] rowSum(ffill(last(preClose))) from loadTable("dfs://stocks", `quotes) pivot by time, symbol

The execution plan is as follows:

{
    "measurement": "microsecond",
    "explain": {
        "from": {
            "cost": 20
        },
        "map": {...},
        "merge": {...},
        "reduce": {
            "sql": "select [98307] rowSum(ffill(::last(col_0_))) as rowSum from 105c5e0300000000 pivot by time,symbol",
            "explain": {
                "pivotBy": {
                    "cost": 4086121
                },
                "rows": 15182,
                "cost": 4086176
            }
        },
        "rows": 15182,
        "cost": 22514617
    }
}

Example with interval:

When a SQL clause includes interval, the execution plan will display a fill section, indicating the time spent filling missing intervals.

N = 3653
t = table(2011.11.01..2021.10.31 as date, 
          take(`AAPL, N) as code, 
          rand([0.0573, -0.0231, 0.0765, 0.0174, -0.0025, 0.0267, 0.0304, -0.0143, -0.0256, 0.0412, 0.0810, -0.0159, 0.0058, -0.0107, -0.0090, 0.0209, -0.0053, 0.0317, -0.0117, 0.0123], N) as rate)

select [HINT_EXPLAIN] std(rate) from t group by code, interval(date, 30d, "none")

Execution plan:

{
    "measurement": "microsecond",
    "explain": {
        "from": {
            "cost": 1
        },
        "groupBy": {
            "sortKey": false,
            "algo": "sort",
            "fill": {
                "cost": 1
            },
            "cost": 1662
        },
        "rows": 123,
        "cost": 1736
    }
}