[HINT_EXPLAIN]

To monitor the real-time performance and execution order of a SQL query, you can print the query execution plan by adding "[HINT_EXPLAIN]" after the SQL keywords "select" or "exec". For example:

select [HINT_EXPLAIN] * from tb where id > 20

Note:

  • A SQL query with "[HINT_EXPLAIN]" returns a JSON string indicating the execution plan instead of the actual query result.

  • Currently, "[HINT_EXPLAIN]" does not work with "update" and "delete" statements.

"[HINT_EXPLAIN]" can be used in queries on both partitioned and non-partitioned tables. For a query on a partitioned table, the execution plan contains extra information about the map-reduce operation. This tutorial uses queries on a DFS table as examples to explain the attributes in the execution plan.

The outermost level of the JSON string consists of two root attributes: measurement and explain.

measurement

"measurement":"microsecond"

This indicates that the elapsed time (i.e., the values of the cost attribute) in the execution plan is measured in microseconds.

explain

The order of the sub attributes inside explain, from top to bottom, represents the execution order of the SQL query. Since a query may contain subqueries, explain can also be nested inside other attributes.

explain contains the following sub attributes:

  • rows: the number of records returned

  • cost: the time it takes to execute the query

{
  "measurement":"microsecond",
  "explain": {
      /* other attribute-value pairs */
      "rows": 20000,
      "cost": 100
  }
}
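The root attributes can also be read programmatically. The Python sketch below is illustrative only. It assumes the plan has already been retrieved as a JSON string (how you fetch it, for example through a client API, is not covered here). The sample plan and the helper name `summarize` are hypothetical, and the `//` and `/* */` annotations used in this document's examples are left out because they are not valid JSON.

```python
import json

# Hypothetical plan string shaped like the example above (annotations removed).
PLAN = '{"measurement": "microsecond", "explain": {"rows": 20000, "cost": 100}}'

# Divisors that convert a cost to milliseconds; only "microsecond" appears in this document.
MS_DIVISOR = {"microsecond": 1000}

def summarize(plan_json: str) -> tuple[int, float]:
    """Return (rows, cost in milliseconds) from the root "explain" attribute."""
    plan = json.loads(plan_json)
    explain = plan["explain"]
    return explain["rows"], explain["cost"] / MS_DIVISOR[plan["measurement"]]

rows, cost_ms = summarize(PLAN)
print(f"{rows} rows in {cost_ms} ms")
```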

The execution of a distributed query has 3 phases: map → merge → reduce. The attributes corresponding to the 3 phases are usually nested inside the root attribute explain. For example:

{
  "measurement":"microsecond",
  "explain":{
      "from":{...},
      "map":{...},
      "merge":{...},
      "reduce":{...},
      "rows":10,
      "cost":23530
  }
}
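Because the order of the sub attributes inside explain reflects the execution order, and Python's `json` module keeps the key order of JSON objects, a phase-by-phase cost breakdown can be sketched as follows. The sample plan and all of its numbers are hypothetical; in this sketch, reduce is assumed to carry its cost only in a nested explain, so the helper falls back to that value.

```python
import json

# Hypothetical distributed-query plan; the cost values are made up for illustration.
PLAN = """{"measurement": "microsecond",
  "explain": {
    "from": {"cost": 3},
    "map": {"cost": 20000},
    "merge": {"row": 10, "cost": 2500},
    "reduce": {"sql": "select ...", "explain": {"rows": 10, "cost": 1000}},
    "rows": 10,
    "cost": 23530}}"""

def phase_costs(plan_json: str) -> list[tuple[str, int]]:
    """Return (attribute, cost) pairs in the order they appear inside "explain"."""
    explain = json.loads(plan_json)["explain"]  # dict preserves the JSON key order
    result = []
    for name, value in explain.items():
        if not isinstance(value, dict):
            continue  # skip scalar totals such as "rows" and "cost"
        # Most attributes report "cost" directly; otherwise use a nested "explain".
        cost = value.get("cost", value.get("explain", {}).get("cost"))
        if cost is not None:
            result.append((name, cost))
    return result

for name, cost in phase_costs(PLAN):
    print(f"{name:>7}: {cost} us")
```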

The system optimizes queries where possible and includes the associated information in the "optimize" attribute of the execution plan. For example, in standalone mode, create a DFS table partitioned on the "date" column and query all the records in a single partition:

select [HINT_EXPLAIN] * from t1 where date = 2022.01.01

The query is optimized and an "optimize" attribute is included in the execution plan.

{
  "measurement":"microsecond",
  "explain":{
      "from":{ ... },
      "optimize":{ ... },
      "rows":185,
      "cost":987
  }
}

from

from indicates the execution plan of the parsed "from" clause.

Depending on what follows the "from" keyword in the query, the from attribute may contain different sub attributes. Below are some of the possible scenarios:

(1) "from" is followed by a table object. For example, select [HINT_EXPLAIN] * from pt

"from": {
  "cost": 3  // Cost of the "from" clause
}

(2) The "from" clause contains a nested clause. For example, select [HINT_EXPLAIN] * from (select max(x) as maxx from loadTable("dfs://valuedb",`pt) group by month) where maxx < 0.9994

"from": {
  "cost": 33571,  // Cost of the "from" clause
  "detail": {
      "sql": "select [98304] max(x) as maxx from loadTable(\"dfs://valuedb\", \"pt\") group by month",  // The nested clause
      "explain": { ... }  // "explain" for the nested clause
  }
}

In this example, there's a nested explain inside from showing the execution plan of the "select" subquery.
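Since explain can be nested at any depth (for example, under from.detail for a subquery), a recursive walk is a convenient way to find every nested plan. The following is a minimal Python sketch, assuming the plan has been parsed with `json.loads`. The helper `nested_explains` and the sample plan, including its numbers, are hypothetical.

```python
import json
from typing import Any, Iterator

# Hypothetical plan with a subquery in the "from" clause.
PLAN = json.loads("""{"measurement": "microsecond",
  "explain": {
    "from": {"cost": 33571,
             "detail": {"sql": "select max(x) as maxx from pt group by month",
                        "explain": {"rows": 12, "cost": 33000}}},
    "rows": 3,
    "cost": 34000}}""")

def nested_explains(node: Any, path: str = "") -> Iterator[tuple[str, int]]:
    """Yield (dotted path, cost) for every "explain" object found in the plan."""
    if isinstance(node, dict):
        for key, value in node.items():
            child = f"{path}.{key}" if path else key
            if key == "explain" and isinstance(value, dict) and "cost" in value:
                yield child, value["cost"]
            yield from nested_explains(value, child)
    elif isinstance(node, list):
        for i, item in enumerate(node):
            yield from nested_explains(item, f"{path}[{i}]")

for path, cost in nested_explains(PLAN):
    print(path, cost)
```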

(3) In versions earlier than 2.00.12, when "from" is followed by a join function, the "detail" attribute is returned, indicating whether the clause involves a table join operation. For example, select [HINT_EXPLAIN] * from lsj(pt1, pt2, "date")

"from": {
  "cost": 3,  // Cost of the "from" clause
  "detail": "materialize for JOIN"  // Indicates that the clause involves a table join
}

Since version 2.00.12, the from attribute is no longer returned when "from" is followed by a join function. Instead, the materialization attribute is returned, with the sub attributes left and right showing the subqueries that access the source tables, i.e., the leftTable and rightTable parameters of the join function. For example, select [HINT_EXPLAIN] * from ej(pt1, select * from pt2, "id"):

"materialization": {
    "cost": 37,
    "left": "pt1",                // leftTable
    "right": "select * from pt2"  // rightTable
}

Note: At this stage, the tables are not yet joined. cost only indicates the cost to load the source tables "pt1" and "pt2".

JOIN

Starting from version 2.00.12, HINT_EXPLAIN can display the detailed execution plan of joins, covering the following join types: cross join, inner join, left join, left semi join, right join, and full join.

Note: The following cases DO NOT follow the new execution plan:

  • Any table involved in the join is a partitioned in-memory table;

  • Joins performed with join functions whose matchingCols are the partitioning columns;

  • Joins using the cj function, except for nested cj calls used as the leftTable in a query without PIVOT BY. For example, select * from cj(select * from cj(select * from cj(t1, t2), t3), select * from t4) follows the new execution plan.

For each "left" and "right" table involved in the join, the following details about retrieving data from the source table are displayed:

"sql": "...",                 // subquery for retrieving data
                              // returns "<Empty Segment>" if there is no corresponding partition
"rows": 10,                   // the number of rows obtained
"cost": 237,                  // cost to retrieve data
"resultPartitionInfo": {
    "partitionParamsSet": true,             // whether the join attempts to return a partitioned table. If true, the following info is returned.
    "forcePartition": true,                 // whether to forcibly generate a partitioned table
    "domainType": "NULL/VALUE/HASH/...",    // partition type; NULL if there is no partitioning
    "domainScheme": [1, 2, 3],              // partitioning scheme for non-null "domainType"
    "partitionColumns": ["col1", "col2"],   // the partitioning columns
    "useSeqPartition": false,               // whether the returned table is SEQ-partitioned
    "numForcedPartitions": -1               // when "domainType"=NULL and "useSeqPartition"=false, the default number of partitions for a HASH-partitioned table
                                            // -1 indicates an estimated partition number
},
"explain": {...}              // execution plan of "sql"

At the beginning of this part, the execution plan lists the number of join stages:

  • numPipelinedStages: the number of pipelined joins;

  • numSerialStages: the number of serial joins.

"numPipelinedStages": 2,
"numSerialStages": 3,

Since version 2.00.10, distributed joins involving common partitions of the left and right tables are optimized to be executed in a pipeline (pipelined joins). The execution plan for such joins is shown in the pipelinedStages attribute, which contains the following sub attributes:

"pipelinedStages": {
    "rows": 10,                  // the returned rows                
    "cost": 35,                  // the cost for pipelined joins
    "numTasks": 10,              // the number of pipeline tasks
    "most": {                    // the most time-consuming task
        "cost": 20,              // the cost for the most time-consuming task
        "segments": [            // the partition path for each partitioned table
            "20210101/Key1",     // returns "" for an in-memory table and "<Empty Segment>" for a partition that has been pruned
            ""
        ],
        "explain": {
            "stage1": {
                "left": {...},   // execution plan for retrieving data from left table
                "right": {...},  // execution plan for retrieving data from right table
                "join": {
                    "script": "ej(t1, t2, `id, `id)",     // script for the join operation
                    "predicates": [                       // predicates pushed down to this join
                        "t1.id > 3",
                        "t2.id < 5"
                    ],
                    "rows": 10,   // the returned rows for "stage1"
                    "cost": 152   // the cost of "stage1"
                }
            },
            "stage2": {
                "right": {...},   // execution plan for retrieving data from right table
                "join": {...}     // same format as the "join" in "stage1"
            },
            ...                   // other possible stages
        }
    },
    "least": {...},               // the least time-consuming task, same format as the "most" attribute
    "resultPartitionInfo": {                   // info of the returned intermediate table
        "isPartitioned": true,                 // whether the table is partitioned. If true, the following info is returned.
        "estimatedRows": 50000,                // estimated rows of the table
        "domainType": "NULL/VALUE/HASH/...",   // partition type; NULL if there is no partitioning
        "domainScheme": [1, 2, 3],             // partitioning scheme for non-null "domainType"
        "partitionColumns": ["col1", "col2"],  // the partitioning columns
        "useSeqPartition": false               // whether the returned table is SEQ-partitioned
    },
    "totalExecutionCost": 20,                  // the total cost for pipeline tasks
    "totalAppendCost": 30,                     // the cost for appending results to the partitioned tables
    
    // If all joins can be executed in a pipeline, a final query will be executed.
    "final": {
        "sql": "...",       
        "rows": 10,         
        "cost": 237,       
        "explain": {...}            // execution plan for the final query
    }
},
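Comparing the most and least time-consuming pipeline tasks is one way to spot uneven work across partitions. The following Python sketch works on an already-parsed pipelinedStages object. The helper name `pipeline_summary` and the sample numbers are hypothetical, and reading a large most/least ratio as skew is a heuristic of this sketch, not something the execution plan reports.

```python
def pipeline_summary(stages: dict) -> dict:
    """Summarize a parsed "pipelinedStages" object."""
    most = stages["most"]["cost"]
    least = stages["least"]["cost"]
    return {
        "tasks": stages["numTasks"],
        # Ratio of the slowest to the fastest task; infinity if the fastest reports 0.
        "skew": most / least if least else float("inf"),
        # Cost of running the pipeline tasks plus appending their results.
        "execution_plus_append": stages["totalExecutionCost"] + stages["totalAppendCost"],
    }

sample = {"rows": 10, "cost": 35, "numTasks": 10,
          "most": {"cost": 20}, "least": {"cost": 5},
          "totalExecutionCost": 20, "totalAppendCost": 30}
print(pipeline_summary(sample))
```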
Join operations that cannot be executed in a pipeline are run one after another without this optimization (serial joins). The execution plan for these joins is shown in the serialStages attribute, which contains the following sub attributes:

"serialStages": {
    "rows": 10,               // the returned rows
    "cost": 30,               // the cost for serial joins
    "stage3": {               // serial stages are numbered consecutively after the pipelined stages
        "left": {...},        // execution plan for retrieving data from left table
                              // returned only when all joins are serial
        "right": {...},       // execution plan for retrieving data from right table
        "join": {
            "sql": "...",              
            "rows": 10,                 
            "cost": 237,                
            "resultPartitionInfo": {                    // info of the returned intermediate table
                "partitionParamsSet": true,             // whether the join attempts to return a partitioned table. If true, the following info is returned.
                                                        // true for all joins except the last join operation
                "forcePartition": true,                 // whether to forcibly generate a partitioned table
                "domainType": "NULL/VALUE/HASH/...",    // partition type; NULL if there is no partitioning
                "domainScheme": [1, 2, 3],              // partitioning scheme for non-null "domainType"
                "partitionColumns": ["col1", "col2"],   // the partitioning columns
                "useSeqPartition": false,               // whether the returned table is SEQ-partitioned
                "numForcedPartitions": -1               // when "domainType"=NULL and "useSeqPartition"=false, the default number of partitions for a HASH-partitioned table
                                                        // -1 indicates an estimated partition number
            },
            "explain": {...}  // execution plan for "sql"
        }
    },
    "stage4": {
        "right": {...},       // execution plan for retrieving data from right table
        "join": {...}         // same format as the "join" in "stage3"
    },
    ...                       // other possible stages
},
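Stage keys such as "stage3" and "stage4" sit next to the "rows" and "cost" totals, so a script that lists serial stages needs to pick them out and sort them numerically (plain string sorting would put "stage10" before "stage9"). The following is a Python sketch over a parsed serialStages object. The helper name and sample values are hypothetical.

```python
import re

def serial_stage_costs(serial: dict) -> list[tuple[int, int]]:
    """Return (stage number, join cost) for every "stageN" key, in numeric order."""
    stages = []
    for key, stage in serial.items():
        match = re.fullmatch(r"stage(\d+)", key)
        if match:  # skip "rows" and "cost", which are totals
            stages.append((int(match.group(1)), stage["join"]["cost"]))
    return sorted(stages)

sample = {"rows": 10, "cost": 30,
          "stage4": {"join": {"rows": 10, "cost": 18}},
          "stage3": {"join": {"rows": 10, "cost": 12}}}
print(serial_stage_costs(sample))
```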

where

For the TSDB engine, the where condition is used to prefilter the required data based on the indexes of level files. where may contain the following sub attributes:

  • TSDBIndexPrefiltering: Information about the data and the where condition during prefiltering.

    • blocksToBeScanned: The number of blocks to be scanned after prefiltering.

    • matchedWhereConditions: The number of matched conditions in the WHERE clause that involve the sort keys, including equality and inequality comparisons (excluding chained comparisons) and conditions using operators such as IN, BETWEEN, and LIKE.

  • rows: The number of rows that satisfy the where condition.

  • cost: The cost of the WHERE clause.

"where": {
    "TSDBIndexPrefiltering": {
        "blocksToBeScanned": 0,
        "matchedWhereConditions": 1
    },
    "rows": 0,
    "cost": 416
}
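A script that checks whether a TSDB query benefited from index prefiltering might read these fields as in the following Python sketch. The helper name and its messages are hypothetical; the sample is the where example above.

```python
def prefilter_summary(where: dict) -> str:
    """Describe the TSDB index prefiltering reported in a parsed "where" object."""
    info = where.get("TSDBIndexPrefiltering")
    if info is None:
        return "no TSDB index prefiltering reported"
    if info["matchedWhereConditions"] == 0:
        return "no where condition on the sort keys was matched"
    return (f'{info["matchedWhereConditions"]} condition(s) matched, '
            f'{info["blocksToBeScanned"]} block(s) left to scan')

sample = {"TSDBIndexPrefiltering": {"blocksToBeScanned": 0, "matchedWhereConditions": 1},
          "rows": 0, "cost": 416}
print(prefilter_summary(sample))
```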

map

In the map phase, the system identifies all the partitions (on local and remote nodes) involved in the query, then generates the corresponding subqueries and distributes the subqueries to all the partitions involved for parallel execution.

The map attribute may contain the following sub attributes:

  • generate_reshuffle_query: This attribute is unique to distributed joins and contains information about the "generate_reshuffle_query" operation. This operation takes place before the distributed join to redistribute data and store it contiguously in memory based on the join column(s). If the join column(s) match the partitioning column(s), this operation is not performed. (Note that distributed joins are only available in version 2.00 and later.)

  • partitions: Information about the partitions involved in this query. The sub attributes local and remote indicate the number of partitions on the local/remote nodes.

  • cost: The cost of the entire map phase. By the end of this phase, all subqueries are completed.

  • detail: The execution details of subqueries in the map phase:

    a. most: The information of the most time-consuming subquery.

    b. least: The information of the least time-consuming subquery.

"map": {
  "generate_reshuffle_query": {
      "cost": 2
  },
  "partitions": {
      "local": 10,
      "remote": 5
  },
  "cost": 100,
  "detail": {
      "most": {
          "sql": "select time,id,value from pt [partition = /iot/1]",
          "explain": { ... }
      },
      "least": {
          "sql": "select time,id,value from pt [partition = /iot/4]",
          "explain": { ... }
      }
  }
}
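The map attribute can be condensed into a few headline numbers. In the following Python sketch, the helper name and its output keys are hypothetical. The sample mirrors the example above, with the elided explain objects replaced by empty ones.

```python
def map_summary(map_attr: dict) -> dict:
    """Summarize a parsed "map" object."""
    parts = map_attr.get("partitions", {})
    detail = map_attr.get("detail", {})
    return {
        "partitions": parts.get("local", 0) + parts.get("remote", 0),
        # Present only for distributed joins that redistribute data first.
        "reshuffled": "generate_reshuffle_query" in map_attr,
        "slowest_sql": detail.get("most", {}).get("sql"),
        "fastest_sql": detail.get("least", {}).get("sql"),
    }

sample = {
    "generate_reshuffle_query": {"cost": 2},
    "partitions": {"local": 10, "remote": 5},
    "cost": 100,
    "detail": {
        "most": {"sql": "select time,id,value from pt [partition = /iot/1]", "explain": {}},
        "least": {"sql": "select time,id,value from pt [partition = /iot/4]", "explain": {}},
    },
}
print(map_summary(sample))
```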

optimize

optimize indicates how the query is optimized. It may contain the following sub attributes:

  • cost: The cost of the optimization.

  • field: Optimized clauses, such as "where", "join" and "group"; or optimization scenarios, such as "single partition query".

  • sql: The optimized query.

"optimize": {
  "cost": 3,
  "field": ["where", "join", "group"],
  "sql": "..."  // the optimized query
}

Below are some scenarios where the queries are optimized:

(1) When a query involves only a single partition, the map phase of its execution plan is as follows:

"map": {
  "partitions": {
      "local": 1,   // or 0
      "remote": 0   // or 1
  },
  "cost": 100,
  "optimize": {
      "field": "single partition query",
      "sql": "...",
      "explain": { ... }
  }
}

As the query only involves one partition on a single node, merge is not required.

(2) context by + csort + limit

In this example, the query uses "context by" in conjunction with "csort" and "limit":

select * from pt where device in ["a", "b"] context by device csort time limit n

Here pt is a partitioned table (with one or multiple partitioning columns). The query will be optimized if the following conditions are satisfied:

  1. The "context by" column is filtered by "where". In this example, the device column is filtered by where device in ["a", "b"].

  2. The "csort" column (time in this example) is a partitioning column, and the partition type is VALUE or RANGE.

  3. "csort" and "context by" each specify only one column.

  4. The "context by" column is specified in the "select" clause.

map returns the following:

"map":{
  "optimize":{
      "cost":4,  // Cost of the optimization
      "field":"optimize for CONTEXT BY + CSORT + LIMIT: partial parallel execution."
  },
  "cost":1082  // Cost of the map phase
}

Based on the query statement and the table partitioning scheme, field returns either of the following:

  • optimize for CONTEXT BY + CSORT + LIMIT: partial parallel execution.

  • optimize for CONTEXT BY + CSORT + LIMIT: serial execution.

Note: If the query is optimized during execution, the execution plan may not contain the attributes merge and reduce.
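Because an optimized plan may omit merge and reduce, a script that reads plans may want to check for optimize attributes first. The following Python sketch collects the field value of every optimize attribute, whether field is a single scenario string or a list of clause names. The helper name and the sample plan are hypothetical.

```python
import json

def optimizations(node) -> list[str]:
    """Collect the "field" values of every "optimize" attribute in a parsed plan."""
    found = []
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "optimize" and isinstance(value, dict):
                field = value.get("field")
                # "field" can be a single string or a list of clause names.
                if isinstance(field, list):
                    found.extend(field)
                elif field is not None:
                    found.append(field)
            found.extend(optimizations(value))
    elif isinstance(node, list):
        for item in node:
            found.extend(optimizations(item))
    return found

plan = json.loads("""{"measurement": "microsecond",
  "explain": {"map": {"partitions": {"local": 1, "remote": 0}, "cost": 100,
                      "optimize": {"field": "single partition query", "sql": "select ..."}},
              "rows": 185, "cost": 987}}""")
explain = plan["explain"]
print(optimizations(plan))
print("merge" in explain, "reduce" in explain)  # both absent in this optimized sample
```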

merge

In the merge phase, the results of the subqueries assigned to each node are merged.

merge may contain the following sub attributes:

  • row: The total number of rows after the merge.

  • cost: The cost of the merge phase.

  • detail: Execution details of the subqueries in the merge phase:

    a. most: Information about the subquery returning the most rows.

    b. least: Information about the subquery returning the least rows.

"merge": {
  "row": 10000,
  "cost": 50,
  "detail": {
      "most": {  // the subquery returning the most rows
          "sql": "select time,id,value from pt [partition = /iot/6]",
          "explain": { ... }
      },
      "least": {  // the subquery returning the least rows
          "sql": "select time,id,value from pt [partition = /iot/9]",
          "explain": { ... }
      }
  }
}

reduce

The reduce phase combines the results of the subqueries, usually by performing one last query on the merged result. Note that there may or may not be a reduce phase in the execution plan.

reduce may contain the following sub attributes:

"reduce": {
  "sql": "...",  // the final query
  "explain": { ... }
}

Other attributes

In addition to the attributes described above at each stage of a distributed query, an execution plan may contain other SQL-related attributes.

groupBy

  • sortKey: Indicates whether the sortColumn is specified in the "group by" clause of the query. If it is true, algo is not returned. Note that sortKey is only available in version 2.00 and later.

  • algo: Indicates the grouping algorithm. It can be "hash", "vectorize" or "sort". When it is "sort", two additional fields are returned:

    • inplaceOptimization: Whether inplace optimization is conducted. Inplace optimization uses the same preset memory to store the results of all groups instead of frequent memory allocation for each temporary result, which reduces overheads and optimizes memory usage.

    • optimizedColumns: If inplaceOptimization is true, this field returns the columns involved in inplace optimization; if inplaceOptimization is false, a null value is returned.

  • fill: Indicates the cost to implement the interpolation specified by the interval function.

"groupBy":{
  "sortKey":false,
  "algo":"hash",
  "cost":8
}

//  "group by" clause with the interval function
"groupBy": {
  "sortKey": false,
  "algo": "hash",
  "fill": {
      "cost": 23
  },
  "cost": 248
}

// When inplace optimization is applied
"groupBy": {
  "sortKey": false,
  "algo": "sort",
  "inplaceOptimization": true,
  "optimizedColumns": [
      "std_price0"
  ],
  "cost": 16422
}
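The groupBy variants above can be turned into a one-line description, as in the following Python sketch. The helper name and its wording are hypothetical. The samples are taken from the examples above.

```python
def describe_group_by(group_by: dict) -> str:
    """Summarize a parsed "groupBy" object."""
    if group_by.get("sortKey"):
        return "grouped on the sort key (algo not reported)"
    algo = group_by.get("algo")
    text = f"algo={algo}"
    # inplaceOptimization and optimizedColumns are only returned when algo is "sort".
    if algo == "sort" and group_by.get("inplaceOptimization"):
        text += ", in-place on " + ", ".join(group_by.get("optimizedColumns") or [])
    if "fill" in group_by:  # interpolation specified by the interval function
        text += f', fill cost={group_by["fill"]["cost"]}'
    return text

print(describe_group_by({"sortKey": False, "algo": "hash", "fill": {"cost": 23}, "cost": 248}))
print(describe_group_by({"sortKey": False, "algo": "sort", "inplaceOptimization": True,
                         "optimizedColumns": ["std_price0"], "cost": 16422}))
```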

contextBy

  • sortKey: Indicates whether the sortColumn is used in the "context by" clause.

"contextBy":{
  "sortKey":false,
  "cost":1994
}

Note: sortKey is only available in version 2.00 and later.

join, csort, sort, pivotBy

These attributes only contain the cost sub attribute.

"join": {
  "cost": 10
}

Printing the SQL execution plan and analyzing the time spent in each part of the query helps you optimize SQL statements and improve execution efficiency.

vectorindex

vectorindex records information about queries processed with a vector index. It is returned only when querying a DFS table on which a vector index is set. vectorindex may contain the following sub attributes:

  • useIndex: whether the query meets the conditions for vector index optimization. If true, the following additional info is provided:

    • cost: the cost of using the vector index.

    • inputRows: the number of rows to be further searched with the vector index.

    • outputRows: the number of rows returned.

"vectorindex": {
	"useIndex": true,
	"cost": 457470,
	"inputRows": 200000,
	"outputRows": 3000
},
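The ratio of outputRows to inputRows gives a quick sense of how much the vector index search narrowed the candidate rows. The following Python sketch uses the sample above. The helper name is hypothetical, and the helper returns None when useIndex is false, because the other fields are only provided when it is true.

```python
from typing import Optional

def vector_index_selectivity(vi: dict) -> Optional[float]:
    """Fraction of searched rows that were returned, or None if the index was not used."""
    if not vi.get("useIndex"):
        return None  # cost, inputRows and outputRows are only provided when useIndex is true
    return vi["outputRows"] / vi["inputRows"]

sample = {"useIndex": True, "cost": 457470, "inputRows": 200000, "outputRows": 3000}
print(vector_index_selectivity(sample))
```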