PartitionedTableAppender
The Java API provides the PartitionedTableAppender class, which allows data to be written concurrently from a Java client to a DolphinDB DFS table. PartitionedTableAppender uses a connection pool for multi-threaded writing and calls the schema function to obtain the partitioning information of the DFS table. The data to be written is grouped by the partitioning column and handed to different connections for parallel writing.
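Conceptually, the appender groups each incoming batch by the value of the partitioning column and hands each group to a separate connection, so no two threads ever write to the same partition. The following plain-Java sketch illustrates only that grouping step (it is a conceptual illustration, not part of the DolphinDB API; the names used here are hypothetical):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionGroupingSketch {
    public static void main(String[] args) {
        // Hypothetical partition-column values for six rows.
        String[] syms = {"A", "B", "A", "C", "B", "A"};

        // Group row indices by partition-column value, mimicking how
        // rows are routed to different pool connections.
        Map<String, List<Integer>> byPartition = new HashMap<>();
        for (int i = 0; i < syms.length; i++) {
            byPartition.computeIfAbsent(syms[i], k -> new ArrayList<>()).add(i);
        }

        // Each group would be appended by a different connection in parallel.
        System.out.println(byPartition.get("A").size()); // 3 rows for partition "A"
        System.out.println(byPartition.size());          // 3 distinct partitions
    }
}
```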
Constructor
DolphinDB does not allow multiple writers to write data to the same partition simultaneously. Therefore, when performing multi-threaded parallel data writing on the client, it is necessary to ensure that each thread writes to a different partition. The Java API provides an easy method to automatically partition data and write in parallel, with the constructor as follows:
public PartitionedTableAppender(String dbUrl, String tableName, String partitionColName, String appendFunction, DBConnectionPool pool)
Parameters
- dbUrl: The DFS database path.
- tableName: The DFS table name.
- partitionColName: The partitioning column.
- appendFunction (optional): A String representing the user-defined writer function. If not specified, the built-in tableInsert function is called.
- pool: The connection pool.
Examples
DBConnectionPool pool = new ExclusiveDBConnectionPool(HOST, PORT, "admin", "123456", 3, true, true);
PartitionedTableAppender appender = new PartitionedTableAppender(dbUrl, tableName, "sym", pool);
Usage Example
The following example writes 10 million records into a DFS table.
Note: This example uses a large data set. It is recommended to use a DolphinDB server configured with an 8-core CPU and 16 GB of memory to run this code.
DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, USER, PASSWORD);

// Create the DFS table
String script = "\n" +
        "t = table(timestamp(1..10) as date,int(1..10) as sym,string(1..10) as str)\n" +
        "db1=database(\"\",VALUE,date(now())+0..100)\n" +
        "db2=database(\"\",RANGE,int(1..10))\n" +
        "if(existsDatabase(\"dfs://demohash\")){\n" +
        "\tdropDatabase(\"dfs://demohash\")\n" +
        "}\n" +
        "db =database(\"dfs://demohash\",COMPO,[db1,db2])\n" +
        "pt = db.createPartitionedTable(t,`pt,`date`sym)\n";
conn.run(script);

// Create the connection pool
ExclusiveDBConnectionPool pool = new ExclusiveDBConnectionPool(HOST, PORT, "admin", "123456", 3, false, true);
PartitionedTableAppender appender = new PartitionedTableAppender(DBPATH, TABLE_NAME, "sym", pool);

// Build a 10,000-row batch
List<String> colNames = new ArrayList<String>(3);
colNames.add("date");
colNames.add("sym");
colNames.add("str");
List<Vector> cols = new ArrayList<Vector>(3);

BasicTimestampVector date = new BasicTimestampVector(10000);
for (int i = 0; i < 10000; i++)
    date.setTimestamp(i, LocalDateTime.now());
cols.add(date);

BasicIntVector sym = new BasicIntVector(10000);
for (int i = 0; i < 10000; i += 4) {
    sym.setInt(i, 1);
    sym.setInt(i + 1, 2);
    sym.setInt(i + 2, 3);
    sym.setInt(i + 3, 4);
}
cols.add(sym);

BasicStringVector str = new BasicStringVector(10000);
for (int i = 0; i < 10000; i++) {
    str.setString(i, "32");
}
cols.add(str);

// Append the batch 1,000 times (10 million rows in total)
for (int i = 0; i < 1000; i++) {
    int m = appender.append(new BasicTable(colNames, cols));
    assertEquals(10000, m);
}

// Verify the total row count on the server
BasicLong re = (BasicLong) conn.run("pt= loadTable(\"dfs://demohash\",`pt)\n" +
        "exec count(*) from pt");
System.out.println(re);
pool.shutdown();
Execution result:
10000000