PartitionedTableAppender

The Java API provides the PartitionedTableAppender class, which lets a Java client write data concurrently to a DolphinDB DFS table.

PartitionedTableAppender uses a connection pool for multi-threaded writing. It calls the schema function to obtain the partitioning information of the DFS table, groups the data to be written by the partitioning column, and hands each group to a different connection for parallel writing.
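The grouping step can be pictured with plain Java collections. The sketch below is not the library's implementation. It assumes, purely for illustration, that an integer partition key is mapped to a worker by taking the key modulo the pool size, and the names PartitionRoutingSketch and routeRows are hypothetical. It only demonstrates the property that matters: rows sharing a partitioning value always land in the same worker's batch, so no two workers write to the same partition.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionRoutingSketch {

    /**
     * Groups row indices by worker. Rows that share a partition key
     * always map to the same worker index (assumed rule: key mod workerCount).
     */
    public static List<List<Integer>> routeRows(int[] partitionKeys, int workerCount) {
        List<List<Integer>> batches = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            batches.add(new ArrayList<>());
        }
        for (int row = 0; row < partitionKeys.length; row++) {
            // floorMod keeps the index non-negative even for negative keys
            int worker = Math.floorMod(partitionKeys[row], workerCount);
            batches.get(worker).add(row);
        }
        return batches;
    }

    public static void main(String[] args) {
        int[] sym = {1, 2, 3, 4, 1, 2, 3, 4};
        List<List<Integer>> batches = routeRows(sym, 3);
        for (int w = 0; w < batches.size(); w++) {
            System.out.println("worker " + w + " -> rows " + batches.get(w));
        }
    }
}
```

With three workers, the rows whose sym is 1 or 4 go to the same worker, while no sym value is ever split across two workers.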

Constructor

DolphinDB does not allow multiple writers to write data to the same partition simultaneously. Therefore, when performing multi-threaded parallel data writing on the client, it is necessary to ensure that each thread writes to a different partition. The Java API provides an easy method to automatically partition data and write in parallel, with the constructor as follows:

public PartitionedTableAppender(String dbUrl, String tableName, String partitionColName, String appendFunction, DBConnectionPool pool)

Parameters

  • dbUrl: The DFS database path.
  • tableName: The DFS table name.
  • partitionColName: The partitioning column.
  • appendFunction (optional): A String representing the user-defined writer function. If not specified, the built-in tableInsert function is called.
  • pool: The connection pool.

Examples

DBConnectionPool pool = new ExclusiveDBConnectionPool(HOST, PORT, "admin", "123456", 3, true, true);
PartitionedTableAppender appender = new PartitionedTableAppender(dbUrl, tableName, "sym", pool);

append Method

Call the append method to write a Table object to a DFS table.

The method returns the number of inserted rows.

Syntax

public int append(Table table) throws IOException

Parameters

  • table: The table containing the data to be inserted.

Usage Example

The following example writes 10 million records into a DFS table.

Note: This example uses a large data set. It is recommended to use a DolphinDB server configured with an 8-core CPU and 16 GB of memory to run this code.

DBConnection conn = new DBConnection();
conn.connect(HOST,PORT,USER,PASSWORD);

// Create DFS table
String script = "\n" +
        "t = table(timestamp(1..10)  as date,int(1..10) as sym,string(1..10) as str)\n" +
        "db1=database(\"\",VALUE,date(now())+0..100)\n" +
        "db2=database(\"\",RANGE,int(1..10))\n" +
        "if(existsDatabase(\"dfs://demohash\")){\n" +
        "\tdropDatabase(\"dfs://demohash\")\n" +
        "}\n" +
        "db =database(\"dfs://demohash\",COMPO,[db1,db2])\n" +
        "pt = db.createPartitionedTable(t,`pt,`date`sym)\n";
conn.run(script);

// Create connection pool
ExclusiveDBConnectionPool pool = new ExclusiveDBConnectionPool(HOST, PORT, "admin", "123456", 3, false, true);
// Target the database and table created by the script above
PartitionedTableAppender appender = new PartitionedTableAppender("dfs://demohash", "pt", "sym", pool);
// Build a 10,000-row batch with the columns date, sym, and str
List<String> colNames = new ArrayList<String>(3);
colNames.add("date");
colNames.add("sym");
colNames.add("str");
List<Vector> cols = new ArrayList<Vector>(3);
BasicTimestampVector date = new BasicTimestampVector(10000);
for (int i =0 ;i<10000;i++)
    date.setTimestamp(i, LocalDateTime.now());
cols.add(date);
BasicIntVector sym = new BasicIntVector(10000);
for (int i =0 ;i<10000;i+=4) {
    sym.setInt(i, 1);
    sym.setInt(i + 1, 2);
    sym.setInt(i + 2, 3);
    sym.setInt(i + 3, 4);
}
cols.add(sym);
BasicStringVector str = new BasicStringVector(10000);
for (int i =0 ;i<10000;i++) {
    str.setString(i,"32");
}
cols.add(str);
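// Append the same 10,000-row batch 1,000 times (10 million rows in total)
for (int i = 0; i < 1000; i++) {
    int m = appender.append(new BasicTable(colNames, cols));
    assertEquals(10000, m);  // JUnit assertion: every row of the batch was inserted
}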
BasicLong re = (BasicLong) conn.run("pt= loadTable(\"dfs://demohash\",`pt)\n" +
        "exec count(*) from pt");
System.out.println(re);
pool.shutdown();

Execution result:

10000000

To write data with a custom function, specify the appendFunction parameter. In the example below, the function view myAppend appends each batch it receives twice.

DBConnection conn = new DBConnection();
conn.connect(HOST,PORT,USER,PASSWORD);

// Create a partitioned table
String script1 = "\n" +
        "db = database(\"dfs://ptaDemo\",VALUE, `A`B`C)\n" +
        "t1 = table(`A`B`C as sym, 1..3\\10 as price)\n" +
        "pt = db.createPartitionedTable(t1,`pt, `sym)\n";
conn.run(script1);

// Define a function view
String script2 = "\n" +
        "def myAppend(t){\n" +
        "pt = loadTable(\"dfs://ptaDemo\",\"pt\")\n" +
        "pt.append!(t)\n" +
        "pt.append!(t)\n" +
        "}\n" +
        "addFunctionView(myAppend)";
conn.run(script2);

String dbUrl = "dfs://ptaDemo";
String tableName = "pt";
String partitionCol = "sym";
String appendFunction = "myAppend";  // Specify a custom append function

DBConnectionPool pool = new ExclusiveDBConnectionPool(
        HOST, PORT, USER, PASSWORD, 3, true, true
);

// Pass in appendFunction; myAppend will be called for writing instead of the default tableInsert
PartitionedTableAppender appender = new PartitionedTableAppender(
        dbUrl, tableName, partitionCol, appendFunction, pool
);

// Prepare the sym and price columns on the Java side
List<String> colNames = Arrays.asList("sym", "price");
List<Vector> cols = new ArrayList<>();

BasicStringVector sym = new BasicStringVector(3);
sym.setString(0, "A");
sym.setString(1, "B");
sym.setString(2, "A");

BasicDoubleVector price = new BasicDoubleVector(3);
price.setDouble(0, 10.5);
price.setDouble(1, 20.8);
price.setDouble(2, 11.2);

cols.add(sym);
cols.add(price);

int rows = appender.append(new BasicTable(colNames, cols));
BasicLong re = (BasicLong) conn.run("pt= loadTable(\"dfs://ptaDemo\",`pt)\n" +
        "exec count(*) from pt");
System.out.println(re);
pool.shutdown();

The output is 6.
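The count follows from the three input rows being appended twice. The arithmetic can be checked with a small plain-Java simulation that needs no server. This is only a sketch: the class DoubleAppendSketch and the method simulate are hypothetical, the in-memory list stands in for the DFS table, and the lambda stands in for the myAppend function view. Grouping by sym mirrors the idea that each partition's rows are handed to the writer function as a separate batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class DoubleAppendSketch {

    /** Groups rows by sym, passes each group to a writer that appends twice, returns the stored row count. */
    public static int simulate(List<String> symColumn) {
        // One batch per distinct partitioning value
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String s : symColumn) {
            groups.computeIfAbsent(s, k -> new ArrayList<>()).add(s);
        }

        List<String> stored = new ArrayList<>();  // stand-in for the DFS table
        // Stand-in for myAppend: writes the batch it receives two times
        Consumer<List<String>> myAppend = batch -> {
            stored.addAll(batch);
            stored.addAll(batch);
        };

        for (List<String> batch : groups.values()) {
            myAppend.accept(batch);
        }
        return stored.size();
    }

    public static void main(String[] args) {
        // Same sym values as the example above: A, B, A
        System.out.println(simulate(Arrays.asList("A", "B", "A")));  // prints 6
    }
}
```

The group for A holds two rows and the group for B holds one, so writing each group twice stores 4 + 2 = 6 rows.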