PartitionedTableAppender

The Java API provides the PartitionedTableAppender class which allows data to be concurrently written from a Java client to a DolphinDB DFS table.

PartitionedTableAppender first calls the schema function to obtain the partitioning information of the DFS table. The data to be written is then classified by its values in the partitioning column and handed over to different connections in the pool for parallel writing.

Constructor

DolphinDB does not allow multiple writers to write data to the same partition simultaneously. Therefore, when performing multi-threaded parallel data writing on the client, it is necessary to ensure that each thread writes to a different partition. The Java API provides an easy method to automatically partition data and write in parallel, with the constructor as follows:

public PartitionedTableAppender(String dbUrl, String tableName, String partitionColName, String appendFunction, DBConnectionPool pool)

Parameters

  • dbUrl: The DFS database path.
  • tableName: The DFS table name.
  • partitionColName: The partitioning column.
  • appendFunction (optional): A String specifying the name of a user-defined writer function on the server. If not specified, the built-in tableInsert function is called.
  • pool: The connection pool.
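For reference, a sketch of constructing an appender with a custom writer function. It assumes a reachable DolphinDB server and a server-side function (here called saveData, an illustrative name, not part of the API) that takes a table and writes it:

```java
// Assumes HOST/PORT point to a running DolphinDB server and that a
// server-side writer function named saveData has already been defined,
// e.g. via conn.run("def saveData(t){ loadTable('dfs://demohash',`pt).append!(t) }").
// The function name "saveData" is illustrative.
DBConnectionPool pool = new ExclusiveDBConnectionPool(HOST, PORT, "admin", "123456", 3, true, true);

// Pass the writer function name as the fourth argument; each worker connection
// will invoke saveData instead of the built-in tableInsert.
PartitionedTableAppender appender = new PartitionedTableAppender(
        "dfs://demohash", "pt", "sym", "saveData", pool);
```

When appendFunction is omitted (as in the examples below), the appender falls back to tableInsert.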

Examples

DBConnectionPool pool = new ExclusiveDBConnectionPool(HOST, PORT, "admin", "123456", 3, true, true);
PartitionedTableAppender appender = new PartitionedTableAppender(dbUrl, tableName, "sym", pool);

Usage Example

The following example writes 10 million records into a DFS table.

Note: This example uses a large data set. It is recommended to use a DolphinDB server configured with an 8-core CPU and 16 GB of memory to run this code.

DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, USER, PASSWORD);

// Create DFS table
String script = "\n" +
        "t = table(timestamp(1..10)  as date,int(1..10) as sym,string(1..10) as str)\n" +
        "db1=database(\"\",VALUE,date(now())+0..100)\n" +
        "db2=database(\"\",RANGE,int(1..10))\n" +
        "if(existsDatabase(\"dfs://demohash\")){\n" +
        "\tdropDatabase(\"dfs://demohash\")\n" +
        "}\n" +
        "db =database(\"dfs://demohash\",COMPO,[db1,db2])\n" +
        "pt = db.createPartitionedTable(t,`pt,`date`sym)\n";
conn.run(script);

// Create connection pool
ExclusiveDBConnectionPool pool = new ExclusiveDBConnectionPool(HOST, PORT, "admin", "123456", 3, false, true);
PartitionedTableAppender appender = new PartitionedTableAppender(DBPATH, TABLE_NAME, "sym", pool);
List<String> colNames = new ArrayList<String>(3);
colNames.add("date");
colNames.add("sym");
colNames.add("str");
List<Vector> cols = new ArrayList<Vector>(3);
BasicTimestampVector date = new BasicTimestampVector(10000);
for (int i = 0; i < 10000; i++)
    date.setTimestamp(i, LocalDateTime.now());
cols.add(date);
BasicIntVector sym = new BasicIntVector(10000);
for (int i = 0; i < 10000; i += 4) {
    sym.setInt(i, 1);
    sym.setInt(i + 1, 2);
    sym.setInt(i + 2, 3);
    sym.setInt(i + 3, 4);
}
cols.add(sym);
BasicStringVector str = new BasicStringVector(10000);
for (int i = 0; i < 10000; i++) {
    str.setString(i, "32");
}
cols.add(str);
// Append the 10,000-row table 1,000 times, i.e. 10 million rows in total
for (int i = 0; i < 1000; i++) {
    int m = appender.append(new BasicTable(colNames, cols));
    if (m != 10000) throw new RuntimeException("unexpected row count: " + m);
}
BasicLong re = (BasicLong) conn.run("pt= loadTable(\"dfs://demohash\",`pt)\n" +
        "exec count(*) from pt");
System.out.println(re);
pool.shutdown();

Execution result:

10000000