PartitionedTableAppender
Similar to the TableAppender class, when you use a PartitionedTableAppender to append data to a DolphinDB table, time values are automatically converted to match the schema of the target table. The PartitionedTableAppender class takes a connection pool as an argument and, when its append() method is called, assigns the connections to append to the partitions of the target table concurrently. At any given time, only a single connection can write to a specific partition.
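For example, if a target column is of type DATE, nanosecond timestamps in a local DataFrame are converted to dates when appended. A minimal sketch of this conversion follows; the database dfs://demo, the table pt with schema [SYMBOL, DATE, LONG], and the column names are assumptions for illustration only:
import dolphindb as ddb
import pandas as pd

# Assumption: a DFS table "pt" already exists in "dfs://demo" with schema
# [SYMBOL, DATE, LONG], VALUE-partitioned on the sym column.
pool = ddb.DBConnectionPool("localhost", 8848, 2, "admin", "123456")
appender = ddb.PartitionedTableAppender(dbPath="dfs://demo", tableName="pt",
                                        partitionColName="sym", dbConnectionPool=pool)
df = pd.DataFrame({
    "sym": ["A", "B"],
    # datetime64[ns] values; automatically converted to DATE on append
    "date": pd.to_datetime(["2021-01-01 09:30:00", "2021-01-02 10:00:00"]),
    "vol": [100, 200],
})
appender.append(df)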
Methods
The constructor of PartitionedTableAppender, shown with the default value of each parameter:
PartitionedTableAppender(dbPath=None, tableName=None, partitionColName=None, dbConnectionPool=None)
- dbPath: str. The path to a DFS database. If the target is a partitioned in-memory table, leave this parameter unspecified (see the sketch after this list).
- tableName: str. Name of the partitioned table to append data to.
- partitionColName: str. The partitioning column of the target table. Note: If the table has multiple partitioning columns, specify any one of them. The specified column determines how connections are allocated across partitions.
- dbConnectionPool: DBConnectionPool. The connection pool whose connections perform the concurrent appends.
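For the in-memory case mentioned in the dbPath entry above, a rough sketch follows; the shared table sharedPT and its schema are assumptions for illustration, and the table is shared so that every connection in the pool can see it:
import dolphindb as ddb
import pandas as pd

s = ddb.Session()
s.connect("localhost", 8848, "admin", "123456")
s.run("""
t = table(100:0, `id`vol, [SYMBOL, LONG])
db = database("", VALUE, `A`B)
tb = db.createPartitionedTable(t, `tb, `id)
share tb as sharedPT    // share the table so all pooled connections can access it
""")

pool = ddb.DBConnectionPool("localhost", 8848, 2, "admin", "123456")
# dbPath is left unspecified because the target is an in-memory table
appender = ddb.PartitionedTableAppender(tableName="sharedPT", partitionColName="id",
                                        dbConnectionPool=pool)
appender.append(pd.DataFrame({"id": ["A", "B"], "vol": [1, 2]}))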
PartitionedTableAppender has only one method, append:
append(table)
- table: the data to be appended to the DolphinDB table, usually a local pandas.DataFrame. append returns the number of rows appended.
Example
In this script, we create a DFS database named dfs://valuedb and construct a partitioned table pt in it. We then instantiate a DBConnectionPool, pool, with 3 connections and pass it to a PartitionedTableAppender so that a locally generated DataFrame is appended to the partitions of pt concurrently. Finally, we print the result of the append() call and query the table to verify the result.
import dolphindb as ddb
import pandas as pd
import numpy as np
import random

s = ddb.Session()
s.connect("localhost", 8848, "admin", "123456")

# Create the DFS database and a partitioned table pt, VALUE-partitioned on id
script = """
dbPath = "dfs://valuedb"
if(existsDatabase(dbPath)){
    dropDatabase(dbPath)
}
t = table(100:0, `id`date`vol, [SYMBOL, DATE, LONG])
db = database(dbPath, VALUE, `APPL`IBM`AMZN)
pt = db.createPartitionedTable(t, `pt, `id)
"""
s.run(script)

# Connection pool with 3 connections for concurrent appends
pool = ddb.DBConnectionPool("localhost", 8848, 3, "admin", "123456")
appender = ddb.PartitionedTableAppender(dbPath="dfs://valuedb", tableName="pt", partitionColName="id", dbConnectionPool=pool)

# Generate 100 rows with random dates between 2010 and 2019
n = 100
dates = []
for i in range(n):
    dates.append(np.datetime64(
        "201{:d}-0{:1d}-{:2d}".format(random.randint(0, 9), random.randint(1, 9), random.randint(10, 28))))
data = pd.DataFrame({
    "id": np.random.choice(['AMZN', 'IBM', 'APPL'], n),
    "date": dates,
    "vol": np.random.randint(100, size=n)
})

# append() returns the number of rows appended
re = appender.append(data)
print(re)
print(s.run("pt = loadTable('dfs://valuedb', 'pt'); select * from pt;"))
Output:
100
id date vol
0 AMZN 2017-01-22 60
1 AMZN 2014-08-12 37
2 AMZN 2012-09-10 68
3 AMZN 2012-03-14 48
4 AMZN 2016-07-12 1
.. ... ... ...
95 IBM 2016-05-15 25
96 IBM 2012-06-19 6
97 IBM 2010-05-10 96
98 IBM 2017-07-10 32
99 IBM 2012-09-23 68
[100 rows x 3 columns]
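Because pt is VALUE-partitioned on id, you can also check how the appended rows were distributed across partitions by grouping on the partitioning column:
print(s.run("select count(*) from loadTable('dfs://valuedb', 'pt') group by id"))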