DolphinDB SQL Tuning Case Study
This tutorial introduces the best practices for writing SQL statements with optimal performance in DolphinDB. We will explain the techniques for writing SQL scripts by comparing the performance before and after query tuning.
Environment Setup
Prerequisites:
- CPU: Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz
- Core count: 64
- Memory: 512 GB
- Operating System: CentOS Linux release 7.9
- License: Community Edition, 2 CPU cores, 8 GB memory
- DolphinDB Server Version: DolphinDB_Linux64_V2.00.6, standalone mode
- DolphinDB GUI Version: DolphinDB_GUI_V1.30.15
Cases in this tutorial use simulated NYSE TAQ (Trade and Quote) data of June 2020. The following script simulates quote data for 8,000 stocks based on the table schema of Daily TAQ, creates OLAP and TSDB databases and tables, and appends the data to these tables:
model = table(1:0, `Symbol`DateTime`BidPx`OfrPx`BidSize`OfrSize`Mode`Ex`Mmid, [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, INT, INT, INT, SYMBOL, SYMBOL])
// Create an OLAP database and table
dbDate = database("", VALUE, 2020.06.01..2020.06.07)
dbSymbol = database("", HASH, [SYMBOL, 64])
db = database("dfs://TAQ", COMPO, [dbDate, dbSymbol])
createPartitionedTable(db, model, `Quotes, `DateTime`Symbol)
// Create a TSDB database and table
dbDate = database("", VALUE, 2020.06.01..2020.06.07)
dbSymbol = database("", HASH, [SYMBOL, 32])
db = database("dfs://TAQ_TSDB", COMPO, [dbDate, dbSymbol], engine="TSDB")
createPartitionedTable(db, model, `Quotes, `DateTime`Symbol, sortColumns=`Symbol`DateTime)
// Simulate data of half an hour to avoid OOM
def mockOneHourData(Date, StartTime) {
v_Symbol = format(1..8000, "0000")
v_DateTime = concatDateTime(Date, time(StartTime) + 0..(30 * 60 * 1000 - 1))
v_Mode = [3, 4, 5, 7, 8, 9, 10, 11, 13, 15, 16, 17, 23, 29]
v_Ex = [`A, `B, `C, `D, `M, `N, `P, `T, `W, `X]
v_Mmid = [`FLOW, `EDGX, `EDGA, `NASD, string(NULL)]
n = 10000000
return table(take(v_Symbol, n) as Symbol, rand(v_DateTime, n) as DateTime, rand(100.0, n) as BidPx, rand(100.0, n) as OfrPx, rand(1000, n) as BidSize, rand(100, n) as OfrSize, rand(v_Mode, n) as Mode, rand(v_Ex, n) as Ex, rand(v_Mmid, n) as Mmid)
}
def mockData(DateVector, StartTimeVector) {
for(Date in DateVector) {
for(StartTime in StartTimeVector) {
data = mockOneHourData(Date, StartTime)
// Append data to the OLAP database
loadTable("dfs://TAQ", "Quotes").append!(data)
// Append data to the TSDB database
loadTable("dfs://TAQ_TSDB", "Quotes").append!(data)
}
}
}
mockData(2020.06.01..2020.06.02, 09:30:00 + 0..13 * 1800)
Conditional Filtering
You can use one or more conditional expressions in the where clause of a SQL statement to filter out the records that satisfy the specified conditions.
DolphinDB built-in functions and user-defined functions can be used in the conditional expressions. However, aggregate and order-sensitive functions (such as sum, count, next, deltas, ratios) cannot be used in the where clause when querying DFS tables. This is because distributed queries first prune partitions based on the where conditions, and the partition range cannot be narrowed down if those functions are used.
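As a sketch of the usual workaround (reusing the Quotes table created above; the variable names are hypothetical), compute the aggregate separately and then filter with the resulting constant:
// Not supported on a DFS table: the aggregate in the where clause prevents partition pruning
// select * from loadTable("dfs://TAQ", "Quotes") where OfrSize > avg(OfrSize)
// Workaround sketch: compute the threshold first, then filter with a constant
quotes = loadTable("dfs://TAQ", "Quotes")
avgOfrSize = exec avg(OfrSize) from quotes where date(DateTime) = 2020.06.01
res = select * from quotes where date(DateTime) = 2020.06.01, OfrSize > avgOfrSize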
Filter with Keyword in
Scenario: Table t1 stores stock quotes and table t2 contains information about the stock industries. Filter t1 based on the industry information.
Load table Quotes as t1 from database dfs://TAQ and create a table t2 with stock industries.
For example:
t1 = loadTable("dfs://TAQ", "Quotes")
Symbols = exec distinct Symbol from t1 where date(DateTime) = 2020.06.01
t2 = table(Symbols as Symbol,
take(`Mul`IoT`Eco`Csm`Edu`Food, Symbols.size()) as Industry)
Inefficient:
Left join t1 and t2 on the join column Symbol and filter the data with where conditions:
timer res1 = select Symbol, DateTime
from lj(t1, t2, `Symbol)
where date(DateTime) = 2020.06.01, Industry=`Edu
Time elapsed: 7,296 ms
Note: The timer statement returns the execution time of a script on the DolphinDB server, which does not include the time to return the result to the client. If the result set is very large, the time spent on serialization/deserialization and network transfer may far exceed the execution time on the server.
Efficient:
Obtain a vector of the symbols in the "Edu" industry from t2 and specify the filtering range with the keyword in:
timer{
Symbols = exec Symbol from t2 where Industry="Edu"
res2 = select Symbol, DateTime
from t1
where date(DateTime) = 2020.06.01, Symbol in Symbols
}
Time elapsed: 2,866 ms
each(eqObj, res1.values(), res2.values()) // true
We apply the function each to every column of the two output tables to compare the results; returning true means the two queries produce identical results. The second query is roughly 2.5 times faster, because the join operation costs much more than filtering with a where clause. Therefore, using dictionaries or the keyword in is recommended over a table join in a SQL statement, as sketched below.
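As a sketch of the dictionary-based alternative (reusing t1 and t2 from this example; industryDict and res3 are hypothetical names), a dictionary mapping symbols to industries can be used directly in the where clause:
// Build a Symbol -> Industry dictionary from t2 and filter t1 with a lookup
industryDict = dict(t2.Symbol, t2.Industry)
res3 = select Symbol, DateTime from t1
where date(DateTime) = 2020.06.01, industryDict[Symbol] == "Edu"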
Filter Data within Groups
Scenario: Query daily trade data and obtain the records with the top 25% highest trading volumes for each stock.
Load table Quotes as quotes from database dfs://TAQ for later reference:
quotes = loadTable("dfs://TAQ", "Quotes")
Use the context by clause to group the data, and compute the 75th percentile of the OfrSize column (linear interpolation) as the lower bound within each group:
timer result = select * from quotes
where date(DateTime) = 2020.06.01
context by Symbol having OfrSize >= percentile(OfrSize, 75, "linear")
The context by clause is a DolphinDB SQL keyword for grouped computation. Both context by and group by perform grouping. However, with group by, each group returns a scalar value; with context by, each group returns a vector of the same length as the group's records. The group by clause can only be used with aggregate functions, whereas the context by clause can be used with aggregate functions, moving window functions, cumulative functions, etc.
The having clause is often used after a group by or context by clause to select the records satisfying the specified conditions.
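A minimal standalone comparison of the two clauses, on hypothetical data:
grpDemo = table(`A`A`B`B as sym, 1 2 3 4 as qty)
// group by returns one row per group
select avg(qty) from grpDemo group by sym
// context by returns one row per record; the aggregate is broadcast within each group
select sym, qty, avg(qty) from grpDemo context by sym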
Scenario: After selecting the records with the top 25% volumes for each stock, calculate the standard deviation of OfrPx.
Inefficient:
After computing the 75th-percentile threshold (linear interpolation), use the group by clause to group the stocks and calculate the standard deviation. The results are sorted with the order by clause.
timer select std(OfrPx) as std from (
select Symbol, OfrPx from quotes
where date(DateTime) = 2020.06.01
context by Symbol
having OfrSize >= percentile(OfrSize, 75, "linear"))
group by Symbol
order by Symbol
Time elapsed: 4,738 ms
Efficient:
Group the stocks by the symbols with group by
. Select the
records with the top 25% volumes with higher-order function
aggrTopN
and calculate the standardized deviation.
timer select aggrTopN(std, OfrPx, OfrSize, 0.25, false) as std from quotes
where date(DateTime) = 2020.06.01
group by Symbol
order by Symbol
Time elapsed: 2,600 ms
The original script groups the data and filters it before aggregation. The optimized query performs the filtering and aggregation in a single grouped pass, which gives better performance.
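For reference, aggrTopN as used above applies the aggregate function to the records ranked in the top fraction by the sorting column. A small sketch on hypothetical data:
// For each sym, keep the top 25% of records by size (descending, hence false) and average px over them
topDemo = table(`A`A`A`A`B`B`B`B as sym, 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 as px, 10 40 20 30 10 20 40 30 as size)
select aggrTopN(avg, px, size, 0.25, false) as avgTopPx from topDemo group by sym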
Separate Conditions in Where Clause: Commas or the AND Operator
When multiple conditions are present in the where clause, you can control how they are applied using either commas or the AND operator:
- If the conditions are separated by commas, the data is filtered sequentially: the first condition is applied to the original data, and each subsequent condition is applied to the output of the preceding one, progressively narrowing down the data.
- In contrast, if the conditions are separated by "and", each condition filters the original data independently. The results of all conditions are then intersected, returning only the data that meets every condition.
The following examples demonstrate the differences.
First, simulate example data:
N = 10000000
t = table(take(2019.01.01..2019.01.03, N) as date,
take(`C`MS`MS`MS`IBM`IBM`IBM`C`C$SYMBOL, N) as sym,
take(49.6 29.46 29.52 30.02 174.97 175.23 50.76 50.32 51.29, N) as price,
take(2200 1900 2100 3200 6800 5400 1300 2500 8800, N) as qty)
Based on whether order-sensitive functions (e.g., deltas, ratios, ffill, move, prev, cumsum) are used, there are two scenarios:
Order-Insensitive Conditions
Sample code:
timer(10) t1 = select * from t where qty > 2000, date = 2019.01.02, sym = `C
timer(10) t2 = select * from t where qty > 2000 and date = 2019.01.02 and sym = `C
each(eqObj, t1.values(), t2.values()) // true
The two queries cost 902 ms and 930 ms, respectively. Using a comma or "and" makes little difference in performance.
Now we change the order of the conditions in these two queries and see how it affects the query performance and result:
timer(10) t3 = select * from t where date = 2019.01.02, sym = `C, qty > 2000
timer(10) t4 = select * from t where date = 2019.01.02 and sym = `C and qty > 2000
each(eqObj, t1.values(), t3.values()) // true
each(eqObj, t2.values(), t4.values()) // true
The two queries cost 669 ms and 651 ms, respectively. Using a comma or "and" makes little difference in performance.
The order of conditions does not affect the result when no order-sensitive functions are used. However, t3 and t4 are about 30% faster than t1 and t2. This suggests that query performance can be improved by filtering first on the columns that narrow down the data most effectively.
Order-Sensitive Conditions
Sample code:
timer(10) t1 = select * from t where ratios(qty) > 1, date = 2019.01.02, sym = `C
timer(10) t2 = select * from t where ratios(qty) > 1 and date = 2019.01.02 and sym = `C
each(eqObj, t1.values(), t2.values()) // true
The two queries cost 1,503 ms and 1,465 ms, respectively. Using a comma or "and" makes little difference in performance here. The only order-sensitive condition, ratios(qty) > 1, is placed as the first condition in the where clause, so it is evaluated first in query t1 (which uses commas). Therefore, the result of t1 is the same as that of t2.
Now we change the order of the conditions in these two queries and see how it affects the query performance and result:
timer(10) t3 = select * from t where date = 2019.01.02, sym = `C, ratios(qty) > 1
timer(10) t4 = select * from t where date = 2019.01.02 and sym = `C and ratios(qty) > 1
each(eqObj, t2.values(), t4.values()) // true
each(eqObj, t1.values(), t3.values()) // false
The two queries cost 507 ms and 1,433 ms, respectively. The results of the queries t2 and t4 are the same, whereas the results of t1 and t3 are different.
This demonstrates that when a condition is order-sensitive, neither the result nor the performance changes for queries using "and" to separate conditions; for queries using commas, however, moving the order-sensitive condition to a later position improves performance but also changes the result.
Conclusion:
- When the conditions are not order-sensitive, commas and "and" give similar performance. The system converts "and" to commas internally and evaluates the conditions from left to right. Put the most selective filters first to optimize performance.
- For order-sensitive conditions, always use "and" to avoid errors. With "and", each condition is evaluated independently and the results are intersected, so the order affects neither performance nor result.
Cases on DFS Tables
The syntax of distributed queries is the same as that of standard queries. Nevertheless, knowing how distributed queries are processed can help you write them more efficiently.
DolphinDB does not maintain row indices for data storage and retrieval; instead, it uses partitions as the physical index of a database. For a distributed query, DolphinDB identifies the partitions involved based on the where condition, then breaks the query down into subqueries and sends them to the nodes holding those partitions (the "map" phase). Intermediate results on the nodes are then sent back to the initiating node ("merge") and further processed to generate the final result ("reduce").
Partition Pruning
If a filter condition in the where clause satisfies the following:
- It contains only the partitioning column(s) of the DFS table, without embedding any functions
- It contains only relational operators (<, <=, =, ==, >, >=, in, between), logical operators (or, and), and constants (including operations between constants)
- It is not a chained condition (e.g., 100<x<200)
- Its filtering logic narrows down the query scope
then the system only loads the relevant partitions for the query instead of scanning all data, which greatly reduces the query time. This process is called partition pruning.
Scenario: Query the number of records of each stock during a certain time period.
First, load the "Quotes" table in the "TAQ" database and assign it to the "quotes" variable:
quotes = loadTable("dfs://TAQ", "Quotes")
Inefficient:
In the comparison condition, use temporalFormat to convert the temporal column "DateTime" to strings:
timer t1 = select count(*) from quotes
where temporalFormat(DateTime, "yyyy.MM.dd") >= "2020.06.01" and temporalFormat(DateTime, "yyyy.MM.dd") <= "2020.06.02"
group by Symbol
Time elapsed: 60,514 ms
Efficient:
Use the date function to convert the temporal column "DateTime" to DATE type.
timer t2 = select count(*) from quotes
where date(DateTime) between 2020.06.01 : 2020.06.02
group by Symbol
Time elapsed: 1,633 ms
each(eqObj, t1.values(), t2.values()) // true
The second query is dozens of times faster than the first. In the first query, the values of the partitioning column "DateTime" are converted into strings, which makes partition pruning impossible.
More examples of where conditions that cannot narrow down the relevant partitions:
- Performing an operation on a partitioning column:
select count(*) from quotes where date(DateTime) + 1 > 2020.06.01
- Using chained comparison operators:
select count(*) from quotes where 2020.06.01 < date(DateTime) < 2020.06.03
- Using non-partitioning columns:
select count(*) from quotes where OfrSize < 500
- Comparing a partitioning column with another column (note: "AnnouncementDate" is not an actual column in the "Quotes" table created at the beginning of this tutorial; it is used here for illustration only):
select count(*) from quotes where date(DateTime) < AnnouncementDate - 3
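Where the filtering intent allows it, such conditions can often be rewritten into a form that supports pruning. Sketches of rewrites for the first two cases above:
// Move the arithmetic to the constant side instead of the partitioning column
select count(*) from quotes where date(DateTime) > 2020.05.31
// Replace the chained comparison with between (or an equality)
select count(*) from quotes where date(DateTime) between 2020.06.02 : 2020.06.02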
Parallel Queries with the group by Clause
In a distributed query, if the "group by" clause contains a partitioning column, the query will be broken down into subqueries and conducted concurrently in each partition.
Scenario: For each stock over a time period, set flags by comparing the bid/offer prices, and calculate the difference between the best bid and offer prices, the total trading volume, and other key metrics.
First, load the "Quotes" table:
quotes = loadTable("dfs://TAQ", "Quotes")
Inefficient:
From data later than 14:30:00, June 1, 2020, flag the records with offer prices higher than bid prices as "1", flag the rest of the records as "0", and save the results to the in-memory table "tmp_t". From "tmp_t", group the records by "Symbol", "DateTime" and "Flag", and calculate the number of OfrPx records and the sum of OfrSize in each group.
timer {
tmp_t = select *, iif(OfrPx > BidPx, 1, 0) as Flag
from quotes
where date(DateTime) = 2020.06.01, second(DateTime) >= 14:30:00
t1 = select iif(max(OfrPx) - min(OfrPx) == 0, 0, 1) as OfrPxDiff, count(OfrPx) as OfrPxCount, sum(OfrSize) as OfrSizeSum
from tmp_t
group by Symbol, date(DateTime) as Date, Flag
}
Time elapsed: 23,859 ms
Efficient:
Query the DFS table "quotes" directly without introducing the in-memory table:
timer t2 = select iif(max(OfrPx) - min(OfrPx) == 0, 0, 1) as OfrPxDiff, count(OfrPx) as OfrPxCount, sum(OfrSize) as OfrSizeSum
from quotes
where date(DateTime) = 2020.06.01, second(DateTime) >= 14:30:00
group by Symbol, date(DateTime) as Date, iif(OfrPx > BidPx, 1, 0) as Flag
each(eqObj, t1.values(), (select * from t2 order by Symbol, Date, Flag).values()) // true
The second query is about 3 times faster. The improvement lies in:
(1) The first query combines data from different partitions into an in-memory table, then divides the data into groups for calculation. Operations on the in-memory table can only be processed by a single CPU core.
(2) The second query performs group calculation on the DFS table directly and the grouping columns contain a partitioning column, which allows for parallel computing with multiple CPU cores.
As a general rule, for distributed queries and computing, avoid generating intermediate results whenever possible, and operate directly on the original DFS table for optimal performance.
Use the "map" Keyword in Grouped Queries
The "map" keyword makes the query to be executed separately within each partition. The output is the results from each partition-level query execution.
Scenario: Query the number of records per minute for each stock.
Load the "Quotes" table:
quotes = loadTable("dfs://TAQ", "Quotes")
Inefficient:
timer result = select count(*) from quotes group by Symbol, bar(DateTime, 60s)
Time elapsed: 16,058 ms
Efficient:
Add the "map" keyword to the query:
timer result = select count(*) from quotes group by Symbol, bar(DateTime, 60s) map
Time elapsed: 14,520 ms
The second query is 10% - 20% faster.
The first query is executed in two steps:
- Calculate within each partition and generate intermediate results
- Collect the intermediate results to produce the final result
If the granularity of the "group by" columns is smaller than the partitioning columns, then we can be sure that there will be no cross-partition calculations. In this example, the "Quotes" DFS table is partitioned by dates, which is more coarse-grained than the 60-second grouping granularity in the query. By adding the "map" keyword to the query, DolphinDB can skip the reduce step and directly generate the grouped output from the map step. This avoids unnecessary computation and improves query performance.
Grouped Calculation
Query the Latest N Records
Scenario: Retrieve the latest 10 records of each stock
Group the data of June 1, 2020 by stock and retrieve the latest 10 records of each group: use the context by clause together with csort and limit.
For example, query 140,000,000 records:
OLAP storage engine:
timer t1 = select * from loadTable("dfs://TAQ", "Quotes")
where date(DateTime) = 2020.06.01
context by Symbol csort DateTime limit -10
Time elapsed: 20,510 ms
TSDB storage engine:
timer t2 = select * from loadTable("dfs://Level1_TSDB", "Snapshot")
where date(DateTime) = 2020.06.01
context by SecurityID csort DateTime limit -10
Time elapsed: 2,454 ms
each(eqObj, t1.values(), t2.values()) //true
DolphinDB version 2.00 or higher provides the TSDB storage engine, which uses sort columns to maintain an in-partition index. It performs better than the OLAP engine for time-series point queries. In this case, the TSDB query is about 8 times faster than the OLAP query.
Unlike traditional databases, DolphinDB adopts columnar storage to better process time-series data. In addition, the context by clause makes it more convenient to process time series within each group.
Calculate Moving VWAP (Volume Weighted Average Price)
Scenario: An in-memory table contains 3,000 stocks, each with 10,000 records. We compare the performance of calculating the moving VWAP with mwavg using a loop versus using context by.
Data simulation:
syms = format(1..3000, "SH000000")
N = 10000
t = cj(table(syms as symbol), table(rand(100.0, N) as price, rand(10000, N) as volume))
Inefficient:
Loop for 3,000 times where each loop obtains the price and volume records of a
stock to calculate mwavg
. Then combine the results.
arr = array(ANY, syms.size())
timer {
for(i in 0 : syms.size()) {
price_vec = exec price from t where symbol = syms[i]
volume_vec = exec volume from t where symbol = syms[i]
arr[i] = mwavg(price_vec, volume_vec, 4)
}
res1 = reduce(join, arr)
}
Time elapsed: 25 min
Efficient:
Use the context by clause to group the data by stock and calculate mwavg within each group.
timer res2 = select mwavg(price, volume, 4) from t
context by symbol
Time elapsed: 3,176 ms
each(eqObj, res1, res2[`mwavg_price]) // true
The optimized calculation is over 400 times faster. This is because the context by clause accesses the data only once and calculates within groups, whereas each iteration of the loop scans the table to obtain the records of a single stock, which takes much more time.
Calculate Cumulative VWAP
Scenario: Calculate the VWAP for each stock every minute since the market opening.
First load the test data:
quotes = loadTable("dfs://TAQ", "Quotes")
Group the data by Symbol with group by, aggregate cumulatively within the cgroup by groups, and sort the results by stock with the order by clause.
timer result = select wavg(OfrPx, OfrSize) as vwap from quotes
where date(DateTime) = 2020.06.01
group by Symbol
cgroup by minute(DateTime) as Minute
order by Symbol, Minute
Time elapsed: 10,798 ms
The cgroup by (cumulative group) clause performs cumulative grouped calculations; a minimal illustration follows. See cgroup by for details.
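A minimal sketch on hypothetical data: for each sym, the aggregate over a given ts covers all records with ts values up to and including it.
cgrpDemo = table(`A`A`A`B`B`B as sym, 1 2 3 1 2 3 as ts, 10 20 30 40 50 60 as vol)
// Expected cumulative sums per sym: A -> 10, 30, 60; B -> 40, 90, 150
select sum(vol) from cgrpDemo group by sym cgroup by ts order by sym, ts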
Calculate VWAP for N Shares of Stocks
Scenario: Calculate the VWAP for all trades related to the latest 1000 traded shares of each stock.
When selecting the trades that make up the 1,000 shares, different situations may arise: the latest three trades with quantities of 100, 300, and 600 sum to exactly 1,000; or two trades with quantities of 900 and 300 sum to more than 1,000. We first need to identify the trades involved in the calculation, so that their total share count just reaches or exceeds 1,000, while excluding the earliest trade of that set would bring the total below 1,000. Then the VWAP is calculated over the identified trades.
Data simulation:
n = 500000
t = table(rand(string(1..4000), n) as sym, rand(10.0, n) as price, rand(500, n) as vol)
Inefficient:
Use group by to group the data by stock and call the user-defined aggregate function lastVolPx1 for each stock. The function loops backwards over the trades, checking whether the cumulative share count reaches or exceeds 1,000, and then calculates the VWAP.
defg lastVolPx1(price, vol, bound) {
size = price.size()
cumSum = 0
for(i in 0:size) {
cumSum = cumSum + vol[size - 1 - i]
if(cumSum >= bound) {
price_tmp = price.subarray(size - 1 - i :)
vol_tmp = vol.subarray(size - 1 - i :)
return wavg(price_tmp, vol_tmp)
}
if(i == size - 1 && cumSum < bound) {
return wavg(price, vol)
}
}
}
timer lastVolPx_t1 = select lastVolPx1(price, vol, 1000) as lastVolPx from t group by sym
Time Elapsed: 187 ms
Efficient:
Use group by
to group the data by stocks and call user-defined
aggregate function lastVolPx2
for each stock. Calculate the
cumulative sum of trade shares and locate the point after which the volume of
shares exceeds 1,000. Then calculate vwag.
defg lastVolPx2(price, vol, bound) {
cumVol = vol.cumsum()
if(cumVol.tail() <= bound)
return wavg(price, vol)
else {
start = (cumVol <= cumVol.tail() - bound).sum()
return wavg(price.subarray(start:), vol.subarray(start:))
}
}
timer lastVolPx_t2 = select lastVolPx2(price, vol, 1000) as lastVolPx from t group by sym
Time Elapsed: 73 ms
each(eqObj, lastVolPx_t1.values(), lastVolPx_t2.values()) // true
Compared with the original script, lastVolPx2 is vectorized, which makes it roughly 2.5 times faster. Replacing loops with vectorized operations is a recommended query-tuning technique in DolphinDB.
Calculate ROC (Price Rate of Change) by Segment
Scenario: Calculate the price change rate of each stock segmented by a column of the market snapshot data.
The following example is based on the data of June 1st, 2020.
First use the group by clause to group the data by Symbol and by segments of OfrPx, then calculate the rate of change of the first-level offer price for each stock.
timer t = select last(OfrPx) \ first(OfrPx) - 1 from
loadTable("dfs://TAQ", "Quotes")
where date(DateTime) = 2020.06.01
group by Symbol, segment(OfrPx, false)
Time Elapsed: 11,075 ms
The segment function divides a vector into groups of consecutive identical values and returns a vector of the same length as the input; a small sketch follows.
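A small sketch on hypothetical data:
segDemo = table(1..6 as id, [1.0, 1.0, 2.0, 2.0, 2.0, 1.0] as px)
// One group per run of consecutive identical px values: {1.0, 1.0}, {2.0, 2.0, 2.0}, {1.0}
select first(id) as firstId, count(*) as cnt from segDemo group by segment(px)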
Calculate Extreme Values with Different Intervals
Scenario: Find each window of consecutive values that are no smaller than a target value, then take the first record with the maximum value of a specified field within each window.
Data simulation:
t = table(2021.09.29 + 0..15 as date,
0 0 0.3 0.3 0 0.5 0.3 0.5 0 0 0.3 0 0.4 0.6 0.6 0 as value)
targetVal = 0.3
Inefficient:
Define a function generateGrp: if the current value is no smaller than the target value, it is assigned the current group ID; if the next value is smaller than the target value, the group ID is incremented so that different consecutive runs are assigned different groups.
def generateGrp(targetVal, val) {
arr = array(INT, val.size())
n = 1
for(i in 0 : val.size()) {
if(val[i] >= targetVal) {
arr[i] = n
if(val[i + 1] < targetVal) n = n + 1
}
}
return arr
}
Use the context by clause to group the data by group ID, the having clause to select the maximum value, and the limit keyword to return the first record:
timer(1000) {
tmp = select date, value, generateGrp(targetVal, value) as grp from t
res1 = select date, value from tmp where grp != 0
context by grp
having value = max(value) limit 1
}
Time Elapsed: 142 ms
Efficient:
Use the segment function with the context by clause to group the consecutive values that are greater than or equal to the target value, and use the having clause to filter the data.
timer(1000) res2 = select * from t
context by segment(value >= targetVal)
having value >= targetVal and value = max(value) limit 1
Time Elapsed: 123 ms
each(eqObj, res1.values(), res2.values()) // true
Compared with the original script, the second SQL query is about 13% faster.
The segment function is typically used for order-sensitive data grouping, with better performance and simpler code than loops.
Calculate Metrics Using Conditional Aggregation
Scenario: Apply different aggregate methods to a column based on tags. For example, when the tag is code1, calculate max every 10 minutes; when the tag is code2, calculate min every 10 minutes; when the tag is code3, calculate avg every 10 minutes. Finally, pivot the table so that each metric has a separate column.
Data simulation:
N = 1000000
t = table("code" + string(take(1..3, N)) as tag,
sort(take([2021.06.28T00:00:00, 2021.06.28T00:10:00, 2021.06.28T00:20:00], N)) as time,
take([1.0, 2.0, 9.1, 2.0, 3.0, 9.1, 9.1, 2.0, 3.0], N) as value)
The following script constructs a dictionary with tags as key and function names as value. Then group the data by time and tags, and apply different user-defined aggregate functions to different groups.
codes = dict(`code1`code2`code3, [max, min, avg])
defg func(tag, value, codes) : codes[tag.first()](value)
timer {
t_tmp = select func(tag, value, codes) as value from t
group by tag, interval(time, 10m, "null") as time
t_result = select value from t_tmp pivot by time, tag
}
Time Elapsed: 76 ms
The interval function can only be used in a group by clause. The above example fills the missing groups with NULL values by setting the parameter fill to "null"; a minimal sketch of this behavior follows.
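A minimal sketch of this behavior on hypothetical data:
intervalDemo = table(2021.06.28T00:00:00 + [0, 60, 1500] as time, 1.0 2.0 4.0 as value)
// The empty 00:10 bucket still appears in the result, filled with NULL;
// fill="prev" would instead carry the previous group's aggregate forward
select avg(value) from intervalDemo group by interval(time, 10m, "null") as time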
Calculate Stock Return Volatility
Scenario: Given the daily return rate of a stock over the past ten years, calculate the volatility of the stock return on a monthly basis.
Data simulation:
N = 3653
t = table(2011.11.01..2021.10.31 as date,
take(`AAPL, N) as code,
rand([0.0573, -0.0231, 0.0765, 0.0174, -0.0025, 0.0267, 0.0304, -0.0143, -0.0256, 0.0412, 0.0810, -0.0159, 0.0058, -0.0107, -0.0090, 0.0209, -0.0053, 0.0317, -0.0117, 0.0123], N) as rate)
Group the data by month with the interval function and calculate the standard deviation. The fill method is set to "prev", meaning a missing value is filled with its previous value.
timer res = select std(rate) from t group by code, interval(month(date), 1, "prev")
Time Elapsed: 1.8 ms
Calculate Value of Stock Portfolio
Scenario: When backtesting index arbitrage trading, calculate the value of a given stock portfolio.
Backtesting on massive historical data often has high requirements for system memory and computing performance. The following example demonstrates how to use DolphinDB SQL language to perform such calculations in an extremely concise manner.
For simplicity, we assume the portfolio is composed of two stocks: AAPL and FB and simulate the data:
syms = take(`AAPL, 6) join take(`FB, 5)
time = 2019.02.27T09:45:01.000000000 + [146, 278, 412, 445, 496, 789, 212, 556, 598, 712, 989]
prices = 173.27 173.26 173.24 173.25 173.26 173.27 161.51 161.50 161.49 161.50 161.51
quotes = table(take(syms, 100000) as Symbol,
take(time, 100000) as Time,
take(prices, 100000) as Price)
weights = dict(`AAPL`FB, 0.6 0.4)
ETF = select Symbol, Time, Price*weights[Symbol] as weightedPrice from quotes
Inefficient:
First, transform the table along two dimensions, time and stock symbol, while preserving the time column. Then forward-fill the NULL values and, for each row, sum the contributions of the constituents to the portfolio price.
timer {
colAAPL = array(DOUBLE, ETF.Time.size())
colFB = array(DOUBLE, ETF.Time.size())
for(i in 0:ETF.Time.size()) {
if(ETF.Symbol[i] == `AAPL) {
colAAPL[i] = ETF.weightedPrice[i]
colFB[i] = NULL
}
if(ETF.Symbol[i] == `FB) {
colAAPL[i] = NULL
colFB[i] = ETF.weightedPrice[i]
}
}
ETF_TMP1 = table(ETF.Time, ETF.Symbol, colAAPL, colFB)
ETF_TMP2 = select last(colAAPL) as colAAPL, last(colFB) as colFB from ETF_TMP1 group by time, Symbol
ETF_TMP3 = ETF_TMP2.ffill()
t1 = select Time, rowSum(colAAPL, colFB) as rowSum from ETF_TMP3
}
Time elapsed: 713 ms.
Efficient:
The optimized script calculates the portfolio value with just one line of code:
First use the DolphinDB pivot by clause to rearrange the table, with the NULL values filled by ffill. Then take the latest weighted price of each constituent with last and sum across constituents with rowSum:
timer t2 = select rowSum(ffill(last(weightedPrice))) from ETF pivot by Time, Symbol
Time elapsed: 23 ms
each(eqObj, t1.values(), t2.values()) //true
The optimized script improves the query performance by around 30 times.
The above example only takes two stocks as the components. The loops take much more time when the number of involved stocks increases.
pivot by is a unique feature of DolphinDB and an extension to standard SQL. It rearranges one or more columns of a table (with or without a data transformation function) along two dimensions. This feature not only simplifies programming, but also avoids generating intermediate tables, effectively improving performance and avoiding out-of-memory issues. A small standalone sketch follows.
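A small standalone sketch on hypothetical data:
pivotDemo = table(`AAPL`FB`AAPL`FB as sym, 09:30:00 09:30:00 09:30:01 09:30:01 as time, 173.2 161.5 173.3 161.4 as px)
// One row per time, one column per sym, cells filled with last(px)
select last(px) from pivotDemo pivot by time, sym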
Another case for IoT:
Scenario: Given three measuring points for real-time data collection. Calculate one-minute average for each point and then sum the averages of the three points for one minute.
Data simulation:
N = 10000
t = table(take(`id1`id2`id3, N) as id,
rand(2021.01.01T00:00:00.000 + 100000 * (1..10000), N) as time,
rand(10.0, N) as value)
The following query uses the bar function to aggregate the data by minute and the pivot by clause to rearrange the table with minutes as rows and measuring points as columns. It then fills the NULL values with ffill, calculates the per-minute averages with avg, sums them up with rowSum, and finally uses the group by clause with the interval function to fill in missing minutes.
timePeriod = 2021.01.01T00:00:00.000 : 2021.01.01T01:00:00.000
timer result = select sum(rowSum) as v from (
select rowSum(ffill(avg(value))) from t
where id in `id1`id2`id3, time between timePeriod
pivot by bar(time, 60000) as minute, id)
group by interval(minute, 1m, "prev") as minute
Time Elapsed: 12 ms
Segment Time-Series Data by Trading Volume
Scenario: Divide minute-by-minute market data into time intervals by trading volume thresholds. The result table holds data segmented into variable-length windows, where each row represents a window and includes information such as trading volume, start time, and end time.
Specifically, initiate a new time window when the trading volume for a stock approaches approximately 1.5 million shares:
- If adding the next trading volume keeps the cumulative volume of the current window closer to 1.5 million shares, include the record in the current window.
- Otherwise, start a new window with the next record.
Data simulation:
N = 28
t = table(take(`600000.SH, N) as wind_code,
take(2015.02.11, N) as date,
take(13:03:00..13:30:00, N) as time,
take([288656, 234804, 182714, 371986, 265882, 174778, 153657, 201388, 175937, 138388, 169086, 203013, 261230, 398971, 692212, 494300, 581400, 348160, 250354, 220064, 218116, 458865, 673619, 477386, 454563, 622870, 458177, 880992], N) as volume)
Define a cumulative-volume function caclCumVol based on the segmenting rules:
def caclCumVol(target, cumVol, nextVol) {
newVal = cumVol + nextVol
if(newVal < target) return newVal
else if(newVal - target > target - cumVol) return nextVol
else return newVal
}
Use the higher-order function accumulate to iteratively apply caclCumVol to the current cumulative volume and the next record's volume. Where the cumulative volume equals the volume of the current record, a new window starts: the time of that record is taken as the window's start time, and the start time is forward-filled so that all records in a window share it. The data is then grouped by start time for aggregation; the intermediate steps are sketched after the query.
timer result = select first(wind_code) as wind_code, first(date) as date, sum(volume) as sum_volume, last(time) as endTime
from t
group by iif(accumulate(caclCumVol{1500000}, volume) == volume, time, NULL).ffill() as startTime
Time elapsed: 0.9 ms
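To see how the grouping key is built, the intermediate steps can be sketched as follows (same t and caclCumVol as above; cumVol, isStart, and startTime are hypothetical names):
// Running volume produced by iteratively applying caclCumVol
cumVol = accumulate(caclCumVol{1500000}, t.volume)
// A window starts where the running volume equals the record's own volume
isStart = cumVol == t.volume
// Forward-fill the start time so that all records of a window share it
startTime = iif(isStart, t.time, NULL).ffill()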
Cases on Metaprogramming: Dynamically Generate SQL Statements
Case 1
Scenario: Calculate 10-minute OHLC grouped by SecurityID and Date.
Data simulation:
N = 10000000
t = table(take(format(1..4000, "000000") + ".SH", N) as SecurityID,
take(2021.10.01..2021.10.31, N) as TradeDate,
take(join(09:30:00 + 1..120 * 60, 13:00:00 + 1..120 * 60), N) as TradeTime,
rand(100.0, N) as cal_variable)
min_num = 10
Inefficient:
Concatenate statements into a string and convert the string into metacode using function parseExpr. To execute the generated metacode, use function eval.
res = parseExpr("select " + avg + "(cal_variable) as FactorValue from t group by bar(TradeTime, " + min_num + "m) as minute_TradeTime, SecurityID, DataDate").eval()
Time elapsed: 219 ms
Efficient:
The built-in function sql creates a SQL statement dynamically; the generated statement is executed with eval. Function sqlCol generates metacode for selecting one or multiple columns, optionally applying a function and an alias. Function makeCall calls the bar function with the specified parameters to generate the metacode, and sqlColAlias defines a column using metacode and an optional alias.
groupingCols = [sqlColAlias(makeCall(bar, sqlCol("TradeTime"), duration(min_num.string() + "m")), "minute_TradeTime"), sqlCol("SecurityID"), sqlCol("TradeDate")]
res = sql(select = sqlCol("cal_variable", funcByName("avg"), "FactorValue"),
from = t, groupBy = groupingCols, groupFlag = GROUPBY).eval()
Time elapsed: 200 ms
Similarly, function sqlUpdate can be used to dynamically generate metacode for a SQL update statement, and function sqlDelete for a SQL delete statement.
Case 2
Scenario: Execute a set of queries on daily basis, and merge the query results.
Data simulation:
N = 100000
t = table(take(50982208 51180116 41774759, N) as vn,
rand(25 1180 50, N) as bc,
take(814 333 666, N) as cc,
take(11 12 3, N) as stt,
take(2 116 14, N) as vt,
take(2020.02.05..2020.02.05, N) as dsl,
take(52354079..52354979, N) as mt)
The following queries are to be executed every day:
t1 = select * from t where vn=50982208, bc=25, cc=814, stt=11, vt=2, dsl=2020.02.05, mt < 52355979 order by mt desc limit 1
t2 = select * from t where vn=50982208, bc=25, cc=814, stt=12, vt=2, dsl=2020.02.05, mt < 52355979 order by mt desc limit 1
t3 = select * from t where vn=51180116, bc=25, cc=814, stt=12, vt=2, dsl=2020.02.05, mt < 52354979 order by mt desc limit 1
t4 = select * from t where vn=41774759, bc=1180, cc=333, stt=3, vt=116, dsl=2020.02.05, mt < 52355979 order by mt desc limit 1
reduce(unionAll, [t1, t2, t3, t4])
The following example dynamically generates these SQL statements through metaprogramming. The columns in the filter conditions are the same across the queries, as is the sort column. You can use the following user-defined function bundleQuery:
def bundleQuery(tbl, dt, dtColName, mt, mtColName, filterColValues, filterColNames){
cnt = filterColValues[0].size()
filterColCnt = filterColValues.size()
orderByCol = sqlCol(mtColName)
selCol = sqlCol("*")
filters = array(ANY, filterColCnt + 2)
filters[filterColCnt] = expr(sqlCol(dtColName), ==, dt)
filters[filterColCnt+1] = expr(sqlCol(mtColName), <, mt)
queries = array(ANY, 0, cnt)
for(i in 0:cnt) {
for(j in 0:filterColCnt){
filters[j] = expr(sqlCol(filterColNames[j]), ==, filterColValues[j][i])
}
queries.append!(sql(select=selCol, from=tbl, where=filters, orderBy=orderByCol, ascOrder=false, limit=1))
}
return loop(eval, queries).unionAll(false)
}
Parameters of bundleQuery:
- tbl is a table;
- dt is the date value in the filter condition;
- dtColName is the name of the date column in the filter condition;
- mt is the mt value in the filter condition;
- mtColName is the name of the mt column, which is also the sort column;
- filterColValues is a tuple specifying the other filter values: each vector in the tuple corresponds to one filter column, and its elements are the filtering values;
- filterColNames is a vector of the corresponding column names in the filter condition.
Call bundleQuery to generate and execute the SQL statements:
dt = 2020.02.05
dtColName = "dsl"
mt = 52355979
mtColName = "mt"
colNames = `vn`bc`cc`stt`vt
colValues = [50982208 50982208 51180116 41774759, 25 25 25 1180, 814 814 814 333, 11 12 12 3, 2 2 2 116]
bundleQuery(t, dt, dtColName, mt, mtColName, colValues, colNames)
Log in as an administrator and execute the following script to define bundleQuery as a function view, so that the function can be called after the node restarts:
addFunctionView(bundleQuery)
Manually coding each SQL statement is time-consuming and does not scale well. This issue can be addressed by dynamically generating SQL statements at runtime through metaprogramming.