PartitionedTableAppender
Implementing concurrent writes to DFS tables with tableInsert can be complex. To simplify the process, the C++ API provides the PartitionedTableAppender class (PTA), which automatically writes data concurrently by partition.
This section explains how to use the PartitionedTableAppender class, with a detailed example.
Constructing a PTA
The core idea behind the PTA is to use a connection pool. The PTA retrieves the partition information of the given DFS table and assigns each partition to a connection in the pool for parallel writing, so that a partition is written to by only one connection at any given time.
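The allocation idea can be illustrated with a small standalone sketch. It does not use the DolphinDB API and is not the library's actual implementation: `Row`, `assignPartitions`, `splitByConnection`, and the round-robin mapping are all hypothetical, chosen only to show how giving each partition a single owning connection rules out two connections writing to the same partition.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical row: a partition key (for example a date string) and a value.
struct Row {
    std::string partitionKey;
    double price;
};

// Map every distinct partition key to exactly one connection slot.
// Keys are visited in sorted order and assigned round-robin; this scheme is
// an assumption of the sketch, the real PTA may allocate differently.
std::map<std::string, std::size_t> assignPartitions(const std::vector<Row>& rows,
                                                    std::size_t poolSize) {
    assert(poolSize > 0);
    std::map<std::string, std::size_t> owner;
    for (const Row& r : rows) {
        owner.emplace(r.partitionKey, 0);  // collect distinct keys
    }
    std::size_t next = 0;
    for (auto& kv : owner) {
        kv.second = next++ % poolSize;
    }
    return owner;
}

// Split the rows into one batch per connection slot. All rows of a given
// partition land in the same batch, so no partition is shared between slots.
std::vector<std::vector<Row>> splitByConnection(const std::vector<Row>& rows,
                                                std::size_t poolSize) {
    const auto owner = assignPartitions(rows, poolSize);
    std::vector<std::vector<Row>> batches(poolSize);
    for (const Row& r : rows) {
        batches[owner.at(r.partitionKey)].push_back(r);
    }
    return batches;
}
```

Each batch could then be written over its own connection in parallel. Because a partition key maps to one slot only, no extra locking per partition is needed in this sketch.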
The constructor declaration is as follows:
PartitionedTableAppender(string dbUrl, string tableName, string partitionColName, DBConnectionPool& pool);
Arguments
- dbUrl: The path of the DFS database, e.g. "dfs://SAMPLE_TRDDB".
- tableName: The name of the DFS table.
- partitionColName: The name of the partitioning column.
- pool: The connection pool (a DBConnectionPool object).
append
Use the append function to write a table to the DFS table through the PTA object. The call does not return until the PTA has finished writing all the data.
The function declaration is as follows:
int append(TableSP table);
Arguments
- table: The table to be appended.
Return Values
The number of rows that have been appended.
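The blocking behavior and the returned row count can be pictured with another standalone sketch. It is an illustration under stated assumptions, not the PTA's implementation: `writeBatch` and `appendAll` are hypothetical names, and `writeBatch` only counts rows instead of sending them to a server.

```cpp
#include <future>
#include <vector>

// Hypothetical stand-in for sending one batch over one pooled connection.
// It only reports how many rows it was given.
int writeBatch(const std::vector<double>& batch) {
    return static_cast<int>(batch.size());
}

// Start one asynchronous write per batch, then block until all of them
// finish and return the total number of rows written.
int appendAll(const std::vector<std::vector<double>>& batches) {
    std::vector<std::future<int>> pending;
    for (const auto& batch : batches) {
        pending.push_back(std::async(std::launch::async,
                                     [&batch] { return writeBatch(batch); }));
    }
    int total = 0;
    for (auto& f : pending) {
        total += f.get();  // get() waits for that write to complete
    }
    return total;
}
```

In the same spirit, a caller of append can compare the returned value with the number of rows in the table it passed in to confirm that everything was written.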
Examples
(1) Create a DFS table with the following script:
dbPath = "dfs://SAMPLE_TRDDB";
tableName = `demoTable
if(existsDatabase(dbPath)){
dropDatabase(dbPath)
}
db = database(dbPath, VALUE, 2010.01.01..2010.01.30)
pt=db.createPartitionedTable(table(1000000:0, `name`date`price, [STRING,DATE,DOUBLE]), tableName, `date)
(2) Use the PTA to append a table to the DFS table "demoTable" in parallel:
#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>
#include "DolphinDB.h"
#include "Util.h"

using namespace dolphindb;
using namespace std;

// Build an in-memory table with 10 rows, one row per date partition.
TableSP createDemoTable(){
    vector<string> colNames = {"name", "date", "price"};
    vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE};
    int colNum = 3, rowNum = 10, indexCapacity = 10;
    TableSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
    vector<VectorSP> columnVecs;
    for(int i = 0; i < colNum; ++i)
        columnVecs.push_back(table->getColumn(i));
    for(int i = 0; i < rowNum; ++i){
        columnVecs[0]->set(i, Util::createString("name_" + std::to_string(i)));
        columnVecs[1]->set(i, Util::createDate(2010, 1, i + 1));
        columnVecs[2]->set(i, Util::createDouble((rand() % 100) / 3.0));
    }
    return table;
}

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    // Pool of 20 connections that the appender uses for parallel writes.
    DBConnectionPool pool("127.0.0.1", 8848, 20, "admin", "123456");
    PartitionedTableAppender appender("dfs://SAMPLE_TRDDB", "demoTable", "date", pool);
    TableSP table = createDemoTable();
    // append does not return until all rows have been written.
    appender.append(table);
    // Read the table back to check the result.
    ConstantSP result = conn.run("select * from loadTable('dfs://SAMPLE_TRDDB', `demoTable)");
    std::cout << result->getString() << std::endl;
    return 0;
}