PartitionedTableAppender
The Java API provides the PartitionedTableAppender class, which writes data concurrently from a Java client to a DolphinDB DFS table.
PartitionedTableAppender works with a connection pool for multi-threaded writing. It calls the schema function to get the partitioning information of the DFS table, classifies the data to be written according to the partitioning column, and hands the resulting groups to different connections for parallel writing.
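The split-and-dispatch strategy described above can be sketched in plain Java. This is a conceptual illustration under stated assumptions, not the Java API's actual implementation: rows are modeled as `String[]` arrays, the partition key is taken to be the value of one column, and the hypothetical `writePartition` method stands in for a write over one pooled connection.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartitionSplitSketch {

    // Hypothetical stand-in for writing one partition's rows over one pooled connection.
    // It only reports how many rows it would write.
    static int writePartition(String partitionKey, List<String[]> rows) {
        return rows.size();
    }

    // Classifies rows by the value in column partitionCol, keeping first-seen order.
    static Map<String, List<String[]>> groupByPartition(List<String[]> rows, int partitionCol) {
        Map<String, List<String[]>> groups = new LinkedHashMap<>();
        for (String[] row : rows) {
            groups.computeIfAbsent(row[partitionCol], k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    // Submits one task per partition group, so no two tasks share a partition,
    // then sums the per-group row counts.
    static int appendInParallel(List<String[]> rows, int partitionCol, int poolSize) {
        ExecutorService workers = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (Map.Entry<String, List<String[]>> e : groupByPartition(rows, partitionCol).entrySet()) {
                results.add(workers.submit(() -> writePartition(e.getKey(), e.getValue())));
            }
            int total = 0;
            for (Future<Integer> f : results) {
                total += f.get(); // blocks until that group has been written
            }
            return total;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for writes", ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException("A partition write failed", ex.getCause());
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[] {"A", "10.5"},
                new String[] {"B", "20.8"},
                new String[] {"A", "11.2"});
        System.out.println(groupByPartition(rows, 0).keySet()); // [A, B]
        System.out.println(appendInParallel(rows, 0, 3));       // 3
    }
}
```

Submitting one task per group is what keeps two threads from ever touching the same partition within a single call; the summed counts mirror the row count that append returns.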
Constructor
DolphinDB does not allow multiple writers to write to the same partition at the same time. Therefore, when writing data in parallel from multiple client threads, you must ensure that each thread writes to a different partition. PartitionedTableAppender provides a simple way to split the data by partition automatically and write it in parallel. The constructor is declared as follows:
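public PartitionedTableAppender(String dbUrl, String tableName, String partitionColName, DBConnectionPool pool)
public PartitionedTableAppender(String dbUrl, String tableName, String partitionColName, String appendFunction, DBConnectionPool pool)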
Parameters
- dbUrl: The DFS database path.
- tableName: The name of the DFS table.
- partitionColName: The partitioning column by which the data is split.
- appendFunction (optional): A String specifying the name of a user-defined write function. If not specified, the built-in tableInsert function is called.
- pool: The connection pool.
Examples
DBConnectionPool pool = new ExclusiveDBConnectionPool(HOST, PORT, "admin", "123456", 3, true, true);
PartitionedTableAppender appender = new PartitionedTableAppender(dbUrl, tableName, "sym", pool);
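PartitionedTableAppender enforces the one-writer-per-partition rule described under Constructor on your behalf. If you manage your own writer threads instead, one common way to respect that rule is to route every row to a worker chosen from its partition key, so that a given key always lands on the same worker. The plain-Java sketch below uses a hypothetical `workerFor` helper; it does not describe how the Java API assigns connections internally.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionOwnershipSketch {

    // Hypothetical helper: picks the worker that owns a partition key.
    // Math.floorMod keeps the index in [0, nWorkers) even for negative hash codes.
    static int workerFor(Object partitionKey, int nWorkers) {
        return Math.floorMod(partitionKey.hashCode(), nWorkers);
    }

    // Places each row's partition key in the queue of its owning worker.
    // workerFor is deterministic, so every row of a partition ends up in one queue
    // and is written by one thread.
    static List<List<Object>> route(List<?> partitionKeys, int nWorkers) {
        List<List<Object>> queues = new ArrayList<>();
        for (int i = 0; i < nWorkers; i++) {
            queues.add(new ArrayList<>());
        }
        for (Object key : partitionKeys) {
            queues.get(workerFor(key, nWorkers)).add(key);
        }
        return queues;
    }

    public static void main(String[] args) {
        // Partition keys of eight rows, e.g. the sym values 1, 2, 3, 4 repeated twice
        List<List<Object>> queues = route(List.of(1, 2, 3, 4, 1, 2, 3, 4), 3);
        for (int i = 0; i < queues.size(); i++) {
            System.out.println("worker " + i + ": " + queues.get(i));
        }
        // worker 0: [3, 3]
        // worker 1: [1, 4, 1, 4]
        // worker 2: [2, 2]
    }
}
```

The trade-off of hashing is uneven load (worker 1 owns two partitions here), but it never lets two workers write to the same partition.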
append Method
Call the append method to write a Table object to the DFS table. The method returns the number of rows inserted.
Syntax
public int append(Table table) throws IOException
Parameters
- table: The table containing the data to be inserted.
Usage Example
The following example writes 10 million records into a DFS table.
Note: This example uses a large data set. It is recommended to use a DolphinDB server configured with an 8-core CPU and 16 GB of memory to run this code.
import com.xxdb.DBConnection;
import com.xxdb.ExclusiveDBConnectionPool;
import com.xxdb.data.*;
import com.xxdb.route.PartitionedTableAppender;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// HOST, PORT, USER and PASSWORD are placeholders for your server settings
DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, USER, PASSWORD);
// Create the DFS table dfs://demohash/pt (COMPO: VALUE on date, RANGE on sym)
String script = "\n" +
        "t = table(timestamp(1..10) as date,int(1..10) as sym,string(1..10) as str)\n" +
        "db1=database(\"\",VALUE,date(now())+0..100)\n" +
        "db2=database(\"\",RANGE,int(1..10))\n" +
        "if(existsDatabase(\"dfs://demohash\")){\n" +
        "\tdropDatabase(\"dfs://demohash\")\n" +
        "}\n" +
        "db =database(\"dfs://demohash\",COMPO,[db1,db2])\n" +
        "pt = db.createPartitionedTable(t,`pt,`date`sym)\n";
conn.run(script);
// Create a connection pool with 3 connections
ExclusiveDBConnectionPool pool = new ExclusiveDBConnectionPool(HOST, PORT, "admin", "123456", 3, false, true);
PartitionedTableAppender appender = new PartitionedTableAppender("dfs://demohash", "pt", "sym", pool);
// Build a 10,000-row table with the columns date, sym and str
List<String> colNames = new ArrayList<String>(3);
colNames.add("date");
colNames.add("sym");
colNames.add("str");
List<Vector> cols = new ArrayList<Vector>(3);
BasicTimestampVector date = new BasicTimestampVector(10000);
for (int i = 0; i < 10000; i++) {
    date.setTimestamp(i, LocalDateTime.now());
}
cols.add(date);
// sym cycles through 1, 2, 3, 4
BasicIntVector sym = new BasicIntVector(10000);
for (int i = 0; i < 10000; i += 4) {
    sym.setInt(i, 1);
    sym.setInt(i + 1, 2);
    sym.setInt(i + 2, 3);
    sym.setInt(i + 3, 4);
}
cols.add(sym);
BasicStringVector str = new BasicStringVector(10000);
for (int i = 0; i < 10000; i++) {
    str.setString(i, "32");
}
cols.add(str);
// Append the 10,000-row table 1,000 times (10 million rows in total)
for (int i = 0; i < 1000; i++) {
    int m = appender.append(new BasicTable(colNames, cols));
    if (m != 10000) {
        throw new IllegalStateException("Expected 10000 rows, got " + m);
    }
}
BasicLong re = (BasicLong) conn.run("pt= loadTable(\"dfs://demohash\",`pt)\n" +
        "exec count(*) from pt");
System.out.println(re);
pool.shutdown();
conn.close();
Execution result:
10000000
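The expected count follows from the loop: 1,000 calls to append, each with a 10,000-row table, give 1,000 × 10,000 = 10,000,000 rows. Because the sym column repeats 1, 2, 3, 4, every batch spans four sym partitions of the RANGE scheme int(1..10), with 2,500 rows in each. The plain-Java sketch below recomputes these numbers without a server; the class name is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

public class BatchLayoutSketch {

    // Rebuilds the sym column from the example: 1, 2, 3, 4, 1, 2, 3, 4, ...
    static int[] symColumn(int rows) {
        int[] sym = new int[rows];
        for (int i = 0; i < rows; i++) {
            sym[i] = i % 4 + 1;
        }
        return sym;
    }

    // Counts how many rows of one batch carry each sym value.
    static Map<Integer, Integer> rowsPerSym(int[] sym) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int s : sym) {
            counts.merge(s, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        int batchRows = 10_000;
        int batches = 1_000;
        System.out.println(rowsPerSym(symColumn(batchRows))); // {1=2500, 2=2500, 3=2500, 4=2500}
        System.out.println((long) batchRows * batches);        // 10000000
    }
}
```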
The appendFunction parameter specifies a custom function for writing data. In the example below, the function view myAppend appends each batch of data to the table twice.
import com.xxdb.DBConnection;
import com.xxdb.DBConnectionPool;
import com.xxdb.ExclusiveDBConnectionPool;
import com.xxdb.data.*;
import com.xxdb.route.PartitionedTableAppender;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, USER, PASSWORD);
// Create a partitioned table (VALUE partitions on sym: A, B, C)
String script1 = "\n" +
        "if(existsDatabase(\"dfs://ptaDemo\")){\n" +
        "\tdropDatabase(\"dfs://ptaDemo\")\n" +
        "}\n" +
        "db = database(\"dfs://ptaDemo\",VALUE, `A`B`C)\n" +
        "t1 = table(`A`B`C as sym, 1..3\\10 as price)\n" +
        "pt = db.createPartitionedTable(t1,`pt, `sym)\n";
conn.run(script1);
// Define a function view that appends the incoming table t to pt twice
String script2 = "\n" +
        "def myAppend(t){\n" +
        "pt = loadTable(\"dfs://ptaDemo\",\"pt\")\n" +
        "pt.append!(t)\n" +
        "pt.append!(t)\n" +
        "}\n" +
        "addFunctionView(myAppend)";
conn.run(script2);
String dbUrl = "dfs://ptaDemo";
String tableName = "pt";
String partitionCol = "sym";
String appendFunction = "myAppend"; // Name of the custom append function
DBConnectionPool pool = new ExclusiveDBConnectionPool(
        HOST, PORT, USER, PASSWORD, 3, true, true
);
// Pass appendFunction so that myAppend, not the default tableInsert, is called for writing
PartitionedTableAppender appender = new PartitionedTableAppender(
        dbUrl, tableName, partitionCol, appendFunction, pool
);
// Prepare the sym and price columns on the Java side
List<String> colNames = Arrays.asList("sym", "price");
List<Vector> cols = new ArrayList<>();
BasicStringVector sym = new BasicStringVector(3);
sym.setString(0, "A");
sym.setString(1, "B");
sym.setString(2, "A");
BasicDoubleVector price = new BasicDoubleVector(3);
price.setDouble(0, 10.5);
price.setDouble(1, 20.8);
price.setDouble(2, 11.2);
cols.add(sym);
cols.add(price);
int rows = appender.append(new BasicTable(colNames, cols));
BasicLong re = (BasicLong) conn.run("pt= loadTable(\"dfs://ptaDemo\",`pt)\n" +
        "exec count(*) from pt");
System.out.println(re);
pool.shutdown();
conn.close();
The output is 6: the three rows sent from Java are each written twice by myAppend.
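The count of 6 is produced by the custom function rather than by the Java side. Assuming, as described at the top of this section, that the data is split by the partitioning column before it is written, the three rows form two sym groups (two rows for A, one for B), and myAppend appends each group twice. The plain-Java sketch below models the appendFunction choice as a `ToIntFunction` over one group's rows; `DEFAULT_WRITER` and `WRITE_TWICE` are hypothetical stand-ins for tableInsert and myAppend, and only row counts are tracked.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

public class AppendFunctionSketch {

    // Hypothetical stand-in for the default tableInsert: each row is stored once.
    static final ToIntFunction<List<String>> DEFAULT_WRITER = List::size;

    // Hypothetical stand-in for the myAppend function view: the rows are appended twice.
    static final ToIntFunction<List<String>> WRITE_TWICE = rows -> 2 * rows.size();

    // Splits the sym values into partition groups, calls the writer once per group,
    // and returns how many rows end up stored in the table.
    static int storedRows(List<String> sym, ToIntFunction<List<String>> writer) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String s : sym) {
            groups.computeIfAbsent(s, k -> new ArrayList<>()).add(s);
        }
        int stored = 0;
        for (List<String> group : groups.values()) {
            stored += writer.applyAsInt(group);
        }
        return stored;
    }

    public static void main(String[] args) {
        List<String> sym = List.of("A", "B", "A"); // the sym column from the example
        System.out.println(storedRows(sym, DEFAULT_WRITER)); // 3
        System.out.println(storedRows(sym, WRITE_TWICE));    // 6
    }
}
```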
