PartitionedTableAppender

Implementing concurrent writes to DFS tables with tableInsert can be complex. To simplify the process, the C++ API provides the PartitionedTableAppender (PTA) class, which automatically writes data concurrently, partition by partition.

This section will guide you through the usage of the PartitionedTableAppender class with detailed examples.

Constructing a PTA

The PTA is built on a connection pool. It retrieves the partition information of the given DFS table and assigns each partition to a connection in the pool for parallel writing, so that a partition is written to by only one connection at a time.
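The assignment idea can be illustrated with a small standalone sketch. It is not the library's implementation and does not use the DolphinDB API: the Row struct, the assignByPartition function, and the round-robin policy are all assumptions made for illustration. It only shows the invariant described above, namely that all rows of one partition end up in the batch of exactly one worker (standing in for one pooled connection).

```cpp
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical row type for illustration; not part of the DolphinDB API.
struct Row {
    std::string name;
    int date;      // partition key, e.g. 20100101 for 2010.01.01
    double price;
};

// Groups rows by partition key, then deals whole partitions out to workers
// round-robin. Because a partition is never split, each partition is handled
// by exactly one worker. The round-robin policy is an assumption of this sketch.
std::vector<std::vector<Row>> assignByPartition(const std::vector<Row>& rows,
                                                std::size_t workerCount) {
    if (workerCount == 0)
        throw std::invalid_argument("workerCount must be positive");

    // std::map keeps partition keys sorted, so the assignment is deterministic.
    std::map<int, std::vector<Row>> byPartition;
    for (const Row& r : rows)
        byPartition[r.date].push_back(r);

    std::vector<std::vector<Row>> batches(workerCount);
    std::size_t next = 0;
    for (const auto& kv : byPartition) {
        std::vector<Row>& batch = batches[next % workerCount];
        batch.insert(batch.end(), kv.second.begin(), kv.second.end());
        ++next;
    }
    return batches;
}
```

With two workers and rows dated 20100101, 20100102, 20100101, and 20100103, the first worker receives both 20100101 rows plus the 20100103 row, and the second worker receives the 20100102 row. Each batch could then be written over its own connection without two connections ever touching the same partition.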

The constructor declaration is as follows:

PartitionedTableAppender(string dbUrl, string tableName, string partitionColName, DBConnectionPool& pool);

Arguments

  • dbUrl: The path of the DFS database, e.g., "dfs://SAMPLE_TRDDB".
  • tableName: The name of the DFS table.
  • partitionColName: The name of the partitioning column.
  • pool: The connection pool (a DBConnectionPool object) used for the concurrent writes.

append

You can use the append function to write a table to the DFS table through the PTA object. The call blocks until the PTA has finished writing all the data.

The function declaration is as follows:

int append(TableSP table);

Arguments

  • table: The table to be appended.

Return Values

The number of rows that have been appended.

Examples

(1) Create a DFS table with the following script:

dbPath = "dfs://SAMPLE_TRDDB";
tableName = `demoTable
if(existsDatabase(dbPath)){
	dropDatabase(dbPath)
}
db = database(dbPath, VALUE, 2010.01.01..2010.01.30)
pt=db.createPartitionedTable(table(1000000:0, `name`date`price, [STRING,DATE,DOUBLE]), tableName, `date)

(2) Use the PTA to append a table to the DFS table "demoTable" in parallel.

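#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>
#include "DolphinDB.h"
#include "Util.h"

using namespace dolphindb;
using namespace std;

// Builds a 10-row in-memory table whose schema matches demoTable.
TableSP createDemoTable(){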
    vector<string> colNames = {"name", "date","price"};
    vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE};
    int colNum = 3, rowNum = 10, indexCapacity = 10;
    TableSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
    vector<VectorSP> columnVecs;
    for(int i = 0; i < colNum; ++i)
        columnVecs.push_back(table->getColumn(i));

    for(int i = 0; i < rowNum; ++i){
        columnVecs[0]->set(i, Util::createString("name_"+std::to_string(i)));
        columnVecs[1]->set(i, Util::createDate(2010, 1, i+1));
        columnVecs[2]->set(i, Util::createDouble((rand()%100)/3.0));
    }
    return table;
}

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    DBConnectionPool pool("127.0.0.1", 8848, 20, "admin", "123456");
    PartitionedTableAppender appender("dfs://SAMPLE_TRDDB", "demoTable", "date", pool);
    TableSP table = createDemoTable();
    appender.append(table);
    ConstantSP result = conn.run("select * from loadTable('dfs://SAMPLE_TRDDB', `demoTable)");
    std::cout << result->getString() << std::endl;
}