Database

This section describes how to create databases and tables in the databases using native methods provided by the DolphinDB Python API.

Session.database

The Database class wraps the DolphinDB database object handle, providing an interface to interact with the database in Python. The Database object is usually constructed using the Session.database method:

Session.database(dbName=None, prititionType=None, parititions=None, dbPath=None, engine=None, atomic=None, chunkGranularity=None)
  • dbName: optional. Name of the database handle.
  • partitionType: partition type. It can be keys.SEQ, keys.VALUE, keys.RANGE, keys.LIST, keys.COMPO, and keys.HASH.
  • partitions: the partition scheme, which describes how the partitions are created. It is usually a list or np.ndarray.
  • dbPath: the path to save the database
  • engine: the storage engine
  • atomic: at which level the atomicity is guaranteed for a write transaction.
  • chunkGranularity: the chunk granularity. The value can be ‘TABLE’ or ‘DATABASE’:

More detailed description of some of these parameters can be found in the DolphinDB User Manual - database.

dbName

When loading an existing database or creating a database, you can specify dbName to indicate the handle name after the database is loaded into memory. If this parameter is not specified, a random string will be automatically generated as the handle name, which can be obtained through the _getDbName() method.

Example 1. Creating database without dbName

dbPath = "dfs://dbName"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath)

dbName = db._getDbName()
print(dbName)
print(s.run(dbName))

Output:

TMP_DB_15c2bf85DB
DB[dfs://dbName]

Example 2. Creating database with dbName

dbPath = "dfs://dbName"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(dbName="testDB", partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath)

dbName = db._getDbName()
print(dbName)
print(s.run(dbName))

Output:

testDB
DB[dfs://dbName]

dbPath, partitionType and partitions

When creating database with Session.database, partitionType and partitions must be specified. When creating a DFS database, dbPath must be specified; otherwise, it is not required.

Creating DFS Databases of Different Partition Types

Environment setup:

import dolphindb as ddb
import dolphindb.settings as keys
import numpy as np
import pandas as pd

s = ddb.Session()
s.connect("localhost", 8848, "admin", "123456")

Database with VALUE partitions

Partition on date values:

dbPath="dfs://db_value_date"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
dates=np.array(pd.date_range(start='20120101', end='20120110'), dtype="datetime64[D]")
db = s.database(dbName='mydb', partitionType=keys.VALUE, partitions=dates,dbPath=dbPath)

Partition on month values:

dbPath="dfs://db_value_month"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
months=np.array(pd.date_range(start='2012-01', end='2012-10', freq="M"), dtype="datetime64[M]")
db = s.database(partitionType=keys.VALUE, partitions=months,dbPath=dbPath)

Database with RANGE partitions

Partition on INT values:

dbPath="dfs://db_range_int"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.RANGE, partitions=[1, 11, 21], dbPath=dbPath)

Database with LIST partitions

Partition on SYMBOL values:

dbPath="dfs://db_list_sym"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.LIST, partitions=[['IBM', 'ORCL', 'MSFT'], ['GOOG', 'FB']],dbPath=dbPath)

Database with LIST partitions

Partition on INT values:

dbPath="dfs://db_hash_int"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.HASH, partitions=[keys.DT_INT, 3], dbPath=dbPath)

Database with COMPO partitions

The following script creates a DFS database with composite partitions. The first level of partitioning type is VALUE (based on date values) and the second level is RANGE (based on INT values).

Note: When creating the sub-databases that comprise the COMPO database, dbPath must be an empty string or unspecified.

db1 = s.database(partitionType=keys.VALUE, partitions=np.array(["2012-01-01", "2012-01-06"], dtype="datetime64[D]"))
db2 = s.database(partitionType=keys.RANGE, partitions=[1, 6, 11])
dbPath="dfs://db_compo_test"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.COMPO, partitions=[db1, db2], dbPath=dbPath)

engine

By default, database creates OLAP databases. You can specify engine to create databases with other types of storage engines.

Creating a TSDB database

To create a TSDB database, specify engine = "TSDB". Additionally, when creating tables in this database using createTable or createPartitionedTable, specify sortColumns.

dates = np.array(pd.date_range(start='20120101', end='20120110'), dtype="datetime64[D]")
dbPath = "dfs://tsdb"
if s.existsDatabase(dbPath): 
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=dates, dbPath=dbPath, engine="TSDB")

Creating a PKEY database

Creating a database with the PKEY engine (primary key engine) is similar to creating an OLAP database. The main difference is that you need to specify engine = "PKEY" when calling database. Additionally, when calling createTable or createPartitionedTable, you'll need to use the primaryKey parameter to specify the primary key column(s) and the indexes parameter to define indexing methods for specific columns.

dates = np.array(pd.date_range(start='20120101', end='20120110'), dtype="datetime64[D]")
dbPath = "dfs://pkey"
if s.existsDatabase(dbPath): 
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=dates, dbPath=dbPath, engine="PKEY")

atomic

atomic indicates at which level the atomicity is guaranteed for a write transaction, thus determining whether concurrent writes to the same chunk are allowed. It can be ‘TRANS’ (default) or ‘CHUNK’.

  • ‘TRANS’ indicates that the atomicity is guaranteed at the transaction level. If a transaction attempts to write to multiple chunks and one of the chunks is locked by another transaction, a write-write conflict occurs, and all writes of the transaction fail. Therefore, setting atomic =’TRANS’ means concurrent writes to a chunk are not allowed.
  • ‘CHUNK’ indicates that the atomicity is guaranteed at the chunk level. If a transaction tries to write to multiple chunks and a write-write conflict occurs as a chunk is locked by another transaction, instead of aborting the writes, the transaction will keep writing to the non-locked chunks and keep attempting to write to the chunk in conflict until it is still locked after a few minutes. Therefore, setting atomic =’CHUNK’ means concurrent writes to a chunk are allowed. As the atomicity at the transaction level is not guaranteed, the write operation may succeed in some chunks but fail in other chunks. Please also note that the write speed may be impacted by the repeated attempts to write to the chunks that are locked.

chunkGranularity

chunkGranularity determines whether concurrent writes to different tables in the same chunk are allowed. The value can be "TABLE" or "DATABASE":

  • "TABLE": the chunk granularity is at the TABLE level. In this case, concurrent writes to different tables in the same partition are allowed.
  • "DATABASE": the chunk granularity is at the DATABASE level. In this case, concurrent writes to different tables in the same partition are not allowed.

Note: This parameter is only enabled when the server configuration parameter enableChunkGranularityConfig is set to true.

createTable

createTable creates an empty dimension table.

Database.createTable(table, tableName, sortColumns=None)
  • table: a Table object. An empty dimension table will be created based on its schema.
  • tableName: str. The name of the dimension table.
  • sortColumns: str or str list, optional. The columns to sort the ingested data within each partition. If multiple columns are specified for sortColumns, the last column must be a time column. The preceding columns are used as the sort keys.
  • compressMethods: dict,optional. A dictionary indicating which compression methods are used for specified columns. The keys are columns name and the values are compression methods. If unspecified, use LZ4 compression method.
  • primaryKey: str or str list, optional. Uniquely identifies each record in a DFS table of the PKEY database. For records with the same primary key, only the latest one is retained.
  • keepDuplicates: optional. Specifies how to deal with records with duplicate sortColumns values. It can have the following values:
    • ALL: keep all records;
    • LAST: only keep the last record;
    • FIRST: only keep the first record.
  • softDelete: optional. Determines whether to enable soft delete for TSDB databases. The default value is false.
  • indexes: dict, optional. A dictionary with columns as keys and index types as values. Can only be set for tables of TSDB or PKEY databases.

createTable simply calls the underlying DolphinDB createDimensionTable function. Please check the DolphinDB User Manual for a full description of the createDimensionTable capabilities.

Example 1

The following example creates a dimension table "dt" which uses the schema of the "schema_t" table and sorts data based on the "csymbol" column.

dbPath = "dfs://createTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="TSDB")
s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])")
schema_t = s.table(data="schema_t")
dt = db.createTable(schema_t, "dt", ["csymbol"])
schema = s.run(f'schema(loadTable("{dbPath}", "dt"))')
print(schema["colDefs"])

Output:

       name typeString  typeInt  extra comment
0    ctime  TIMESTAMP       12    NaN        
1  csymbol     SYMBOL       17    NaN        
2    price     DOUBLE       16    NaN        
3      qty        INT        4    NaN   

Example 2

The following example creates a dimension table pt based on "schema_t" in a PKEY database. The "csymbol" column is specified as the primary key. The indexes parameter is not specified here, so by default, only the primary key column "csymbol" uses a bloomfilter indexing, while all other columns use zonemap indexing.

dbPath = "dfs://createTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="PKEY")
s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])")
schema_t = s.table(data="schema_t")
pt = db.createTable(schema_t, "pt", primaryKey=["csymbol"])
schema = s.run(f'schema(loadTable("{dbPath}", "pt"))')
print(schema["primaryKey"])
print(schema["indexes"])

Output:

['csymbol']
                       name         type columnName
0             ctime_zonemap      zonemap      ctime
1  _primary_key_bloomfilter  bloomfilter    csymbol
2           csymbol_zonemap      zonemap    csymbol
3               qty_zonemap      zonemap        qty

Example 3

The following example creates a dimension table pt based on "schema_t" in a PKEY database. The "csymbol" and "qty" columns are specified as the primary key columns. The composite key and the "qty" column use bloomfilter indexing, while all other columns use zonemap indexing.

dbPath = "dfs://createTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="PKEY")
s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])")
schema_t = s.table(data="schema_t")
pt = db.createTable(schema_t, "pt", primaryKey=["csymbol", "qty"], indexes={"qty": ["bloomfilter"]})
schema = s.run(f'schema(loadTable("{dbPath}", "pt"))')
print(schema["primaryKey"])
print(schema["indexes"])

Output:

['csymbol' 'qty']
                       name         type              columnName
0  _primary_key_bloomfilter  bloomfilter  _composite_primary_key
1             ctime_zonemap      zonemap                   ctime
2           csymbol_zonemap      zonemap                 csymbol
3               qty_zonemap      zonemap                     qty
4           qty_bloomfilter  bloomfilter                     qty

createPartitionedTable

Create an empty partitioned table in a DFS database.

Database.createPartitionedTable(
    table, tableName, partitionColumns, compressMethods={}, sortColumns=None,
    keepDuplicates=None, sortKeyMappingFunction=None
)
  • table: a Table object. An empty partitioned table will be created based on its schema.
  • tableName: str. The name of the partitioned table.
  • partitionColumns: str or str list. The partitioning column(s).
  • compressMethods: dict, optional. Indicate which compression methods are used for specified columns. The keys are columns name and the values are compression methods.
  • sortColumns: str or str list, optional. The columns to sort the ingested data within each partition. If multiple columns are specified for sortColumns, the last column must be a time column. The preceding columns are used as the sort keys.
  • keepDuplicates: optional. specifies how to deal with records with duplicate sortColumns values. It can have the following values:
    • ALL: keep all records;
    • LAST: only keep the last record;
    • FIRST: only keep the first record.
  • sortKeyMappingFunction: list of server function names, optional. It has the same length as the number of sort keys. The specified mapping functions are applied to each sort key (i.e., the sort columns except for the temporal column) for dimensionality reduction.
  • primaryKey: str or str list, optional. Uniquely identifies each record in a DFS table of the PKEY database. For records with the same primary key, only the latest one is retained.
  • softDelete: optional. Determines whether to enable soft delete for TSDB databases. The default value is false.
  • indexes: dict, optional. A dictionary with columns as keys and index types as values. Can only be set for tables of TSDB or PKEY databases.

createPartitionedTable simply calls the underlying DolphinDB createPartitionedTable function. Please check the DolphinDB User Manual for a full description of the createPartitionedTable capabilities.

Example 1

This example creates a partitioned table with partitioning column "TradeDate" and sort columns ["SecurityID", "TradeDate"] in a TSDB database. The schema of the partitioned table is extracted from the "schema_t" table. It specifies the sorting columns as SecurityID and TradeDate, where the sorting function of SecurityID uses hashBucket {, 5}, and the handling strategy when the sorting column values are the same is "ALL".

The following example creates a partitioned table pt in a TSDB database. pt uses the schema of the "schema_t" table with partitioning column "TradeDate" and sort columns "SecurityID" and "TradeDate". The function hashBucket{,5} is applied to the "TradeDate" column for dimensionality reduction. With duplicate sort columns values, it keeps all records.

dbPath = "dfs://createPartitionedTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
dates = np.array(pd.date_range(start='20220101', end='20220105'), dtype="datetime64[D]")
db = s.database(partitionType=keys.VALUE, partitions=dates, dbPath=dbPath, engine="TSDB")
s.run("schema_t = table(100:0, `SecurityID`TradeDate`TotalVolumeTrade`TotalValueTrade, [SYMBOL, DATE, INT, DOUBLE])")
schema_t = s.table(data="schema_t")
pt = db.createPartitionedTable(schema_t, "pt", partitionColumns="TradeDate", sortColumns=["SecurityID", "TradeDate"], keepDuplicates="ALL", sortKeyMappingFunction=["hashBucket{,5}"])
schema = s.run(f'schema(loadTable("{dbPath}", "pt"))')
print(schema["colDefs"])

Output:

                name typeString  typeInt  extra comment
0        SecurityID     SYMBOL       17    NaN        
1         TradeDate       DATE        6    NaN        
2  TotalVolumeTrade        INT        4    NaN        
3   TotalValueTrade     DOUBLE       16    NaN

Example 2

The following example creates a partitioned table pt in an OLAP database. pt uses the schema of the "schema_t" table. The partitioning column is the "symbol" column and the "timestamp" column is compressed using the "delta" method.

dbPath = "dfs://createPartitionedTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=["IBM", "MS"], dbPath=dbPath)
s.run("schema_t = table(100:0, `timestamp`symbol`value, [TIMESTAMP, SYMBOL, DOUBLE])")
schema_t = s.table(data="schema_t")
pt = db.createPartitionedTable(schema_t, "pt", partitionColumns="symbol", compressMethods={'timestamp': "delta"})
schema = s.run(f'schema(loadTable("{dbPath}", "pt"))')
print(schema["colDefs"])

Output:

        name typeString  typeInt  extra comment
0  timestamp  TIMESTAMP       12    NaN        
1     symbol     SYMBOL       17    NaN        
2      value     DOUBLE       16    NaN  

Example 3

The following example creates a partitioned DFS table pt based on "schema_t" in a PKEY database. The "SecurityID" and "TradeDate" columns are specified as the primary key columns. The partitioning column is "TradeDate". The primary key column "SecurityID" uses bloomfilter indexing.

dbPath = "dfs://createPartitionedTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
dates = np.array(pd.date_range(start='20220101', end='20220105'), dtype="datetime64[D]")
db = s.database(partitionType=keys.VALUE, partitions=dates, dbPath=dbPath, engine="PKEY")
s.run("schema_t = table(100:0, `SecurityID`TradeDate`TotalVolumeTrade`TotalValueTrade, [SYMBOL, DATE, INT, DOUBLE])")
schema_t = s.table(data="schema_t")
pt = db.createPartitionedTable(schema_t, "pt", partitionColumns="TradeDate", primaryKey=["SecurityID", "TradeDate"], indexes={'SecurityID': ["bloomfilter"]})
schema = s.run(f'schema(loadTable("{dbPath}", "pt"))')
print(schema["primaryKey"])
print(schema["indexes"])

Output:

['SecurityID' 'TradeDate']
                       name         type              columnName
0  _primary_key_bloomfilter  bloomfilter  _composite_primary_key
1        SecurityID_zonemap      zonemap              SecurityID
2    SecurityID_bloomfilter  bloomfilter              SecurityID
3  TotalVolumeTrade_zonemap      zonemap        TotalVolumeTrade