Factor Computing Platform with Python Celery and DolphinDB
Factor discovery is at the core of research and trading in quantitative finance. A traditional development workflow typically uses Python to read data from a relational database such as SQL Server and perform the factor calculations. As trading volumes and market data continue to expand, such platforms face performance bottlenecks.
This tutorial focuses on improving factor computation performance by introducing DolphinDB as the core computing engine in a traditional Python-based platform. The proposed platform consists of a data synchronization module, a factor computing module, and a task scheduling module, providing business departments with real-time factor computation, batch processing, and historical factor query services. DolphinDB is well-suited for factor computation on high-, medium-, and low-frequency data, and its extensive APIs and ETL tools make it possible to adopt DolphinDB as the computational engine while retaining existing Python processes. We will walk through building a complete factor computing platform, using the first factor in the WorldQuant 101 Alpha library, WQAlpha1, as an example.
1. Architecture Overview
The unified factor computing platform built in this tutorial consists of the following key components:
- Data synchronization module:
DataX is a widely used offline data synchronization tool that migrates both full historical data and incremental data from the relational database SQL Server to DolphinDB.
- Factor computing and data storage module:
DolphinDB serves as the core framework for factor computation and data storage. Its function views allow functions defined on the server to be invoked from a Python program via the DolphinDB Python API.
- Task scheduling module:
Celery is an open-source distributed task queue that helps improve the performance and scalability of applications. It serves as the task scheduling framework, with Redis acting as both the message broker and the result backend.
- The calculation results are returned as DataFrames and can be visualized in a web-based interface.
2. Environment Setup
A single-node cluster is deployed as the test environment for this tutorial.
Hardware | Description |
---|---|
Host | cnserver9 |
IP | xxx.xxx.xxx.122 |
OS | Linux kernel 3.10 or higher |
RAM | 64 GB |
CPU | x86_64 (12 cores) |
Software | Version |
---|---|
DolphinDB | V2.00.7 |
SQL Server | 2019 |
DataX | 3.0 |
JDK (required by the DataX installation) | 1.8.0_xxx |
Maven (required by the DataX installation) | 3.6.1+ |
Python (required by the DataX installation) | 2.x |
Celery | 4.3.0 |
Python (with NumPy, pandas, Celery, and the DolphinDB Python API) | 3.7.9 |
Redis | 6.2.7 |
3. Platform Building
3.1 Table Structures
This tutorial uses the daily closing prices of several stocks from 2020.01.01 to 2021.01.01, with a total of 544,174 records. Below are the corresponding table structures in SQL Server and DolphinDB:
Column | Description | Data Type (SQL Server) | Data Type (DolphinDB) |
---|---|---|---|
SecurityID | stock ID | varchar | SYMBOL |
TradeDate | trade date | date | DATE |
Value | closing price | float | DOUBLE |
3.2 Metrics
This example calculates the factor WQAlpha1, defined in DolphinDB's WorldQuant 101 Alpha (wq101alpha) module.
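For reference, Alpha#1 is defined in the 101 Formulaic Alphas paper as:

rank(Ts_ArgMax(SignedPower(((returns < 0) ? stddev(returns, 20) : close), 2.), 5)) - 0.5

The WQAlpha1 function in the wq101alpha module encapsulates this calculation, taking a panel of closing prices as input.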
3.3 Synchronizing Data from SQL Server to DolphinDB
Note: The following migration steps assume that the SQL Server database has already been created. The DolphinDB port number is 8848.
(1) Creating DolphinDB databases and tables
Before importing data, the target database and table must first be created on the DolphinDB server. Create the database dfs://tick_close and the table tick_close:
dbName = "dfs://tick_close"
tbName = "tick_close"
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
db = database(dbName, RANGE, date(datetimeAdd(2000.01M,0..50*12,'M')))
name = `SecurityID`TradeDate`Value
type = `SYMBOL`DATE`DOUBLE
schemaTable = table(1:0, name, type)
db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`TradeDate)
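The same setup can also be submitted from Python through the DolphinDB Python API instead of a DolphinDB client; a minimal sketch, assuming the admin account used throughout this tutorial:

import dolphindb as ddb

# connect to the DolphinDB node used in this tutorial
s = ddb.session()
s.connect("127.0.0.1", 8848, "admin", "123456")

# submit the database/table creation script as a single string
s.run("""
dbName = "dfs://tick_close"
tbName = "tick_close"
if(existsDatabase(dbName)){ dropDatabase(dbName) }
db = database(dbName, RANGE, date(datetimeAdd(2000.01M,0..50*12,'M')))
schemaTable = table(1:0, `SecurityID`TradeDate`Value, `SYMBOL`DATE`DOUBLE)
db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`TradeDate)
""")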
(2) Configuring data import
The DataX import requires a .json configuration file specifying the data source details. In general, each table to be synchronized requires its own configuration file. The configuration for the table tick_close is shown below:
{
"job": {
"content": [
{
"writer": {
"parameter": {
"dbPath": "dfs://tick_close",
"tableName": "tick_close",
"batchSize": 100,
"userId": "admin",
"pwd": "123456",
"host": "127.0.0.1",
"table": [
{
"type": "DT_SYMBOL",
"name": "SecurityID"
},
{ "type": "DT_DATE",
"name": "TradeDate"
},
{
"type": "DT_DOUBLE",
"name": "Value"
}
],
"port": 8848
},
"name": "dolphindbwriter"
},
"reader": {
"name": "sqlserverreader",
"parameter": {
"username": "SA",
"password": "Sa123456",
"column": [
"*"
],
"connection": [
{
"table": [
"tick_close"
],
"jdbcUrl": [
"jdbc:sqlserver://127.0.0.1:1234;DatabaseName=tick_close"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
Note: The data synchronization in this tutorial is a one-time full synchronization of historical data. To synchronize incremental data, two additional parameters, saveFunctionName and saveFunctionDef, must be added to the writer configuration, as sketched below.
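What these two parameters might look like inside the writer's parameter object is sketched here as a Python dict for illustration; the function name upsertTickClose, its signature, and its body are assumptions rather than part of the original configuration, so consult the dolphindbwriter documentation for the exact form:

# hypothetical incremental-sync additions to the writer's "parameter" object;
# the save function's name, signature, and body are illustrative assumptions
incremental_writer_params = {
    "saveFunctionName": "upsertTickClose",
    "saveFunctionDef": (
        "def upsertTickClose(dbPath, tbName, data){"
        " upsert!(loadTable(dbPath, tbName), data, keyColNames=`SecurityID`TradeDate) "
        "}"
    ),
}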
(3) Running data import
Go to the bin directory of DataX and execute the following command to import data into the table tick_close:
python datax.py ../conf/tick_close.json
Parameters:
- datax.py (required): the startup script of DataX.
- ../conf/tick_close.json (required): the path to the configuration file.
Expected output:
2022-12-14 00:50:04.125 [job-0] INFO JobContainer -
Start Time : 2022-12-14 00:49:33
End Time : 2022-12-14 00:50:04
Elapsed Time : 30s
Average Flow : 555.88KB/s
Write Speed : 18139rec/s
Total Read Records : 544174
Total Failed Attempts : 0
3.4 Scheduling Factor Computing With Celery
This section introduces how to use Celery to schedule and asynchronously invoke factor computing.
3.4.1 Setting up Redis for message brokering and result backend
The Celery framework requires a message broker to deliver task messages and a result backend to store execution results. We recommend Redis, which serves both roles in this example on the default port 6379. You can substitute other tools and configurations according to your needs.
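Before wiring up Celery, it helps to confirm that Redis is reachable; a minimal sanity check using the redis-py client, assuming Redis runs locally on the default port:

import redis

# connect to the Redis database that will serve as the Celery broker (db 1)
r = redis.Redis(host="localhost", port=6379, db=1)
print("Redis reachable:", r.ping())  # True if the server answers PING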
3.4.2 Implementing factor computing
Connect to the DolphinDB cluster and define the factor function on the server so that it can later be invoked as a predefined function. The factor WQAlpha1 is implemented as follows:
use wq101alpha
// compute WQAlpha1 for the given stocks over [begin_date, end_date]
defg get_alpha1(security_id, begin_date, end_date){
    if (typestr(security_id) == 'STRING VECTOR' && typestr(begin_date) == `DATE && typestr(end_date) == `DATE){
        // load the closing prices of the requested stocks within the date range
        tick_list = select * from loadTable("dfs://tick_close", "tick_close") where TradeDate >= begin_date and TradeDate <= end_date and SecurityID in security_id
        // pivot into a panel (dates x stocks) and apply the WQAlpha1 function
        alpha1_list = WQAlpha1(panel(tick_list.TradeDate, tick_list.SecurityID, tick_list.Value))
        return table(alpha1_list.rowNames() as TradeDate, alpha1_list)
    }
    else {
        print("Invalid argument types")
        return `NULLValue
    }
}
Input parameters:
Parameter | Required | Description | DolphinDB Data Type |
---|---|---|---|
security_id | √ | vector of stock IDs | STRING VECTOR |
begin_date | √ | begin date | DATE |
end_date | √ | end date | DATE |
The function returns an in-memory table containing the WQAlpha1 factor values of each stock within the given time interval. If the argument types are invalid, the symbol NULLValue is returned instead.
A Python program uses the DolphinDB Python API to connect to the server and invoke predefined functions. Since these functions are defined on the server in a separate session, we add the function as a function view and grant the user execution privileges on it to enable function calls from Python.
// add the function as a function view
addFunctionView(get_alpha1)
// grant a user xxx the execution privilege
grant("xxx", VIEW_EXEC, "get_alpha1")
3.4.3 Scheduling Tasks with Celery
Execute the following pip command to install Celery:
pip install celery==4.3.0 && pip install redis==3.2.0
To avoid errors such as TypeError: __init__() got an unexpected keyword argument 'username' when using Celery, it is recommended to uninstall the kombu library installed by default alongside Celery and install version 5.1.0 of the library instead.
After all required libraries are installed, build the project directory and files:
mkdir celery_project && touch celery_project/tasks.py celery_project/app.py
Execute tree ./celery_project to check the directory tree:
./celery_project
├── app.py
└── tasks.py
0 directories, 2 files
The tasks.py file constructs a session to DolphinDB and declares the function execution as an asynchronous task to be scheduled.
Import the required Python packages:
from celery import Celery
import dolphindb as ddb
import numpy as np
import pandas as pd
from datetime import datetime
Construct a session object:
s = ddb.session()
s.connect("127.0.0.1", 8848, "admin", "123456")
Instantiate a Celery object and set up its configuration:
app = Celery(
    'celeryApp',
    broker='redis://localhost:6379/1',
    backend='redis://localhost:6379/2'
)
app.conf.update(
    task_serializer='pickle',
    accept_content=['pickle'],
    result_serializer='pickle',
    timezone='Asia/Shanghai',
    enable_utc=True,
)
The factor calculation involves passing and returning datetime and DataFrame objects. Since the default serialization method json does not support these data types, the parameters task_serializer, accept_content, and result_serializer must be set to pickle.
To declare the factor calculation as an asynchronous task, we encapsulate it in a function and decorate it with @app.task():
@app.task()
def get_alpha1(security_id, begin_date, end_date):
    return s.run("get_alpha1", security_id, begin_date, end_date)
Note: Python data types are used here for data passing. You can refer to DolphinDB Python API Reference Guide for the mappings between Python and DolphinDB data types.
The file app.py uses a for loop to call Celery's delay() function, sending two task requests and printing each task id.
import numpy as np
from tasks import get_alpha1

security_id_list = [["600020", "600021"], ["600022", "600023"]]

if __name__ == '__main__':
    for i in security_id_list:
        result = get_alpha1.delay(i, np.datetime64('2020-01-01'), np.datetime64('2020-01-31'))
        print(result)
3.4.4 Processing Tasks with Celery
Run the Celery worker server to process tasks:
celery -A tasks worker --loglevel=info
Expected output:
-------------- celery@cnserver9 v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Linux-3.10.0-1160.53.1.el7.x86_64-x86_64-with-centos-7.9.2009-Core 2022-11-11 00:10:34
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: celeryApp:0x7f597a1d4e48
- ** ---------- .> transport: redis://localhost:6379/1
- ** ---------- .> results: redis://localhost:6379/2
- *** --- * --- .> concurrency: 64 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.get_alpha1
[2022-11-11 00:10:37,413: INFO/MainProcess] Connected to redis://localhost:6379/1
[2022-11-11 00:10:37,437: INFO/MainProcess] mingle: searching for neighbors
[2022-11-11 00:10:38,465: INFO/MainProcess] mingle: all alone
[2022-11-11 00:10:38,488: INFO/MainProcess] celery@cnserver9 ready.
After running the above command, the worker stays running in the foreground of this terminal. Open a new session to the machine to continue.
Go to the Celery project directory and execute the following command to send asynchronous invoking requests:
python3 app.py
Expected output:
400a3024-65a1-4ba6-b8a9-66f6558be242
cd830360-e866-4850-aba0-3a07e8738f78
Switching back to the worker terminal, we can see the execution status of the asynchronous tasks:
[2022-11-11 00:12:44,365: INFO/MainProcess] Received task: tasks.get_alpha1[400a3024-65a1-4ba6-b8a9-66f6558be242]
[2022-11-11 00:12:44,369: INFO/MainProcess] Received task: tasks.get_alpha1[cd830360-e866-4850-aba0-3a07e8738f78]
[2022-11-11 00:12:44,846: INFO/ForkPoolWorker-63] Task tasks.get_alpha1[400a3024-65a1-4ba6-b8a9-66f6558be242] succeeded in 0.04292269051074982s: TradeDate 600020 600021
0 2020-01-01 NaN NaN
1 2020-01-02 NaN NaN
2 2020-01-03 NaN NaN
3 2020-01-06 NaN NaN
4 2020-01-07 0.5 0.0
5 2020-01-08 0.5 0.0
6 2020-01-09 0.0 0.5
7 2020-01-10 0.0 0.5
8 2020-01-13 0.0 0.5
9 2020-01-14 0.0 0.5
10 2020-01-15 0.5 0.0
11 2020-01-16 0.5 0.0
12 2020-01-17 0.5 0.0
13 2020-01-20 0.5 0.0
14 2020-01-21 0.0 0.5
15 2020-01-22 0.5 0.0
16 2020-01-23 0.5 0.0
17 2020-01-24 0.5 0.0
18 2020-01-27 0.5 0.0
19 2020-01-28 0.0 0.5
20 2020-01-29 0.0 0.5
21 2020-01-30 0.0 0.5
22 2020-01-31 0.0 0.5
[2022-11-11 00:12:45,054: INFO/ForkPoolWorker-1] Task tasks.get_alpha1[cd830360-e866-4850-aba0-3a07e8738f78] succeeded in 0.06510275602340698s: TradeDate 600022 600023
0 2020-01-01 NaN NaN
1 2020-01-02 NaN NaN
2 2020-01-03 NaN NaN
3 2020-01-06 NaN NaN
4 2020-01-07 0.0 0.0
5 2020-01-08 0.0 0.0
6 2020-01-09 0.0 0.0
7 2020-01-10 0.0 0.0
8 2020-01-13 0.0 0.0
9 2020-01-14 0.0 0.0
10 2020-01-15 0.0 0.5
11 2020-01-16 0.0 0.0
12 2020-01-17 0.0 0.5
13 2020-01-20 0.5 0.0
14 2020-01-21 0.5 0.0
15 2020-01-22 0.5 0.0
16 2020-01-23 0.5 0.0
17 2020-01-24 0.0 0.5
18 2020-01-27 0.0 0.0
19 2020-01-28 0.5 0.0
20 2020-01-29 0.5 0.0
21 2020-01-30 0.5 0.0
22 2020-01-31 0.5 0.0
After the tasks are executed, the execution results can also be retrieved from Redis.
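For example, a stored result can be fetched by task id with Celery's AsyncResult; a minimal sketch, using one of the task ids printed by app.py above:

from celery.result import AsyncResult

from tasks import app  # the Celery app configured in tasks.py

# task id printed by app.py (example value from the output above)
task_id = "400a3024-65a1-4ba6-b8a9-66f6558be242"
res = AsyncResult(task_id, app=app)
if res.ready():
    df = res.get()  # the pickled DataFrame computed by the worker
    print(df.head())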
Note: Asynchronous invocation requests can be sent before the worker is started, but only a task id is returned in that case; the execution status and results cannot be retrieved until a worker processes the task.
4. Conclusion
This tutorial demonstrates how DolphinDB can be integrated into a traditional factor computing platform to remove its performance bottlenecks. By combining DolphinDB's computing and storage capabilities with the asynchronous task scheduling of the Celery framework, the platform provides a production-ready solution for factor computation.