DolphinDB SQL Tuning Case Study

This tutorial introduces the best practices for writing SQL statements with optimal performance in DolphinDB. We will explain the techniques for writing SQL scripts by comparing the performance before and after query tuning.

Environment Setup

Prerequisites:

  • CPU: Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz

  • Core count: 64

  • Memory: 512 GB

  • Operating System: CentOS Linux release 7.9

  • License: Community Edition, 2 CPU cores, 8 GB Memory

  • DolphinDB Server Version: DolphinDB_Linux64_V2.00.6, standalone mode

  • DolphinDB GUI Version: DolphinDB_GUI_V 1.30.15

Cases in this tutorial use simulated NYSE TAQ (Trade and Quote) data of June 2020. The following script simulates quote data for 8,000 stocks based on the table schema of Daily TAQ, creates OLAP and TSDB databases and tables, and appends data to them:

model = table(1:0, `Symbol`DateTime`BidPx`OfrPx`BidSize`OfrSize`Mode`Ex`Mmid, [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, INT, INT, INT, SYMBOL, SYMBOL])

// Create an OLAP database and table
dbDate = database("", VALUE, 2020.06.01..2020.06.07)
dbSymbol = database("", HASH, [SYMBOL, 64])
db = database("dfs://TAQ", COMPO, [dbDate, dbSymbol])
createPartitionedTable(db, model, `Quotes, `DateTime`Symbol)

// Create a TSDB database and table
dbDate = database("", VALUE, 2020.06.01..2020.06.07)
dbSymbol = database("", HASH, [SYMBOL, 32])
db = database("dfs://TAQ_TSDB", COMPO, [dbDate, dbSymbol], engine="TSDB")
createPartitionedTable(db, model, `Quotes, `DateTime`Symbol, sortColumns=`Symbol`DateTime)

// Simulate data of half an hour to avoid OOM
def mockOneHourData(Date, StartTime) {
	v_Symbol = format(1..8000, "0000")
	v_DateTime = concatDateTime(Date, time(StartTime) + 0..(30 * 60 * 1000 - 1))
	v_Mode = [3, 4, 5, 7, 8, 9, 10, 11, 13, 15, 16, 17, 23, 29]
	v_Ex = [`A, `B, `C, `D, `M, `N, `P, `T, `W, `X]
	v_Mmid = [`FLOW, `EDGX, `EDGA, `NASD, string(NULL)]
	n = 10000000
	return table(take(v_Symbol, n) as Symbol, rand(v_DateTime, n) as DateTime, rand(100.0, n) as BidPx, rand(100.0, n) as OfrPx, rand(1000, n) as BidSize, rand(100, n) as OfrSize, rand(v_Mode, n) as Mode, rand(v_Ex, n) as Ex, rand(v_Mmid, n) as Mmid)
}

def mockData(DateVector, StartTimeVector) {
	for(Date in DateVector) {
		for(StartTime in StartTimeVector) {
			data = mockOneHourData(Date, StartTime)

			// Append data to the OLAP database
			loadTable("dfs://TAQ", "Quotes").append!(data)

			// Append data to the TSDB database
			loadTable("dfs://TAQ_TSDB", "Quotes").append!(data)
		}
	}
}

mockData(2020.06.01..2020.06.02, 09:30:00 + 0..13 * 1800)

Conditional Filtering

You can use one or more conditional expressions in the where clause of a SQL statement to filter out the records that satisfy the specified conditions.

DolphinDB built-in functions and user-defined functions can be used in the conditional expressions. However, aggregate functions and order-sensitive functions (such as sum, count, next, deltas, and ratios) cannot be used in the where clause when querying DFS tables. This is because a distributed query must first prune partitions based on the where conditions, and such functions prevent the system from narrowing down the partition range.
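For instance, a minimal sketch based on the Quotes table created above (the threshold 1.0 is arbitrary): the order-sensitive calculation is moved out of the where clause by computing it per symbol with context by and filtering the intermediate in-memory result.

// Not supported on a DFS table: an order-sensitive function in the where clause
// select * from loadTable("dfs://TAQ", "Quotes") where deltas(OfrPx) > 1.0

// Possible workaround (sketch): prune partitions with the where clause, compute the
// order-sensitive column per symbol, then filter the in-memory result (which may be large)
tmp = select Symbol, DateTime, OfrPx, deltas(OfrPx) as deltaPx 
	  from loadTable("dfs://TAQ", "Quotes") 
	  where date(DateTime) = 2020.06.01 
	  context by Symbol
res = select * from tmp where deltaPx > 1.0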

Filter with Keyword in

Scenario: Table t1 stores stock quotes and table t2 contains information about the stock industries. Filter t1 based on the industry information.

Load table Quotes as t1 from database dfs://TAQ and create a table t2 with stock industries.

For example:

t1 = loadTable("dfs://TAQ", "Quotes")
Symbols = exec distinct Symbol from t1 where date(DateTime) = 2020.06.01
t2 = table(Symbols as Symbol, 
		   take(`Mul`IoT`Eco`Csm`Edu`Food, Symbols.size()) as Industry)

Inefficient:

Left join t1 and t2 with the join column Symbol and filter the data with where conditions:

timer res1 = select Symbol, DateTime 
			 from lj(t1, t2, `Symbol) 
			 where date(DateTime) = 2020.06.01, Industry=`Edu

Time elapsed: 7,296 ms

Note: The timer statement returns the execution time of a script on DolphinDB server, which does not include the time to return the result to the client. If the result set is too large, the time spent on serialization/deserialization and network transfer may far exceed the execution time on the server.
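For reference, the timer syntax used throughout this tutorial works as follows (a minimal illustration; the computation itself is arbitrary): timer measures a single run, while timer(n) repeats the statement n times.

timer x = sum(rand(1.0, 10000000))       // time a single execution
timer(10) x = sum(rand(1.0, 10000000))   // repeat the statement 10 times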

Efficient:

Obtain a vector of symbols in the "Edu" industry from t2 and specify the range of conditions using keyword in:

timer{
  Symbols = exec Symbol from t2 where Industry="Edu"
  res2 = select Symbol, DateTime 
				 from t1 
				 where date(DateTime) = 2020.06.01, Symbol in Symbols
}

Time elapsed: 2,866 ms

each(eqObj, res1.values(), res2.values()) // true

We apply function each to compare the two result tables column by column. Returning true means that the results of the two queries are consistent. The second query is about 2.5 times faster because the join operation costs far more than filtering with a where clause. Therefore, using dictionaries or the keyword in is recommended over a table join in a SQL statement.
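The dictionary-based alternative mentioned above can be sketched as follows (assuming, as in typical DolphinDB usage, that a dictionary lookup is allowed in the where clause of a distributed query; res3 is a hypothetical result name):

// Map each symbol to its industry and look the industry up in the where clause
industryDict = dict(t2.Symbol, t2.Industry)
res3 = select Symbol, DateTime 
	   from t1 
	   where date(DateTime) = 2020.06.01, industryDict[Symbol] == `Edu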

Filter Data within Groups

Scenario: Query daily trade data and obtain the records with the top 25% highest trading volumes for each stock.

Load table Quotes as quotes from database dfs://TAQ for later reference:

quotes = loadTable("dfs://TAQ", "Quotes")

Use the context by clause to group the data by stock and keep, for each stock, the records whose OfrSize is no less than the linearly interpolated 75th percentile of the OfrSize column:

timer result = select * from quotes 
			   where date(DateTime) = 2020.06.01 
			   context by Symbol having OfrSize >= percentile(OfrSize, 75, "linear")

context by is a DolphinDB SQL keyword for grouped computation. Both context by and group by perform grouping. However, with group by, each group returns a scalar value; with context by, each group returns a vector of the same length as the group's records. The group by clause can only be used with aggregate functions, whereas the context by clause can be used with aggregate functions, moving window functions, cumulative functions, etc.
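A minimal illustration on a small in-memory table (not part of the tutorial datasets):

tmp = table(`A`A`B`B as sym, 1 2 3 4 as qty)
select sum(qty) from tmp group by sym        // one row per group: 3 and 7
select cumsum(qty) from tmp context by sym   // one row per record: 1, 3, 3, 7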

The having clause is often used after group by or context by clause to select the records satisfying the specific conditions.

Scenario: After selecting records with the top 25% volumes for each stock, calculate the standard deviation of OfrPx.

Inefficient:

In a nested query, filter each stock's records against the linearly interpolated 75th percentile using context by and having, then use the group by clause in the outer query to calculate the standard deviation for each stock. The results are sorted with the order by clause.

timer select std(OfrPx) as std from (
	  select Symbol, OfrPx from quotes 
	  where date(DateTime) = 2020.06.01 
	  context by Symbol 
	  having OfrSize >= percentile(OfrSize, 75, "linear")) 
	  group by Symbol 
	  order by Symbol

Time elapsed: 4,738 ms

Efficient:

Group the stocks by symbol with group by. Use the higher-order function aggrTopN to select the records with the top 25% volumes and calculate the standard deviation.

timer select aggrTopN(std, OfrPx, OfrSize, 0.25, false) as std from quotes 
	  where date(DateTime) = 2020.06.01 
	  group by Symbol 
	  order by Symbol

Time elapsed: 2,600 ms

The original script groups and filters the data first, then aggregates the intermediate result in a second query. The optimized query filters and aggregates in a single pass after grouping, which delivers better performance.

Separate Conditions in Where Clause: Commas or the AND Operator

When multiple conditions are present in the where clause, you can control how they are applied using either commas or the AND operator:

  • If the conditions are separated by commas, the data is filtered sequentially by each condition. The first condition is applied to the original data, narrowing it down. Subsequently, each following condition is applied to the output of the preceding one, progressively narrowing down the data.

  • In contrast, if the conditions are separated by "and", each condition filters the original data separately. The results of all the conditions are then intersected to return only the data that meets every condition.

The following examples demonstrate the differences.

First, simulate example data:

N = 10000000
t = table(take(2019.01.01..2019.01.03, N) as date, 			  
          take(`C`MS`MS`MS`IBM`IBM`IBM`C`C$SYMBOL, N) as sym, 
          take(49.6 29.46 29.52 30.02 174.97 175.23 50.76 50.32 51.29, N) as price, 
          take(2200 1900 2100 3200 6800 5400 1300 2500 8800, N) as qty)

Based on whether order-sensitive functions (e.g., deltas, ratios, ffill, move, prev, cumsum) are used, there are two scenarios:

Order-Insensitive Conditions

Sample code:

timer(10) t1 = select * from t where qty > 2000, date = 2019.01.02, sym = `C
timer(10) t2 = select * from t where qty > 2000 and date = 2019.01.02 and sym = `C

each(eqObj, t1.values(), t2.values()) // true

The two queries cost 902 ms and 930 ms, respectively. Using a comma or "and" makes little difference in performance.

Now we change the order of the conditions in these two queries and see how it affects the query performance and result:

timer(10) t3 = select * from t where date = 2019.01.02, sym = `C, qty > 2000
timer(10) t4 = select * from t where date = 2019.01.02 and sym = `C and qty > 2000

each(eqObj, t1.values(), t3.values()) // true
each(eqObj, t2.values(), t4.values()) // true

The two queries cost 669 ms and 651 ms, respectively. Using a comma or "and" makes little difference in performance.

The order of conditions does not affect the result when no order-sensitive functions are used. However, t3 and t4 are about 30% faster than t1 and t2. This suggests that query performance can be improved by filtering first on the conditions that narrow down the data most effectively.

Order-Sensitive Conditions

Sample code:

timer(10) t1 = select * from t where ratios(qty) > 1, date = 2019.01.02, sym = `C
timer(10) t2 = select * from t where ratios(qty) > 1 and date = 2019.01.02 and sym = `C

each(eqObj, t1.values(), t2.values()) // true

The two queries cost 1,503 ms and 1,465 ms, respectively.

Using a comma or "and" makes little difference in performance. The only order-sensitive condition ratio() is placed as the first condition in the where clause, so it is evaluated first in query t1 (which uses comma). Therefore, the query result of t1 is the same as that of t2.

Now we change the order of the conditions in these two queries and see how it affects the query performance and result:

timer(10) t3 = select * from t where date = 2019.01.02, sym = `C, ratios(qty) > 1
timer(10) t4 = select * from t where date = 2019.01.02 and sym = `C and ratios(qty) > 1

each(eqObj, t2.values(), t4.values()) // true
each(eqObj, t1.values(), t3.values()) // false

The two queries cost 507 ms and 1,433 ms, respectively. The results of the queries t2 and t4 are the same, whereas the results of t1 and t3 are different.

This demonstrates that when a condition is order-sensitive, the order of conditions makes no difference to the result or performance of queries that use "and"; for queries that use commas, however, moving the order-sensitive condition to a later position improves performance but also changes the result.

Conclusion:

  • When query conditions are not order-sensitive, using comma or "and" gives similar performance. The system converts "and" to comma internally and evaluates conditions left-to-right. It is recommended to put more selective filters earlier to optimize performance.

  • For order-sensitive conditions, always use "and" to avoid errors. "and" evaluates conditions independently and intersects the results, so the order does not affect performance or result.

Cases on DFS Tables

The syntax of distributed queries is the same as that of standard queries. Nevertheless, knowing how distributed queries are processed can help you write more efficient queries.

DolphinDB doesn't maintain row indices for data storage and retrieval; instead, it uses partitions as the physical index of a database. For a distributed query, DolphinDB identifies the partitions involved based on the where conditions, breaks the query down into subqueries, and sends them to the nodes holding these partitions (the "map" phase). Intermediate results on the nodes are then sent back to the initiating node ("merge") and further processed to generate the final result ("reduce").

Partition Pruning

If a filter condition in the where clause satisfies the following:

  • Contains only the partitioning column(s) (without embedding any functions) of the DFS table

  • Contains only relational operators (<, <=, =, ==, >, >=, in, between), logical operators (or, and), and constants (including operations between constants)

  • Not a chained condition (e.g. 100<x<200)

  • The filtering logic narrows down the query scope

then the system only loads the relevant partitions for the query instead of scanning all data, which greatly reduces the query time. This process is called partition pruning.

Scenario: Query the number of records of each stock during a certain time period.

First, load the "Quotes" table in the "TAQ" database and assign it to the "quotes" variable:

quotes = loadTable("dfs://TAQ", "Quotes")

Inefficient:

In the comparison condition, use temporalFormat to convert the temporal variable "DateTime" to strings:

timer t1 = select count(*) from quotes 
		   where temporalFormat(DateTime, "yyyy.MM.dd") >= "2020.06.01" and temporalFormat(DateTime, "yyyy.MM.dd") <= "2020.06.02" 
		   group by Symbol 

Time elapsed: 60,514 ms

Efficient:

Use date to convert the temporal variable "DateTime" to DATE type.

timer t2 = select count(*) from quotes 
		   where date(DateTime) between 2020.06.01 : 2020.06.02 
		   group by Symbol 

Time elapsed: 1,633 ms

each(eqObj, t1.values(), t2.values()) // true

The performance of the second query is tens of times faster than the first query.

In the first query, the values in the partitioning column "DateTime" are converted into strings, which makes partition pruning impossible.

More examples of where conditions that cannot narrow down the relevant partitions:

  1. Operations on a partitioning column

    select count(*) from quotes where date(DateTime) + 1 > 2020.06.01
  2. Using chained comparison operators

    select count(*) from quotes where 2020.06.01 < date(DateTime) < 2020.06.03
  3. Using non-partitioning columns

    select count(*) from quotes where OfrSize < 500
  4. Comparing a partitioning column with another column. (Note: "AnnouncementDate" is not an actual column in the "Quotes" table created at the beginning of this tutorial. It is used here for illustrative purpose only.)

    select count(*) from quotes where date(DateTime) < AnnouncementDate - 3

Parallel Queries with the group by Clause

In a distributed query, if the "group by" clause contains a partitioning column, the query will be broken down into subqueries and conducted concurrently in each partition.

Scenario: For each stock over a time period, set a flag by comparing the bid and offer prices of each record, then calculate whether the offer price varies within each group, the number of quotes, the total offer size, and other key metrics.

First, load the "Quotes" table:

quotes = loadTable("dfs://TAQ", "Quotes")

Inefficient:

From the data at or after 14:30:00 on June 1, 2020, flag records whose offer price is higher than the bid price as 1 and the rest as 0, and save the result to the in-memory table "tmp_t". Then group "tmp_t" by "Symbol", date and "Flag", and calculate the number of OfrPx records and the sum of OfrSize in each group.

timer {
	tmp_t = select *, iif(OfrPx > BidPx, 1, 0) as Flag 
			from quotes 
			where date(DateTime) = 2020.06.01, second(DateTime) >= 14:30:00
	t1 = select iif(max(OfrPx) - min(OfrPx) == 0, 0, 1) as OfrPxDiff, count(OfrPx) as OfrPxCount, sum(OfrSize) as OfrSizeSum 
			from tmp_t 
			group by Symbol, date(DateTime) as Date, Flag
}

Time elapsed: 23,859 ms

Efficient:

Query the DFS table "quotes" directly without introducing the in-memory table:

timer t2 = select iif(max(OfrPx) - min(OfrPx) == 0, 0, 1) as OfrPxDiff, count(OfrPx) as OfrPxCount, sum(OfrSize) as OfrSizeSum 
		   from quotes 
		   where date(DateTime) = 2020.06.01, second(DateTime) >= 14:30:00 
		   group by Symbol, date(DateTime) as Date, iif(OfrPx > BidPx, 1, 0) as Flag
each(eqObj, t1.values(), (select * from t2 order by Symbol, Date, Flag).values()) // true

The performance of the second query is 3 times faster. The improvement lies in:

(1) The first query combines data from different partitions into an in-memory table, then divides the data into groups for calculation. Operations on the in-memory table can only be processed by a single CPU core.

(2) The second query performs group calculation on the DFS table directly and the grouping columns contain a partitioning column, which allows for parallel computing with multiple CPU cores.

As a general rule, for distributed queries and computing, avoid generating intermediate results whenever possible, and operate directly on the original DFS table for optimal performance.

Use the "map" Keyword in Grouped Queries

The "map" keyword makes the query to be executed separately within each partition. The output is the results from each partition-level query execution.

Scenario: Query the number of records per minute for each stock.

Load the "Quotes" table:

quotes = loadTable("dfs://TAQ", "Quotes")

Inefficient:

timer result = select count(*) from quotes group by Symbol, bar(DateTime, 60s)

Time elapsed: 16,058 ms

Efficient:

Add the "map" keyword to the query:

timer result = select count(*) from quotes group by Symbol, bar(DateTime, 60s) map

Time elapsed: 14,520 ms

The second query is roughly 10% faster.

The first query is executed in two steps:

  1. Calculate within each partition and generate the intermediate results

  2. Collect the intermediate results to generate a final result

If the grouping granularity of the "group by" columns is finer than the partitioning granularity, no group can span multiple partitions, so no cross-partition calculation is needed. In this example, the "Quotes" DFS table is partitioned by date, which is coarser than the per-symbol, 60-second grouping granularity in the query. By adding the "map" keyword to the query, DolphinDB can skip the reduce step and directly generate the grouped output from the map step. This avoids unnecessary computation and improves query performance.

Grouped Calculation

Query the Latest N Records

Scenario: Retrieve the latest 10 records of each stock

Group the data of June 01, 2020 by stock ID to retrieve the latest 10 records of each group. Use the context by clause with keywords csort and top to obtain the latest records for each group.

For example, query 140,000,000 records:

OLAP storage engine:

timer t1 = select * from loadTable("dfs://TAQ", "Quotes") 
		   where date(DateTime) = 2020.06.01 
		   context by Symbol csort DateTime limit -10

Time elapsed: 20,510 ms

TSDB storage engine:

timer t2 = select * from loadTable("dfs://Level1_TSDB", "Snapshot") 
		   where date(DateTime) = 2020.06.01 
		   context by SecurityID csort DateTime limit -10 

Time elapsed: 2,454 ms

each(eqObj, t1.values(), t2.values()) //true

DolphinDB version 2.00 or higher provides the TSDB storage engine, which uses sort columns to maintain an in-partition index. It delivers better performance than the OLAP engine for time-series point queries. In this case, the TSDB query is about 8 times faster than the OLAP query.

Unlike traditional databases, DolphinDB adopts columnar storage to better process time-series data. In addition, the context by clause makes it more convenient to process time series within each group.

Calculate Moving VWAP (Volume Weighted Average Price)

Scenario: An in-memory table contains 3,000 stocks, with each stock having 10,000 records. We compare the performance of calculating the VWAP using mwavg with loops versus context by.

Data simulation:

syms = format(1..3000, "SH000000")
N = 10000
t = cj(table(syms as symbol), table(rand(100.0, N) as price, rand(10000, N) as volume))

Inefficient:

Loop 3,000 times; each iteration extracts the price and volume records of one stock and calculates mwavg. Then combine the results.

arr = array(ANY, syms.size())

timer {
	for(i in 0 : syms.size()) {
		price_vec = exec price from t where symbol = syms[i]
		volume_vec = exec volume from t where symbol = syms[i]
		arr[i] = mwavg(price_vec, volume_vec, 4)
	}
	res1 = reduce(join, arr)
}

Time elapsed: 25 min

Efficient:

Use the context by clause to group data by stock and then calculate mwavg within each group.

timer res2 = select mwavg(price, volume, 4) from t 
			   context by symbol

Time elapsed: 3,176 ms

each(eqObj, res1, res2[`mwavg_price]) // true

The optimized calculation is over 400 times faster. This is because the context by clause scans the data only once and computes within each group, whereas every iteration of the loop scans the whole table to extract one stock's records, which takes far more time.

Calculate Cumulative VWAP

Scenario: Calculate the VWAP for each stock every minute since the market opening.

First load the test data:

quotes = loadTable("dfs://TAQ", "Quotes")

Group the data by Symbol with group by and perform cumulative aggregation over minutes with cgroup by. Sort the results by stock and minute with the order by clause.

timer result = select wavg(OfrPx, OfrSize) as vwap from quotes 
			   where date(DateTime) = 2020.06.01 
			   group by Symbol 
			   cgroup by minute(DateTime) as Minute 
			   order by Symbol, Minute

Time elapsed: 10,798 ms

The cgroup by (cumulative group) clause performs cumulative grouping calculations. See cgroup by.
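A minimal cgroup by sketch on a small in-memory table (not part of the tutorial datasets): each output row aggregates all records from the first minute up to and including the current minute.

tmp = table(take(`A, 4) as sym, [09:30m, 09:30m, 09:31m, 09:32m] as minute, 
			[10.0, 11, 12, 13] as px, [100, 200, 300, 400] as vol)
select wavg(px, vol) as vwap from tmp group by sym cgroup by minute order by sym, minute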

Calculate VWAP for N Shares of Stocks

Scenario: Calculate the VWAP for all trades related to the latest 1000 traded shares of each stock.

When filtering for the 1,000 shares, different situations may arise: the latest three trades with volumes of 100, 300, and 600 sum to exactly 1,000; or two trades with volumes of 900 and 300 sum to more than 1,000. It is therefore necessary to first identify the trades involved in the calculation, such that their total volume just reaches or exceeds 1,000 shares, while excluding the earliest trade in that window would bring the total below 1,000. Then calculate the VWAP over these trades.

Data simulation:

n = 500000
t = table(rand(string(1..4000), n) as sym, rand(10.0, n) as price, rand(500, n) as vol)

Inefficient:

Use group by to group the data by stock and call the user-defined aggregate function lastVolPx1 for each stock. The function iterates backward from the latest trade, accumulating volumes until the sum reaches or exceeds 1,000, and then calculates the VWAP.

defg lastVolPx1(price, vol, bound) {
	size = price.size()
	cumSum = 0
	for(i in 0:size) {
		cumSum = cumSum + vol[size - 1 - i]
		if(cumSum >= bound) {
			price_tmp = price.subarray(size - 1 - i :)
			vol_tmp = vol.subarray(size - 1 - i :)
			return wavg(price_tmp, vol_tmp)
		}
		if(i == size - 1 && cumSum < bound) {
			return wavg(price, vol)
		}
	}
}

timer lastVolPx_t1 = select lastVolPx1(price, vol, 1000) as lastVolPx from t group by sym

Time Elapsed: 187 ms

Efficient:

Use group by to group the data by stock and call the user-defined aggregate function lastVolPx2 for each stock. The function calculates the cumulative sum of trade volumes, locates the position after which the remaining trades sum to at least 1,000 shares, and then calculates the VWAP over that subarray.

defg lastVolPx2(price, vol, bound) {
	cumVol = vol.cumsum()
	if(cumVol.tail() <= bound)
		return wavg(price, vol)
	else {
		start = (cumVol <= cumVol.tail() - bound).sum()
		return wavg(price.subarray(start:), vol.subarray(start:))
	}
}

timer lastVolPx_t2 = select lastVolPx2(price, vol, 1000) as lastVolPx from t group by sym

Time Elapsed: 73 ms

each(eqObj, lastVolPx_t1.values(), lastVolPx_t2.values()) // true

Compared with the original script, the function lastVolPx2 is vectorized, which makes it more than twice as fast. Replacing loops with vectorized operations is a recommended query tuning technique in DolphinDB.

Calculate ROC (Price Rate of Change) by Segment

Scenario: Calculate the price change rate of each stock segmented by a column of the market snapshot data.

The following example is based on the data of June 1st, 2020.

First use the group by clause to group the data by Symbol and by segments of consecutive identical OfrPx values. Then calculate the rate of change of the offer price within each group.

timer t = select last(OfrPx) \ first(OfrPx) - 1 from 
		  loadTable("dfs://TAQ", "Quotes") 
		  where date(DateTime) = 2020.06.01 
		  group by Symbol, segment(OfrPx, false)

Time Elapsed: 11,075 ms

The segment function divides a vector into groups, where each group consists of consecutive identical values. It returns a vector of the same length as the input.
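A small sketch of segment-based grouping (the values are arbitrary): each run of consecutive equal values forms its own group, so the query below returns one row per run rather than one row per distinct value.

px = [10, 10, 12, 12, 12, 10]
select first(px) as px, count(*) as n from table(px as px) group by segment(px, false)
// three groups: the runs 10 10, 12 12 12, and the trailing 10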

Calculate Extreme Values with Different Intervals

Scenario: Obtain a window of consecutive intervals no smaller than the target value. Then take the first record with the maximum value of a specified field within each window.

Data simulation:

t = table(2021.09.29 + 0..15 as date, 
          0 0 0.3 0.3 0 0.5 0.3 0.5 0 0 0.3 0 0.4 0.6 0.6 0 as value)
targetVal = 0.3

Inefficient:

Define a function generateGrp: if the current value is no smaller than the target value, it is assigned the current group ID; if the next value is smaller than the target value, the group ID is incremented by 1 so that separate runs of qualifying values fall into different groups.

def generateGrp(targetVal, val) {
	arr = array(INT, val.size())
	n = 1
	for(i in 0 : val.size()) {
		if(val[i] >= targetVal) {
			arr[i] = n
			if(val[i + 1] < targetVal) n = n + 1
		}
	}
	return arr
}

Use the context by clause to group the data by group ID, the having clause to select the maximum value, and keyword limit to return the first record:

timer(1000) {
	tmp = select date, value, generateGrp(targetVal, value) as grp from t
	res1 = select date, value from tmp where grp != 0 
		   context by grp 
		   having value = max(value) limit 1
}

Time Elapsed: 142 ms

Efficient:

Use the segment function with context by clause to group the consecutive values that are greater than or equal to the target value, and use the having clause for data filtering.

timer(1000) res2 = select * from t 
				   context by segment(value >= targetVal) 
				   having value >= targetVal and value = max(value) limit 1

Time Elapsed: 123 ms

each(eqObj, res1.values(), res2.values()) // true

Compared with the original script, the second SQL query is roughly 15% faster.

The segment function is usually used for order-sensitive data grouping with improved performance and simpler style compared with loops.

Calculate Metrics Using Conditional Aggregation

Scenario: Apply different aggregate methods to a column based on tags. For example, when the tag is code1, calculate the max every 10 minutes; when the tag is code2, calculate the min every 10 minutes; when the tag is code3, calculate the avg every 10 minutes. Finally, pivot the table so that each tag's metric occupies a separate column.

Data simulation:

N = 1000000
t = table("code" + string(take(1..3, N)) as tag, 
          sort(take([2021.06.28T00:00:00, 2021.06.28T00:10:00, 2021.06.28T00:20:00], N)) as time, 
          take([1.0, 2.0, 9.1, 2.0, 3.0, 9.1, 9.1, 2.0, 3.0], N) as value)

The following script constructs a dictionary with tags as keys and aggregate functions as values. The data is then grouped by tag and 10-minute interval, and the user-defined aggregate function func looks up and applies the appropriate function for each group.

codes = dict(`code1`code2`code3, [max, min, avg])

defg func(tag, value, codes) : codes[tag.first()](value)
 
timer {
	t_tmp = select func(tag, value, codes) as value from t 
			group by tag, interval(time, 10m, "null") as time
	t_result = select value from t_tmp pivot by time, tag
}

Time Elapsed: 76 ms

The function interval can only be used in a group by clause. The above example fills the missing intervals with NULL values by setting the fill parameter to "null".
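A minimal sketch of interval with gap filling (the values are arbitrary): the time axis is bucketed into a regular 2-minute grid, and buckets with no data appear in the result with NULL values instead of being dropped.

tmp = table(2021.01.01T00:00:00 + [0, 60, 300] as time, [1, 2, 3] as v)
select sum(v) from tmp group by interval(time, 2m, "null") as bucket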

Calculate Stock Return Volatility

Scenario: Given the daily return rate of a stock over the past ten years, calculate the volatility of the stock return on a monthly basis.

Data simulation:

N = 3653
t = table(2011.11.01..2021.10.31 as date, 
          take(`AAPL, N) as code, 
          rand([0.0573, -0.0231, 0.0765, 0.0174, -0.0025, 0.0267, 0.0304, -0.0143, -0.0256, 0.0412, 0.0810, -0.0159, 0.0058, -0.0107, -0.0090, 0.0209, -0.0053, 0.0317, -0.0117, 0.0123], N) as rate)

Group the data by month with function interval and calculate the standard deviation. The filling method is set to "prev", meaning that a missing value will be filled with its previous value.

timer res = select std(rate) from t group by code, interval(month(date), 1, "prev")

Time Elapsed: 1.8 ms

Calculate Value of Stock Portfolio

Scenario: When backtesting index arbitrage trading, calculate the value of a given stock portfolio.

Backtesting on massive historical data often has high requirements for system memory and computing performance. The following example demonstrates how to use DolphinDB SQL language to perform such calculations in an extremely concise manner.

For simplicity, we assume the portfolio is composed of two stocks, AAPL and FB, and simulate the data:

syms = take(`AAPL, 6) join take(`FB, 5)
time = 2019.02.27T09:45:01.000000000 + [146, 278, 412, 445, 496, 789, 212, 556, 598, 712, 989]
prices = 173.27 173.26 173.24 173.25 173.26 173.27 161.51 161.50 161.49 161.50 161.51
quotes = table(take(syms, 100000) as Symbol, 
               take(time, 100000) as Time, 
               take(prices, 100000) as Price)
weights = dict(`AAPL`FB, 0.6 0.4)
ETF = select Symbol, Time, Price*weights[Symbol] as weightedPrice from quotes

Inefficient:

First, rearrange the weighted prices into one column per constituent while preserving the time column. Then take the last value for each time and symbol, forward fill the NULL values, and sum each row to obtain the portfolio value.

timer {
	colAAPL = array(DOUBLE, ETF.Time.size())
	colFB = array(DOUBLE, ETF.Time.size())
	
	for(i in 0:ETF.Time.size()) {
		if(ETF.Symbol[i] == `AAPL) {
			colAAPL[i] = ETF.weightedPrice[i]
			colFB[i] = NULL
		}
		if(ETF.Symbol[i] == `FB) {
			colAAPL[i] = NULL
			colFB[i] = ETF.weightedPrice[i]
		}
	}
	
	ETF_TMP1 = table(ETF.Time, ETF.Symbol, colAAPL, colFB)
	ETF_TMP2 = select last(colAAPL) as colAAPL, last(colFB) as colFB from ETF_TMP1 group by time, Symbol
	ETF_TMP3 = ETF_TMP2.ffill()
	
	t1 = select Time, rowSum(colAAPL, colFB) as rowSum from ETF_TMP3
}

Time elapsed: 713 ms.

Efficient:

The optimized script calculates the portfolio value with just one line of code: the pivot by clause rearranges the table with one column per constituent, last takes the latest weighted price in each cell, ffill fills the NULL values forward, and rowSum adds up the constituents for each time point:

timer t2 = select rowSum(ffill(last(weightedPrice))) from ETF pivot by Time, Symbol

Time elapsed: 23 ms

each(eqObj, t1.values(), t2.values()) //true

The optimized script improves the query performance by around 30 times.

The above example uses only two constituent stocks; the loop-based approach slows down further as the number of stocks increases.

pivot by is a unique feature in DolphinDB and an extension to the standard SQL. It rearranges a column (or multiple columns) of a table (with or without a data transformation function) on two dimensions. This feature not only simplifies programming, but also avoids generating intermediate tables, thus effectively improving calculation performance and avoiding issues with insufficient memory.
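A minimal pivot by illustration on a small in-memory table (not part of the tutorial datasets): the seq values become rows, the symbols become columns, and each cell holds the last price.

tmp = table(`A`B`A`B as sym, [1, 1, 2, 2] as seq, [10.0, 20, 11, 21] as px)
select last(px) from tmp pivot by seq, sym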

Another case for IoT:

Scenario: Given three measuring points collecting data in real time, calculate the one-minute average for each point, then sum the three averages for each minute.

Data simulation:

N = 10000
t = table(take(`id1`id2`id3, N) as id, 
          rand(2021.01.01T00:00:00.000 +  100000 * (1..10000), N) as time, 
          rand(10.0, N) as value)

The following query uses the pivot by clause with the bar function to rearrange the table with minutes as rows and measuring points as columns, calculating the average value of each cell with avg. It then fills the NULL values with ffill, sums across the points with rowSum, and finally uses the group by clause with function interval to fill in the missing minutes.

timePeriod = 2021.01.01T00:00:00.000 : 2021.01.01T01:00:00.000
timer result = select sum(rowSum) as v from (
    		   select rowSum(ffill(avg(value))) from t 
    		   where id in `id1`id2`id3, time between timePeriod 
    		   pivot by bar(time, 60000) as minute, id) 
    		   group by interval(minute, 1m, "prev") as minute

Time Elapsed: 12 ms

Segment Time-Series Data by Trading Volume

Scenario: Divide minute-by-minute market data into time intervals by trading volume thresholds. The result table holds data segmented into variable-length windows, where each row represents a window and includes information such as trading volume, start time, and end time.

Specifically, initiate a new time window when the trading volume for a stock approaches approximately 1.5 million shares:

  • If adding the next trading volume would keep the cumulative volume of the current window closer to 1.5 million shares, include the record in the current window.

  • Otherwise, start a new window with the next record.

Data simulation:

N = 28
t = table(take(`600000.SH, N) as wind_code, 
          take(2015.02.11, N) as date, 
          take(13:03:00..13:30:00, N) as time, 
          take([288656, 234804, 182714, 371986, 265882, 174778, 153657, 201388, 175937, 138388, 169086, 203013, 261230, 398971, 692212, 494300, 581400, 348160, 250354, 220064, 218116, 458865, 673619, 477386, 454563, 622870, 458177, 880992], N) as volume)

Define a cumulative function caclCumVol based on the segmenting rules:

def caclCumVol(target, cumVol, nextVol) {
	newVal = cumVol + nextVol
	if(newVal < target) return newVal
	else if(newVal - target > target - cumVol) return nextVol
	else return newVal
}

Use the higher-order function accumulate to iteratively apply caclCumVol to the current cumulative trading volume and the next volume. Wherever the accumulated value equals the volume of the current record, a new window starts at that record: the record's time is taken as the window's start time, and subsequent records inherit it via ffill, so that all records in a window share the same start time. The data is then grouped by start time for aggregation.

timer result = select first(wind_code) as wind_code, first(date) as date, sum(volume) as sum_volume, last(time) as endTime 
			   from t 
			   group by iif(accumulate(caclCumVol{1500000}, volume) == volume, time, NULL).ffill() as startTime

Time elapsed: 0.9 ms

Cases on Metaprogramming: Dynamically Generate SQL Statements

Case 1

Scenario: Aggregate a variable into 10-minute bars (here, the 10-minute average) grouped by SecurityID and trade date.

Data simulation:

N = 10000000
t = table(take(format(1..4000, "000000") + ".SH", N) as SecurityID, 
          take(2021.10.01..2021.10.31, N) as TradeDate, 
          take(join(09:30:00 + 1..120 * 60, 13:00:00 + 1..120 * 60), N) as TradeTime, 
          rand(100.0, N) as cal_variable)
min_num = 10

Inefficient:

Concatenate statements into a string and convert the string into metacode using function parseExpr. To execute the generated metacode, use function eval.

res = parseExpr("select " + avg + "(cal_variable) as FactorValue from t group by bar(TradeTime, " + min_num + "m) as minute_TradeTime, SecurityID, DataDate").eval()

Time elapsed: 219 ms

Efficient:

The built-in function sql in DolphinDB is used to create a SQL statement dynamically. To execute the generated SQL statement, use function eval. Function sqlCol is used to generate metacode for selecting one or multiple columns without calculations. Use function makeCall to call the bar function with the specified parameters to generate the script and function sqlColAlias to define a column using metacode and an optional alias name.

groupingCols = [sqlColAlias(makeCall(bar, sqlCol("TradeTime"), duration(min_num.string() + "m")), "minute_TradeTime"), sqlCol("SecurityID"), sqlCol("TradeDate")]
res = sql(select = sqlCol("cal_variable", funcByName("avg"), "FactorValue"), 
          from = t, groupBy = groupingCols, groupFlag = GROUPBY).eval()

Time elapsed: 200 ms

Similarly, function sqlUpdate can be used to dynamically generate the metacode of a SQL update statement, and function sqlDelete the metacode of a SQL delete statement.
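For instance, a minimal sketch reusing the in-memory table t from Case 1 (assuming the typical usage in which the second argument of sqlUpdate is the update metacode and the second argument of sqlDelete is the filter condition; the expressions themselves are arbitrary):

// Generate an update statement as metacode and execute it with eval
sqlUpdate(t, <cal_variable * 2 as cal_variable>).eval()
// Generate a delete statement as metacode and execute it with eval
sqlDelete(t, <cal_variable < 1.0>).eval()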

Case 2

Scenario: Execute a set of queries on a daily basis and merge the query results.

Data simulation:

N = 100000
t = table(take(50982208 51180116 41774759, N) as vn, 
          rand(25 1180 50, N) as bc, 
          take(814 333 666, N) as cc, 
          take(11 12 3, N) as stt, 
          take(2 116 14, N) as vt, 
          take(2020.02.05..2020.02.05, N) as dsl, 
          take(52354079..52354979, N) as mt)

The following queries are to be executed every day:

t1 = select * from t where vn=50982208, bc=25, cc=814, stt=11, vt=2, dsl=2020.02.05, mt < 52355979 order by mt desc limit 1
t2 = select * from t where vn=50982208, bc=25, cc=814, stt=12, vt=2, dsl=2020.02.05, mt < 52355979 order by mt desc limit 1
t3 = select * from t where vn=51180116, bc=25, cc=814, stt=12, vt=2, dsl=2020.02.05, mt < 52354979 order by mt desc limit 1
t4 = select * from t where vn=41774759, bc=1180, cc=333, stt=3, vt=116, dsl=2020.02.05, mt < 52355979 order by mt desc limit 1

reduce(unionAll, [t1, t2, t3, t4])

The following example dynamically generates the SQL statements through metaprogramming. The queries share the same filter columns and sort column and differ only in the filter values. You can use the following user-defined function bundleQuery:

def bundleQuery(tbl, dt, dtColName, mt, mtColName, filterColValues, filterColNames){
	cnt = filterColValues[0].size()
	filterColCnt = filterColValues.size()
	orderByCol = sqlCol(mtColName)
	selCol = sqlCol("*")
	filters = array(ANY, filterColCnt + 2)
	filters[filterColCnt] = expr(sqlCol(dtColName), ==, dt)
	filters[filterColCnt+1] = expr(sqlCol(mtColName), <, mt)
	
	queries = array(ANY, 0, cnt)
	for(i in 0:cnt)	{
		for(j in 0:filterColCnt){
			filters[j] = expr(sqlCol(filterColNames[j]), ==, filterColValues[j][i])
		}
		queries.append!(sql(select=selCol, from=tbl, where=filters, orderBy=orderByCol, ascOrder=false, limit=1))
	}
	return loop(eval, queries).unionAll(false)
}

Parameters of bundleQuery:

  • tbl is a table;

  • dt is the date value in the filter condition;

  • dtColName is the name of the date column in the filter condition;

  • mt is the mt value in the filter condition;

  • mtColName is the name of the mt column, which is also the sort column;

  • filterColValues is a tuple that specifies the other filter values. Each vector in the tuple corresponds to one filter column, and the i-th elements across the vectors form the filter values of the i-th query;

  • filterColNames is a vector that indicates the column names in the filter condition.

Call bundleQuery to generate a SQL statement:

dt = 2020.02.05
dtColName = "dsl"
mt = 52355979
mtColName = "mt"
colNames = `vn`bc`cc`stt`vt
colValues = [50982208 50982208 51180116 41774759, 25 25 25 1180, 814 814 814 333, 11 12 12 3, 2 2 2 116]

bundleQuery(t, dt, dtColName, mt, mtColName, colValues, colNames)

Log in as an administrator and execute the following script to define bundleQuery as a function view so that the function can be called after node startup:

addFunctionView(bundleQuery)

Manually writing large numbers of similar SQL statements is time-consuming and hard to scale. Generating SQL statements dynamically at runtime through metaprogramming addresses this issue.