Queries

Basic SQL Clauses: SELECT, FROM, and WHERE

DolphinDB SQL is compatible with ANSI SQL syntax, using the standard format:

SELECT column1, column2
FROM table_name
WHERE condition

In-memory tables can be directly queried as shown in the following example:

t = table([1,2,3] as id, ["AA","BB","CC"] as sym, [100,200,300] as val)
select * from t where id = 1
select * from t where sym in ["AA","BB"]
select * from t where val between 150 and 300

Querying DFS tables requires an additional step. Unlike in-memory tables, a DFS table must first be loaded with the loadTable function before it can be accessed and operated on like in-memory data objects (such as vectors, matrices, and dictionaries).

For example, the following script loads the DFS table using loadTable and assigns it to a variable. This variable can then be used in subsequent queries:

pt = loadTable(database="dfs://querydb", tableName="querytb")
select * from pt where date = 2024.10.10
select * from pt where date between 2024.10.10 and 2024.10.12

Table Joins

Table joins combine data from two or more related tables, enabling complex queries for efficient data retrieval. While in-memory tables support joins through both SQL statements and built-in functions, DFS tables can only be joined using SQL statements.

This section focuses on DFS table joins through examples. For a complete list of supported join types, see the Table joiners section.

When a DFS partitioned table is joined with an in-memory table or a dimension table, the system copies the in-memory or dimension table to every node where the DFS table's partitions reside. If that table is large, the data transfer can be very time consuming, so it is recommended to keep the right table small in this case. To improve performance, the system applies the WHERE conditions to filter the in-memory table as much as possible before transferring it.

The following script generates a distributed database db, two partitioned tables trades and quotes, and a dimension table infos for use in later examples.

dates=2019.01.01..2019.01.31
syms="A"+string(1..30)
sym_range=cutPoints(syms,3)
db1=database("",VALUE,dates)
db2=database("",RANGE,sym_range)
db=database("dfs://stock",COMPO,[db1,db2])
n=10000
datetimes=2019.01.01T00:00:00..2019.01.31T23:59:59
t=table(take(datetimes,n) as trade_time,take(syms,n) as sym,rand(1000,n) as qty,rand(500.0,n) as price)
trades=db.createPartitionedTable(t,`trades,`trade_time`sym).append!(t)

n=200
t2=table(take(datetimes,n) as trade_time,take(syms,n) as sym,rand(500.0,n) as bid,rand(500.0,n) as offer)
quotes=db.createPartitionedTable(t2,`quotes,`trade_time`sym).append!(t2)

t3=table(syms as sym,take(0 1,30) as type)
infos=db.createTable(t3,`infos).append!(t3)

Example 1. Join two partitioned tables

Equi join trades and quotes on the "trade_time" and "sym" columns:

select * from ej(trades,quotes,`trade_time`sym);
trade_time sym qty price bid offer
2019.01.01T00:00:00 A1 39 7.366735 37.933525 446.917644
2019.01.01T00:00:09 A10 15 461.381014 405.092702 26.659516
2019.01.01T00:00:10 A11 987 429.981704 404.289413 347.64917
2019.01.01T00:00:11 A12 266 60.466206 420.426175 83.538043
2019.01.01T00:00:12 A13 909 362.057769 324.886047 162.502655
2019.01.01T00:00:13 A14 264 113.964472 497.598722 103.114702
2019.01.01T00:00:14 A15 460 347.518325 24.584629 357.854207
2019.01.01T00:00:15 A16 196 258.889177 49.467399 13.974672
2019.01.01T00:00:16 A17 198 403.564922 428.539984 208.410852
2019.01.01T00:00:17 A18 30 288.469046 41.905556 378.080141
... ... ... ... ... ...

Example 2. Join a partitioned table and a dimension table

Left join trades and infos on the "sym" column:

select * from lj(trades,infos,`sym);
trade_time sym qty price type
2019.01.01T00:00:00 A1 856 359.809918 0
2019.01.01T00:00:09 A10 368 305.801702 1
2019.01.01T00:00:10 A11 549 447.406744 0
2019.01.01T00:00:11 A12 817 115.613373 1
2019.01.01T00:00:12 A13 321 298.317481 0
2019.01.01T00:00:13 A14 3 2.289171 1
2019.01.01T00:00:14 A15 586 91.841629 0
2019.01.01T00:00:15 A16 745 43.256142 1
2019.01.01T00:00:16 A17 60 0.153205 0
... ... ... ... ...

Example 3. Join a partitioned table and an in-memory table

Equi join trades and the tmp in-memory table on the "sym" column:

tmp=table("A"+string(1..15) as sym,2019.01.11..2019.01.25 as date);
select * from ej(trades,tmp,`sym);
trade_time sym qty price date
2019.01.01T00:00:00 A1 856 359.809918 2019.01.11
2019.01.01T00:00:09 A10 368 305.801702 2019.01.20
2019.01.01T00:00:10 A11 549 447.406744 2019.01.21
2019.01.01T00:00:11 A12 817 115.613373 2019.01.22
2019.01.01T00:00:12 A13 321 298.317481 2019.01.23
2019.01.01T00:00:13 A14 3 2.289171 2019.01.24
2019.01.01T00:00:14 A15 586 91.841629 2019.01.25
2019.01.01T00:00:30 A1 390 325.407485 2019.01.11
... ... ... ... ...
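
As noted in the join overview above, the system filters the in-memory table with applicable WHERE conditions before copying it to the nodes holding the DFS table. A minimal sketch with the tmp table (the date filter applies only to tmp's date column, so tmp can be filtered before transfer):

select * from ej(trades,tmp,`sym) where date>2019.01.20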

Data Grouping Clauses

This section introduces some common clauses for data grouping and transformation in DolphinDB SQL: GROUP BY, CONTEXT BY, and PIVOT BY. For complete documentation of DolphinDB SQL clauses, please refer to SQL.

GROUP BY

Use the GROUP BY clause to group data and apply an aggregate function to each group.

For example, the following script groups the data by date and calculates the total daily order quantities summed across all stocks:

orders = table(`A0001`A0001`A0002`A0002`A0002 as code,
2024.03.06 2024.03.07 2024.03.06 2024.03.07 2024.03.08 as date,
13100 15200 3700 4800 3500 as orderQty)

select sum(orderQty) as sum_orderQty from orders group by date
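
The query returns one row per date; the sums follow directly from the sample data (13100+3700, 15200+4800, and 3500):

date sum_orderQty
2024.03.06 16800
2024.03.07 20000
2024.03.08 3500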

For details, see group by.

CONTEXT BY

CONTEXT BY is a unique feature of DolphinDB that enables time series data processing within each group, extending ANSI SQL functionality.

While both CONTEXT BY and GROUP BY perform grouping, they differ in the following ways:

  • With GROUP BY, each group returns a single scalar. With CONTEXT BY, each group returns a vector of the same size as the group's records.
  • GROUP BY can only be used with aggregate functions whereas CONTEXT BY can be used with aggregate functions, moving window functions, cumulative functions, etc.

In contrast to the GROUP BY example above where each group returns a single row, this example uses the CONTEXT BY clause to match each input row with an output row, maintaining the original number of rows. Note that while GROUP BY automatically includes the grouping column in the output, with CONTEXT BY you must explicitly specify it in the SELECT clause.

orders = table(`A0001`A0001`A0002`A0002`A0002 as code,
2024.03.06 2024.03.07 2024.03.06 2024.03.07 2024.03.08 as date,
13100 15200 3700 4800 3500 as orderQty)
select date, sum(orderQty) as sum_orderQty from orders context by date
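
This returns one output row per input row, with the group sum repeated within each date (rows grouped by date):

date sum_orderQty
2024.03.06 16800
2024.03.06 16800
2024.03.07 20000
2024.03.07 20000
2024.03.08 3500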

The CONTEXT BY clause serves many purposes. It works with the moving functions to perform sliding window calculations within groups, and pairs with the LIMIT clause to retrieve the first or last n records from each group. For details, see context by.
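
For instance, a minimal sketch of both uses with the orders table above (the window size of 2 is an arbitrary choice):

// 2-row moving sum of order quantities within each stock
select code, date, msum(orderQty, 2) as msum_qty from orders context by code
// last record of each stock
select * from orders context by code limit -1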

PIVOT BY

PIVOT BY is a unique feature of DolphinDB and an extension to ANSI SQL. It rearranges one or more columns in a table along two dimensions and can be used in conjunction with data transformation functions. When used with a SELECT clause, it returns a table; when used with an EXEC statement, it returns a matrix. If pivoting produces duplicate rows, only the latest value is retained.

PIVOT BY can transform narrow-format factor data (with timestamp, security, factor name, and value columns) into panel data suitable for quantitative analysis. For example:

t = select factorValue 
from loadTable("dfs://factordb","factortb") 
where tradetime >= 2023.12.01 
  and tradetime <= 2023.12.31 
  and factorname = specificFactorName 
pivot by tradetime, securityid, factorname
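
Since the query above depends on an existing DFS table (and specificFactorName is a placeholder), here is a self-contained sketch using the orders table from the GROUP BY section, where each date becomes a row and each stock code a column:

orders = table(`A0001`A0001`A0002`A0002`A0002 as code,
2024.03.06 2024.03.07 2024.03.06 2024.03.07 2024.03.08 as date,
13100 15200 3700 4800 3500 as orderQty)
select orderQty from orders pivot by date, code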

For details, see pivot by.

Distributed Query Optimization

A distributed query is a SQL query on a DFS partitioned table. A distributed query looks the same as a regular query, but understanding how distributed queries are executed helps users write optimal ones.

The system first determines all relevant partitions based on the WHERE clause; then it rewrites the query and sends the new query to the nodes where the relevant partitions reside; finally, it merges the results from all relevant partitions.

Leverage Partition Pruning

Most distributed queries need only some partitions of a DFS table. Loading just the partitions relevant to the query, rather than all of them, can save a significant amount of time.

DolphinDB performs partition pruning in the following situations:

  • In a DFS table partitioned by VALUE, RANGE or LIST, if a WHERE condition:
    • only includes the partitioning column (not used in a calculation or function) of the DFS table, relational operators (<, <=, =, ==, >, >=, in, between), logical operators (or, and), and constants (including operations between constants);
    • is not a chained comparison (such as 100<x<200);
    • narrows down the relevant partitions. (See the following examples)
  • In a DFS table partitioned by HASH, if a WHERE condition includes a partitioning column (not used in calculation or function) of the DFS table, relational operators (<, <=, =, ==, >, >=, in, between), logical operators (or, and), and constants (including operations between constants). Note that when a partitioning column is of STRING type and the between operator is used, partition pruning cannot be implemented.

Failing to meet these conditions will trigger a full partition scan, which might quickly deplete CPU, memory, and disk I/O resources.

Examples

Generate sample data:

n=10000000
id=take(1..1000, n).sort()
date=1989.12.31+take(1..365, n)
announcementDate = date+rand(5, n)
x=rand(1.0, n)
y=rand(10, n)
t=table(id, date, announcementDate, x, y)
db=database("dfs://rangedb1", RANGE, [1990.01.01, 1990.03.01, 1990.05.01, 1990.07.01, 1990.09.01, 1990.11.01, 1991.01.01])
pt = db.createPartitionedTable(t, `pt, `date)
pt.append!(t);

The WHERE conditions specified in the following SQL queries can narrow down the relevant partitions:

x = select max(x) from pt where date>1990.12.01-10;

Only 1 partition [1990.11.01, 1991.01.01) is relevant to this query.

select max(x) from pt where date between 1990.08.01:1990.12.01 group by date;

Only 3 partitions ([1990.07.01, 1990.09.01), [1990.09.01, 1990.11.01) and [1990.11.01, 1991.01.01)) are relevant to this query.

select max(x) from pt where y<5 and date between 1990.08.01:1990.08.31;

Only 1 partition [1990.07.01, 1990.09.01) is relevant to this query. Note that the filtering condition y<5 is ignored in the partition pruning. The system will further filter the data by condition y<5 after relevant partitions are loaded.

Partitions can be narrowed down when applying lower-precision functions to partitioning columns of temporal types. The temporal types are sorted in descending order of precision:

  • NANOTIMESTAMP > TIMESTAMP > DATETIME > DATEHOUR > DATE > MONTH > YEAR
  • TIME > SECOND > MINUTE > HOUR

The partitioning column in the above example is of DATE type, so partition pruning can still be achieved when the month function is applied:

select max(x) from pt where month(date)>=1990.12M;

Only 1 partition [1990.11.01, 1991.01.01) is relevant to this query.

The following queries cannot narrow down the relevant partitions. If used on a huge partitioned table, they will take a long time to finish, so such queries should be avoided.

select max(x) from pt where date+30>1990.12.01;
// the partitioning column is involved in a calculation: cannot narrow down relevant partitions

select max(x) from pt where 1990.12.01<date<1990.12.31;
// chained comparison: cannot narrow down relevant partitions

select max(x) from pt where y<5;
// no partitioning column is used: cannot narrow down relevant partitions

select max(x) from pt where date<announcementDate-3;
// the partitioning column is compared with another column: cannot narrow down relevant partitions

select max(x) from pt where y<5 or date between 1990.08.01:1990.08.31;
// the OR condition involves a non-partitioning column: cannot narrow down relevant partitions

For DFS tables using the TSDB storage engine, query performance can be optimized by specifying both the partitioning columns and the sort key columns (see TSDB Storage Engine) in the WHERE clause; with the PKEY storage engine, specify indexed columns for optimal performance (see Primary Key Storage Engine).
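
The following is an illustrative sketch of a TSDB table supporting such queries, reusing the sample table t generated above (the database name dfs://tsdbdemo and the choice of sort columns are assumptions):

dbTSDB = database("dfs://tsdbdemo", RANGE, [1990.01.01, 1990.07.01, 1991.01.01], engine="TSDB")
ptTSDB = dbTSDB.createPartitionedTable(t, `ptTSDB, `date, sortColumns=`id`date)
ptTSDB.append!(t)
// filters on both the partitioning column (date) and the sort key (id)
select max(x) from ptTSDB where date between 1990.08.01:1990.08.31 and id in 1..100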

Use the map Keyword for Grouped Queries

In distributed SQL queries, when working with grouped data, calculations typically happen in two stages: first within individual partitions, then across all partitions to ensure accurate final results. This "Map-Reduce" mechanism for distributed queries will be explored in detail in the following section.

However, if the granularity of the GROUP BY columns is finer than that of the partitioning columns, no group can span multiple partitions, so no cross-partition calculation is needed. In that case, we can add the map keyword to the query statement to reduce the cost of calculation and optimize performance, as shown in the sketch below. For detailed usage, see map.
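
For example, pt above is range-partitioned on date with multi-month ranges, so every date group falls entirely within one partition and the map keyword can be applied safely (a sketch):

// each date belongs to exactly one partition, so no cross-partition merge is needed
select avg(x) from pt group by date map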

Use Hints

A hint instructs the SQL engine on how to execute the query. Hints can be used to improve performance by forcing the optimizer to use a more efficient plan.

DolphinDB offers several HINT keywords for query optimization. For example, [HINT_EXPLAIN] displays the execution plan, making it easier to monitor and tune SQL performance; [HINT_KEEPORDER] preserves the input order within each group. For details, see Hints.
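
For example, a sketch using HINT_EXPLAIN with the pt table from the earlier examples; it prints the execution plan instead of returning a result table:

select [HINT_EXPLAIN] max(x) from pt where date between 1990.08.01:1990.08.31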

Assign Query Results

When querying data, it's best to assign the results to a variable. For example, use data = select * from pt instead of just select * from pt. The key differences are:

  • The first approach loads the query results from the distributed table into memory.
  • The second approach sends the results directly to the client. With large datasets, this can cause network congestion on the server side and result in slower query performance, since most of the time might be consumed by data transfer over the network.

Query Large Datasets in Batches

Each query is limited to 2 billion rows and 65,536 partitions by default. For larger datasets, break queries into smaller batches. When queries need to access more partitions, either add filtering conditions to reduce partition count (recommended) or adjust the maxPartitionNumPerQuery configuration parameter.

For example, the following query exceeds the query limit:

t = select * from pt where date between 2010.01.01 and 2024.12.31

Break the query into smaller batches:

t1 = select * from pt where date between 2010.01.01 and 2017.12.31
t2 = select * from pt where date between 2018.01.01 and 2024.12.31

Map-Reduce Implementation of Distributed Queries

Scenarios for Distributed Query Rewriting

There are two situations in which the system has to rewrite a distributed query: when the query uses the ORDER BY clause, and when it uses aggregate functions with a grouping column that is not the partitioning column.

When the partitioning column is the first grouping column, the implementation is straightforward. The system simply executes the query on each relevant partition and then merges the individual query results.

When the partitioning column is not the first grouping column, the system uses the Map-Reduce method to execute the distributed query. It first looks up the Map-Reduce definitions (see mapr) of the aggregate functions, rewrites the original query into a map query based on those definitions, and sends the map query to each involved partition for execution. Finally, it executes the reduce query to merge the results.

select avg(x) from t where id>200 and id<900 group by date;
// date is not the partitioning column of table t 

For the example above, the map query would conduct the following operation:

tempTable = select avg(x) as col1, count(x) as col2 from t where id>200 and id<900 group by date;

The reduce query would conduct the following operation:

select wavg(col1, col2) as avg_x from tempTable group by date;

Weighting each partition's average (col1) by its record count (col2) reproduces the overall average for each date.

Not all distributed queries can be rewritten this way. One example is the calculation of the median over a distributed table.

User-Defined (Aggregate) Functions in Distributed Queries

When users define their own aggregate functions, they must use the keyword defg rather than def to tell the system it is an aggregate function. If an aggregate function is defined with def and applied in a distributed query, it may return wrong results or throw exceptions.
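
For example, a minimal sketch of a UDAF (geometricMean is an illustrative name, not a built-in function):

// defg (not def) marks geometricMean as an aggregate function
defg geometricMean(x){
    return exp(avg(log(x)))
}
select geometricMean(x) as gm from pt group by date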

DolphinDB allows user-defined functions (UDFs) and user-defined aggregate functions (UDAFs) in SQL queries. Users can simply define a function and then use it in a query on the fly; no compilation or deployment is needed. The implementation in a distributed query, however, differs slightly from that in an ordinary query. The system automatically checks distributed queries for UDFs and UDAFs. If it detects one, it serializes the function, along with its dependent UDFs or UDAFs, to the remote sites together with the query. This checking and serialization process is invisible to users and is one of the unique features DolphinDB offers compared with other systems.

DolphinDB does not support the use of aggregate functions such as sum or count in the WHERE clause of distributed queries. This is because distributed queries use the WHERE clause to select the relevant partitions before executing aggregate functions with the Map-Reduce method; if aggregate functions appeared in the WHERE clause, the relevant partitions could not be selected and the query would fail. To use an aggregate value as a filter, first compute it with a separate query, assign it to a variable, and reference that variable in the original distributed query, as sketched below.
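
A sketch of this workaround with the pt table:

// not supported in a distributed query: select * from pt where x > avg(x)
avgX = exec avg(x) from pt  // compute the aggregate value in a separate query
select * from pt where x > avgX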

Variable Handling

DolphinDB SQL supports the use of variables defined outside the SQL query. For distributed SQL queries, the system automatically copies the variables on the local node to the required remote nodes, which is an advantage of DolphinDB compared with other systems. See SQL for more information.
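
For example, a minimal sketch with the trades table from the join examples (symList is a local variable referenced inside the query):

symList = ["A1", "A2", "A3"]
// the system automatically copies symList to the remote nodes holding trades
select avg(price) from trades where sym in symList group by sym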