Queries
Basic SQL Clauses: SELECT, FROM, and WHERE
DolphinDB SQL is compatible with ANSI SQL syntax and uses the standard format:
SELECT column1, column2
FROM table_name
WHERE condition
In-memory tables can be directly queried as shown in the following example:
t = table([1,2,3] as id, ["AA","BB","CC"] as sym, [100,200,300] as val)
select * from t where id = 1
select * from t where sym in ["AA","BB"]
select * from t where val between 150 and 300
Querying DFS tables requires an additional step. Unlike in-memory tables, DFS tables must first be loaded into memory with the loadTable function before they can be accessed or operated on like in-memory data objects (such as vectors, matrices, and dictionaries).
For example, the following script loads a DFS table with loadTable and assigns it to a variable, which can then be used in subsequent queries:
pt = loadTable(database="dfs://querydb", tableName="querytb")
select * from pt where date = 2024.10.10
select * from pt where date between 2024.10.10 and 2024.10.12
Table Joins
Table joins combine data from two or more related tables, enabling complex queries for efficient data retrieval. While in-memory tables support joins through both SQL statements and built-in functions, DFS tables can only be joined using SQL statements.
This section focuses on DFS table joins through examples. For a complete list of supported join types, see the Table joiners section.
When a DFS partitioned table is joined with an in-memory table or a dimension table, the system copies the in-memory or dimension table to all the nodes where the DFS table is located. If that table is large, the data transfer can be time consuming, so keep the right table small in this case. To improve performance, the system applies the WHERE conditions to filter the in-memory table as much as possible before transferring the data.
The following script generates a distributed database db, two partitioned tables trades and quotes, and a dimension table infos for use in later examples.
dates=2019.01.01..2019.01.31
syms="A"+string(1..30)
sym_range=cutPoints(syms,3)
db1=database("",VALUE,dates)
db2=database("",RANGE,sym_range)
db=database("dfs://stock",COMPO,[db1,db2])
n=10000
datetimes=2019.01.01T00:00:00..2019.01.31T23:59:59
t=table(take(datetimes,n) as trade_time,take(syms,n) as sym,rand(1000,n) as qty,rand(500.0,n) as price)
trades=db.createPartitionedTable(t,`trades,`trade_time`sym).append!(t)
n=200
t2=table(take(datetimes,n) as trade_time,take(syms,n) as sym,rand(500.0,n) as bid,rand(500.0,n) as offer)
quotes=db.createPartitionedTable(t2,`quotes,`trade_time`sym).append!(t2)
t3=table(syms as sym,take(0 1,30) as type)
infos=db.createTable(t3,`infos).append!(t3)
Example 1. Join two partitioned tables
Equi join trades and quotes on "trade_time" and "sym" columns:
select * from ej(trades,quotes,`trade_time`sym);
trade_time | sym | qty | price | bid | offer |
---|---|---|---|---|---|
2019.01.01T00:00:00 | A1 | 39 | 7.366735 | 37.933525 | 446.917644 |
2019.01.01T00:00:09 | A10 | 15 | 461.381014 | 405.092702 | 26.659516 |
2019.01.01T00:00:10 | A11 | 987 | 429.981704 | 404.289413 | 347.64917 |
2019.01.01T00:00:11 | A12 | 266 | 60.466206 | 420.426175 | 83.538043 |
2019.01.01T00:00:12 | A13 | 909 | 362.057769 | 324.886047 | 162.502655 |
2019.01.01T00:00:13 | A14 | 264 | 113.964472 | 497.598722 | 103.114702 |
2019.01.01T00:00:14 | A15 | 460 | 347.518325 | 24.584629 | 357.854207 |
2019.01.01T00:00:15 | A16 | 196 | 258.889177 | 49.467399 | 13.974672 |
2019.01.01T00:00:16 | A17 | 198 | 403.564922 | 428.539984 | 208.410852 |
2019.01.01T00:00:17 | A18 | 30 | 288.469046 | 41.905556 | 378.080141 |
... | ... | ... | ... | ... | ... |
Example 2. Join a partitioned table and a dimension table
Left join trades and infos on the "sym" column:
select * from lj(trades,infos,`sym);
trade_time | sym | qty | price | type |
---|---|---|---|---|
2019.01.01T00:00:00 | A1 | 856 | 359.809918 | 0 |
2019.01.01T00:00:09 | A10 | 368 | 305.801702 | 1 |
2019.01.01T00:00:10 | A11 | 549 | 447.406744 | 0 |
2019.01.01T00:00:11 | A12 | 817 | 115.613373 | 1 |
2019.01.01T00:00:12 | A13 | 321 | 298.317481 | 0 |
2019.01.01T00:00:13 | A14 | 3 | 2.289171 | 1 |
2019.01.01T00:00:14 | A15 | 586 | 91.841629 | 0 |
2019.01.01T00:00:15 | A16 | 745 | 43.256142 | 1 |
2019.01.01T00:00:16 | A17 | 60 | 0.153205 | 0 |
... | ... | ... | ... | ... |
Example 3. Join a partitioned table and an in-memory table
Equi join trades and the tmp in-memory table on the "sym" column:
tmp=table("A"+string(1..15) as sym,2019.01.11..2019.01.25 as date);
select * from ej(trades,tmp,`sym);
trade_time | sym | qty | price | date |
---|---|---|---|---|
2019.01.01T00:00:00 | A1 | 856 | 359.809918 | 2019.01.11 |
2019.01.01T00:00:09 | A10 | 368 | 305.801702 | 2019.01.20 |
2019.01.01T00:00:10 | A11 | 549 | 447.406744 | 2019.01.21 |
2019.01.01T00:00:11 | A12 | 817 | 115.613373 | 2019.01.22 |
2019.01.01T00:00:12 | A13 | 321 | 298.317481 | 2019.01.23 |
2019.01.01T00:00:13 | A14 | 3 | 2.289171 | 2019.01.24 |
2019.01.01T00:00:14 | A15 | 586 | 91.841629 | 2019.01.25 |
2019.01.01T00:00:30 | A1 | 390 | 325.407485 | 2019.01.11 |
... | ... | ... | ... | ... |
Data Grouping and Transformation Clauses
This section introduces some common clauses for data grouping and transformation in DolphinDB SQL: GROUP BY, CONTEXT BY, and PIVOT BY. For complete documentation of DolphinDB SQL clauses, please refer to SQL.
GROUP BY
Use the GROUP BY clause to group data and apply an aggregate function to each group.
For example, the following script groups the data by date and calculates the total daily order quantities summed across all stocks:
orders = table(`A0001`A0001`A0002`A0002`A0002 as code,
2024.03.06 2024.03.07 2024.03.06 2024.03.07 2024.03.08 as date,
13100 15200 3700 4800 3500 as orderQty)
select sum(orderQty) as sum_orderQty from orders group by date
For details, see group by.
CONTEXT BY
CONTEXT BY is a unique DolphinDB feature that enables time-series processing within each group, extending ANSI SQL functionality. While both CONTEXT BY and GROUP BY perform grouping, they differ in the following ways:
- With GROUP BY, each group returns a single scalar. With CONTEXT BY, each group returns a vector of the same size as the group's records.
- GROUP BY can only be used with aggregate functions whereas CONTEXT BY can be used with aggregate functions, moving window functions, cumulative functions, etc.
In contrast to the GROUP BY example above where each group returns a single row, this example uses the CONTEXT BY clause to match each input row with an output row, maintaining the original number of rows. Note that while GROUP BY automatically includes the grouping column in the output, with CONTEXT BY you must explicitly specify it in the SELECT clause.
orders = table(`A0001`A0001`A0002`A0002`A0002 as code,
2024.03.06 2024.03.07 2024.03.06 2024.03.07 2024.03.08 as date,
13100 15200 3700 4800 3500 as orderQty)
select date, sum(orderQty) as sum_orderQty from orders context by date
The CONTEXT BY clause serves many purposes: it works with moving functions to perform sliding window calculations within groups, and pairs with the LIMIT clause to retrieve the first or last n records from each group. For details, see context by.
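As a sketch of the LIMIT usage, the following query against the orders table defined above keeps only the last record of each code group (assuming rows are ordered by date within each code); limit -1 takes the last row per group:

```
// For each code group, keep only the last row
select * from orders context by code limit -1
```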
PIVOT BY
PIVOT BY is a unique feature in DolphinDB and an extension to ANSI SQL. It rearranges one or more columns of a table along two dimensions and can be used in conjunction with data transformation functions. When used with a SELECT clause, it returns a table; when used with an EXEC statement, it returns a matrix. If pivoting produces duplicate entries for the same cell, only the latest value is retained.
PIVOT BY can transform narrow-format factor data (with timestamp, security, factor name, value columns) into panel data suitable for quantitative analysis. For example:
t = select factorValue
from loadTable("dfs://factordb","factortb")
where tradetime >= 2023.12.01
and tradetime <= 2023.12.31
and factorname = specificFactorName
pivot by tradetime, securityid, factorname
For details, see pivot by.
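A minimal in-memory sketch of the same reshaping, with hypothetical data: each tradetime becomes a row and each securityid a column.

```
t = table(2024.01.01 2024.01.01 2024.01.02 2024.01.02 as tradetime,
          `A0001`A0002`A0001`A0002 as securityid,
          1.1 2.2 3.3 4.4 as factorValue)
// Pivot the narrow table into a wide (panel) table
select factorValue from t pivot by tradetime, securityid
```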
Distributed Query Optimization
A distributed query is a SQL query on a DFS partitioned table. It looks the same as a regular query, but understanding how it is executed helps users write optimal distributed queries.
The system first determines all relevant partitions based on the WHERE clause; it then rewrites the query and sends the new query to the nodes where those partitions reside; finally, it merges the results from all relevant partitions.
Leverage Partition Pruning
The execution of most distributed queries does not need all partitions of a DFS table. It could save a significant amount of time if the system can just load the partitions relevant to the query.
DolphinDB performs partition pruning in the following situations:
- In a DFS table partitioned by VALUE, RANGE or LIST, if a WHERE condition:
- only includes the partitioning column (not used in a calculation or function) of the DFS table, relational operators (<, <=, =, ==, >, >=, in, between), logical operators (or, and), and constants (including operations between constants);
- is not a chained comparison (such as 100<x<200);
- narrows down the relevant partitions. (See the following examples)
- In a DFS table partitioned by HASH, if a WHERE condition includes a partitioning column (not used in calculation or function) of the DFS table, relational operators (<, <=, =, ==, >, >=, in, between), logical operators (or, and), and constants (including operations between constants). Note that when a partitioning column is of STRING type and the between operator is used, partition pruning cannot be implemented.
Failing to meet these conditions will trigger a full partition scan, which might quickly deplete CPU, memory, and disk I/O resources.
Examples
n=10000000
id=take(1..1000, n).sort()
date=1989.12.31+take(1..365, n)
announcementDate = date+rand(5, n)
x=rand(1.0, n)
y=rand(10, n)
t=table(id, date, announcementDate, x, y)
db=database("dfs://rangedb1", RANGE, [1990.01.01, 1990.03.01, 1990.05.01, 1990.07.01, 1990.09.01, 1990.11.01, 1991.01.01])
pt = db.createPartitionedTable(t, `pt, `date)
pt.append!(t);
The WHERE conditions in the following queries narrow down the relevant partitions:
x = select max(x) from pt where date>1990.12.01-10;
Only 1 partition [1990.11.01, 1991.01.01) is relevant to this query.
select max(x) from pt where date between 1990.08.01:1990.12.01 group by date;
Only 3 partitions ([1990.07.01, 1990.09.01), [1990.09.01, 1990.11.01) and [1990.11.01, 1991.01.01)) are relevant to this query.
select max(x) from pt where y<5 and date between 1990.08.01:1990.08.31;
Only 1 partition [1990.07.01, 1990.09.01) is relevant to this query. Note that the filtering condition y<5 is ignored in the partition pruning. The system will further filter the data by condition y<5 after relevant partitions are loaded.
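By contrast, the following illustrative queries apply a function or calculation to the partitioning column, so the system cannot prune partitions and performs a full scan:

```
// The partitioning column is wrapped in a function: full partition scan
select max(x) from pt where month(date)=1990.12M;
// The partitioning column is used in a calculation: full partition scan
select max(x) from pt where date+30>1990.12.01;
```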
For DFS tables using the TSDB storage engine, query performance can be optimized by specifying both the partitioning columns and sort key columns (see TSDB Storage Engine) in the WHERE clause; with PKEY storage engine, specify indexed columns for optimal performance (see Primary Key Storage Engine).
Use the map Keyword for Grouped Queries
In distributed SQL queries, when working with grouped data, calculations typically happen in two stages: first within individual partitions, then across all partitions to ensure accurate final results. This "Map-Reduce" mechanism for distributed queries will be explored in detail in the following section.
However, if the granularity of the GROUP BY columns is finer than that of the partitioning columns, each group falls entirely within a single partition and no calculation crosses partitions. In that case, adding the map keyword to the query statement reduces the cost of calculation and improves performance. For detailed usage, see map.
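For example, with the pt table defined above (partitioned by RANGE on date), grouping by both date and id guarantees that every group lies within a single date partition, so the map keyword can be appended safely:

```
// Each (date, id) group falls inside exactly one partition,
// so no cross-partition reduce step is needed
select avg(x) from pt group by date, id map
```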
Use Hints
A hint instructs the SQL engine on how to execute the query. Hints can be used to improve performance by forcing the optimizer to use a more efficient plan.
DolphinDB offers several HINT keywords for query optimization. For example, [HINT_EXPLAIN] displays the execution plan, making it easier to monitor and tune SQL performance; [HINT_KEEPORDER] preserves the input order within each group. For details, see Hints.
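For instance, placing the hint right after SELECT returns the execution plan instead of the data (a sketch against the pt table defined earlier):

```
// Returns a description of how the query is executed,
// including which partitions are scanned
select [HINT_EXPLAIN] * from pt where date = 1990.01.15
```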
Assign Query Results
When querying data, it's best to assign the results to a variable. For example, use data = select * from pt instead of just select * from pt. The key differences are:
- The first approach loads the query results from the distributed table into memory.
- The second approach sends the results directly to the client. With large datasets, this can cause network congestion on the server side and result in slower query performance, since most of the time might be consumed by data transfer over the network.
Query Large Datasets in Batches
Each query is limited to 2 billion rows and 65,536 partitions by default. For larger datasets, break queries into smaller batches. When a query needs to access more partitions, either add filtering conditions to reduce the partition count (recommended) or increase the maxPartitionNumPerQuery configuration parameter.
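One way to batch is to iterate over the partitioning values, here sketched as a daily loop over the pt table defined earlier:

```
pt = loadTable("dfs://rangedb1", "pt")
for(d in 1990.01.01..1990.01.31){
    // Each iteration touches only the partitions covering one day
    batch = select * from pt where date = d
    // process the batch here, e.g. aggregate it or write it out
}
```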
Map-Reduce Implementation of Distributed Queries
Scenarios for Distributed Query Rewriting
The system has to rewrite a distributed query in two situations: when the query uses the ORDER BY clause, and when it uses aggregate functions with a grouping column that is not the partitioning column.
When the partitioning column is the first grouping column, the implementation is straightforward: the system executes the query on each relevant partition and then merges the individual query results.
When the partitioning column is not the first grouping column, the system uses the Map-Reduce method to execute the distributed query. It first looks up the Map-Reduce definitions (see mapr) of the aggregate functions, then rewrites the original query into a map query based on those definitions and sends the map query to each relevant partition for execution, and finally executes a reduce query to merge the results.
select avg(x) from t where id>200 and id<900 group by date;
// date is not the partitioning column of table t
For the example above, the map query would conduct the following operation:
tempTable = select sum(x) as col1, count(x) as col2 from t where id>200 and id<900 group by date;
The reduce query would conduct the following operation:
select wavg(col1, col2) as avg_x from tempTable group by date;
Not all distributed queries can be rewritten this way. One example is the calculation of the median over a distributed table.
User-Defined (Aggregate) Functions in Distributed Queries
DolphinDB allows user-defined functions (UDFs) and user-defined aggregate functions (UDAFs) in SQL queries. Users can simply define a function and use it in a query on the fly; no compilation or deployment is needed. The implementation in a distributed query, however, differs slightly from that in an ordinary query. The system automatically checks for UDFs and UDAFs in distributed queries; if it detects any, it serializes them, together with their dependent UDFs and UDAFs, to the remote sites along with the query. This checking and serialization process is invisible to users and is one of the unique features DolphinDB offers compared with other systems.
Note that user-defined aggregate functions must be declared with the keyword defg rather than def, which tells the system the function is an aggregate. If an aggregate function defined with def is used in a distributed query, the results may be wrong or an exception may be raised.
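A minimal sketch of a UDAF, declared with defg so the system treats it as an aggregate function in distributed queries (geoMean is a hypothetical name):

```
defg geoMean(x){
    // Aggregate: maps a vector to a scalar (geometric mean)
    return exp(avg(log(x)))
}
select geoMean(price) as gmPrice from trades group by sym
```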
DolphinDB does not support aggregate functions such as sum or count in the WHERE clause of distributed queries. This is because distributed queries use the WHERE clause to select the relevant partitions before executing aggregate functions with the Map-Reduce method; if aggregate functions appear in the WHERE clause, the relevant partitions cannot be determined and the query fails. To use an aggregate value as a filter, first compute it in a separate distributed query, assign it to a variable, and reference that variable in the original distributed query.
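A sketch of this workaround with the pt table defined earlier: compute the aggregate in its own query, then reference the resulting variable in the filter.

```
// First query computes the aggregate and assigns it to a variable
avgX = exec avg(x) from pt
// Second query references the variable; no aggregate appears in its WHERE clause
select count(*) from pt where x > avgX
```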
Variable Handling
DolphinDB SQL supports referencing variables that are defined outside the query. For distributed SQL queries, the system automatically copies such local variables to the required remote nodes. This is an advantage of DolphinDB compared to other systems. See SQL for more information.
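For example, a variable defined on the local node can be referenced directly in a distributed query against the pt table defined earlier; the system ships it to the remote nodes automatically:

```
targetDate = 1990.01.15
// targetDate is copied to the nodes holding the relevant partitions
select max(x) from pt where date = targetDate
```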