MultithreadedTableWriter

The MultithreadedTableWriter class (MTW) in DolphinDB allows for asynchronous writing of data using multiple threads. It operates on a per-row basis, employing a buffer queue. When specific conditions are met, the class submits a batch of data to the DolphinDB server for writing.

Constructing an MTW

The constructor declaration is as follows:

MultithreadedTableWriter(const std::string& host, int port,
                         const std::string& userId, const std::string& password,
                         const string& dbPath, const string& tableName,
                         bool useSSL,
                         bool enableHighAvailability = false,
                         const vector<string> *pHighAvailabilitySites = nullptr,
                         int batchSize = 1,
                         float throttle = 0.01f,
                         int threadCount = 1,
                         const string& partitionCol = "",
                         const vector<COMPRESS_METHOD> *pCompressMethods = nullptr,
                         Mode mode = M_Append,
                         vector<string> *pModeOption = nullptr,
                         const std::function<void(ConstantSP)> &callbackFunc = nullptr,
                         bool enableStreamTableTimestamp = false);

Arguments

  • host: The IP address of the server to connect to.
  • port: The port number of the server to connect to.
  • userId: The username for server login.
  • password: The password for server login.
  • dbPath: The DFS database path. Pass an empty string ("") for an in-memory table.
  • tableName: The in-memory or DFS table name.
  • useSSL: Whether to enable SSL.
  • enableHighAvailability: Whether to enable high availability.
  • pHighAvailabilitySites: A pointer to the list of high-availability sites to use when enableHighAvailability is true.
  • batchSize: The number of records the MTW object buffers before transmitting them to the server. 1 (default) means the MTW sends each record to the server immediately after receiving it from the client. If batchSize is greater than 1, the MTW buffers incoming records until it has received batchSize records, then sends them to the server as one batch.
  • throttle: Sets a time limit (in seconds) for how long the MTW will buffer incoming records before sending them to the server, even if the batchSize has not been reached. It must be greater than 0.
  • threadCount: The number of writer threads to create. The default value is 1, meaning single-threaded writing. It must be 1 for tables that do not support concurrent writing.
  • partitionCol: Only takes effect when threadCount is greater than 1. For a partitioned table, it must be a partitioning column; for a stream table, it must be a column name; for a dimension table, it does not take effect.
  • pCompressMethods: Compression methods to apply on a per-column basis. If left unspecified, no compression is used. Each element in the list corresponds to one column of data, in order. Valid compression types are:
    • COMPRESS_LZ4: LZ4 algorithm;
    • COMPRESS_DELTA: Delta-of-delta encoding.
  • mode: The write mode. It can be M_Upsert (calls the server function upsert!) or M_Append (calls the server function tableInsert).
  • pModeOption: Optional parameters for upsert!. Only takes effect when mode is M_Upsert.
  • callbackFunc: A callback function invoked to report the result of inserting rows. Its parameter is a table whose first column is of STRING type and holds the ID of each row, and whose second column is of BOOL type and indicates whether that row was inserted successfully.
  • enableStreamTableTimestamp: Whether the data is inserted into a stream table that has a timestamp attached by the setStreamTableTimestamp function. Note that the timestamp column must be the last column of the stream table.
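
The interaction between batchSize and throttle can be illustrated with a small stand-alone model. The sketch below is not the MTW implementation: BatchBuffer, add, and tick are hypothetical names, time is passed in explicitly instead of being read from a clock, and a "flush" only records the batch size rather than contacting a server. It also assumes that the throttle interval is measured from the arrival of the first buffered record, which the description above does not specify.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy model of the batchSize/throttle rule; NOT the DolphinDB implementation.
// All names here are hypothetical.
class BatchBuffer {
public:
    BatchBuffer(std::size_t batchSize, double throttleSeconds)
        : batchSize_(batchSize), throttle_(throttleSeconds) {
        assert(batchSize_ >= 1);
        assert(throttle_ > 0.0);  // throttle must be greater than 0
    }

    // Buffer one record at time `now` (seconds); flush if the batch is full.
    void add(int record, double now) {
        if (pending_.empty()) firstArrival_ = now;
        pending_.push_back(record);
        if (pending_.size() >= batchSize_) flush();
    }

    // Called periodically: flush a partial batch that has waited too long.
    void tick(double now) {
        if (!pending_.empty() && now - firstArrival_ >= throttle_) flush();
    }

    // Sizes of the batches "sent" so far, in order.
    const std::vector<std::size_t>& sentBatches() const { return sent_; }
    std::size_t pendingCount() const { return pending_.size(); }

private:
    void flush() {
        sent_.push_back(pending_.size());
        pending_.clear();
    }

    std::size_t batchSize_;
    double throttle_;
    double firstArrival_ = 0.0;
    std::vector<int> pending_;
    std::vector<std::size_t> sent_;
};
```

With batchSize = 3 and throttle = 0.5, three quick calls to add produce one batch of three, while a single later record is sent on its own once a tick observes that it has waited at least 0.5 seconds.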

Functions of MTW

insert

You can insert a single record with the insert function.

The function declaration is as follows:

template<typename... TArgs>
bool insert(ErrorCodeInfo &errorInfo, TArgs... args);

Arguments

  • errorInfo: Out-parameter. An ErrorCodeInfo object that holds the error code and message if the insert fails.
  • args: Variadic arguments. The values of the record to be inserted, one per column.
    Note:
    • If callbackFunc is specified when constructing the MTW, the first argument of args must be a string that gives the ID of the inserted row.
    • The insert function releases any pointer contained in args after execution, so passing ConstantSP or C++ basic-type arguments is recommended.

Return Values

A boolean value indicating whether data is inserted successfully.
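
The variadic signature means a row is passed as a plain argument list, and the number of values can be compared with the table's column count at run time. The following stand-alone sketch shows that mechanism only. ToyWriter and its members are hypothetical, every value is stored as text, and the error message wording is invented for this example; none of it reflects how the MTW converts or validates values internally.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Toy stand-in for a table writer; NOT the DolphinDB class.
class ToyWriter {
public:
    explicit ToyWriter(std::size_t columnCount) : columnCount_(columnCount) {}

    // Accepts one value per column; fails if the count does not match.
    template <typename... TArgs>
    bool insert(std::string& errorInfo, TArgs... args) {
        if (sizeof...(args) != columnCount_) {
            errorInfo = "expected " + std::to_string(columnCount_) +
                        " values, got " + std::to_string(sizeof...(args));
            return false;
        }
        std::vector<std::string> row;
        // Expand the parameter pack left to right, converting each value to text.
        int expand[] = {0, (row.push_back(toText(args)), 0)...};
        (void)expand;
        rows_.push_back(row);
        return true;
    }

    std::size_t rowCount() const { return rows_.size(); }
    const std::vector<std::string>& row(std::size_t i) const { return rows_[i]; }

private:
    template <typename T>
    static std::string toText(const T& value) {
        std::ostringstream out;
        out << value;
        return out.str();
    }

    std::size_t columnCount_;
    std::vector<std::vector<std::string>> rows_;
};
```

As with the real insert, the caller checks the boolean result and reads the out-parameter only when the call returns false.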

getUnwrittenData

You can use the getUnwrittenData function to obtain a list of data that has not been written to the server.

The function declaration is as follows:

void getUnwrittenData(std::vector<std::vector<ConstantSP>*> &unwrittenData);

Arguments

  • unwrittenData: Out-parameter. A nested list of records that have not yet been written to the DolphinDB server. This includes records that failed to be sent and records yet to be sent.

Note: Calling getUnwrittenData() will remove those records from the MTW, releasing them from memory.
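
The "remove on read" behavior in the note can be pictured with a small stand-alone model. ToyQueue, drain, and requeue below are hypothetical names, and rows are held by value rather than through the pointers used in the real signature. The sketch only shows that draining transfers the rows to the caller, so a second call returns nothing, and that the drained rows can then be handed to another queue.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

using Row = std::vector<int>;

// Toy queue illustrating "drain" semantics; NOT the DolphinDB class.
class ToyQueue {
public:
    void push(Row row) { pending_.push_back(std::move(row)); }

    // Moves every pending row into `out` and leaves the queue empty,
    // so a second call returns nothing.
    void drain(std::vector<Row>& out) {
        for (Row& row : pending_) out.push_back(std::move(row));
        pending_.clear();
    }

    // Hands previously drained rows back to a queue (for example a fresh one).
    void requeue(std::vector<Row>& rows) {
        for (Row& row : rows) pending_.push_back(std::move(row));
        rows.clear();
    }

    std::size_t size() const { return pending_.size(); }

private:
    std::vector<Row> pending_;
};
```

Because the rows leave the queue when drained, the caller becomes responsible for them: they are either resubmitted or lost.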

insertUnwrittenData

You can insert multiple unwritten records into the table with the insertUnwrittenData function.

The function declaration is as follows:

bool insertUnwrittenData(std::vector<std::vector<ConstantSP>*> &records, ErrorCodeInfo &errorInfo);

Arguments

  • records: The records that have not yet been written to the DolphinDB server. Often obtained by calling getUnwrittenData().
  • errorInfo: Out-parameter. An ErrorCodeInfo object that holds the error code and message if the insert fails.

Return Values

A boolean value indicating whether the records are inserted successfully.

getStatus

You can call the getStatus function to check the execution status of the MTW object.

The function declaration is as follows:

void getStatus(Status &status);

Arguments

  • status: Out-parameter. A MultithreadedTableWriter::Status object.

Return Values

None. The function fills the status argument, a MultithreadedTableWriter::Status object, with the following details:

  • isExiting: Whether the writer thread(s) are exiting.
  • errorCode: The error code. Empty string if no error.
  • errorInfo: The error message. Empty string if no error.
  • sentRows: The number of records successfully sent (written) to the server.
  • unsentRows: The number of records to be sent, including the records pending conversion and the converted records pending insertion into the sending queue.
  • sendFailedRows: The number of records that failed to be sent (including records in the sending queue).
  • threadStatus: A list of statuses, one for each writer thread, including:
    • threadId: The thread ID.
    • sentRows: The number of records this thread has sent.
    • unsentRows: The number of records pending to be sent by this thread.
    • sendFailedRows: The number of records this thread has failed to send.
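
To make the field list concrete, the sketch below mirrors the documented fields in a stand-alone struct and derives a one-line summary from them. StatusSnapshot, ThreadSnapshot, unwrittenRows, and summarize are hypothetical names that are not part of the API, the integer and string field types are guesses, and hasError is modeled on the assumption that an empty errorCode means no error, as the field description states.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Stand-alone mirror of the documented per-thread fields (types are assumed).
struct ThreadSnapshot {
    long threadId = 0;
    long sentRows = 0;
    long unsentRows = 0;
    long sendFailedRows = 0;
};

// Stand-alone mirror of the documented status fields; NOT the library type.
struct StatusSnapshot {
    bool isExiting = false;
    std::string errorCode;
    std::string errorInfo;
    long sentRows = 0;
    long unsentRows = 0;
    long sendFailedRows = 0;
    std::vector<ThreadSnapshot> threadStatus;

    // Assumption: an empty errorCode means no error occurred.
    bool hasError() const { return !errorCode.empty(); }
};

// Rows a caller might want to recover: not yet sent plus failed to send.
inline long unwrittenRows(const StatusSnapshot& s) {
    return s.unsentRows + s.sendFailedRows;
}

// One-line human-readable summary of a snapshot.
inline std::string summarize(const StatusSnapshot& s) {
    std::ostringstream out;
    out << (s.hasError() ? "error " + s.errorCode + ": " + s.errorInfo
                         : std::string("ok"))
        << " | sent=" << s.sentRows
        << " unwritten=" << unwrittenRows(s);
    return out.str();
}
```

A summary like this is the kind of check an application might log after waitForThreadCompletion, before deciding whether to retrieve and resubmit unwritten records.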

waitForThreadCompletion

Use the waitForThreadCompletion function to wait until all of the MTW's worker threads have completed their tasks.

The function declaration is as follows:

void waitForThreadCompletion();

Arguments

None

Examples

Example 1: Standard Workflow

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    // create a table
    string script = "t = table(1000:0, `col1`col2`col3, [LONG, DOUBLE, STRING]); share t as t1;";
    conn.run(script);
    
    // construct an MTW object
    ErrorCodeInfo errorInfo;
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "", "t1", false);
    char msg[] = "123456msg";
    if(!writer.insert(errorInfo, 1, 2.3, msg)){
        std::cout << "insert fail " << errorInfo.errorInfo << std::endl;
        return -1;
    }
    writer.waitForThreadCompletion();
    // get the status of MTW
    MultithreadedTableWriter::Status status;
    writer.getStatus(status);
    if (status.hasError()) {
        std::cout << "error in writing: " << status.errorInfo << std::endl;
    }
    // check the results
    std::cout << conn.run("t1")->getString() << std::endl;
}

Example 2: With callbackFunc Specified

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    // create a table
    string script = "t = table(1000:0, `col1`col2`col3, [LONG, DOUBLE, STRING]); share t as t1;";
    conn.run(script);

    auto cb = [](ConstantSP callbackTable)
    {
        /***
            callbackTable schema:
            column 0: id->string
            column 1: success->bool
        ***/
        std::cout << callbackTable->getString() << std::endl;
    };

    // construct an MTW object
    ErrorCodeInfo errorInfo;
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "", "t1", false, false, nullptr, 1, 0.1, 5, "col1", nullptr, MultithreadedTableWriter::M_Append, nullptr, cb);
    char msg[] = "123456msg";
    if(!writer.insert(errorInfo, "row1", 1, 2.3, msg)){
        std::cout << "insert fail " << errorInfo.errorInfo << std::endl;
        return -1;
    }
    writer.waitForThreadCompletion();
    // get the status of MTW
    MultithreadedTableWriter::Status status;
    writer.getStatus(status);
    if (status.hasError()) {
        std::cout << "error in writing: " << status.errorInfo << std::endl;
    }
    // check the results
    std::cout << conn.run("t1")->getString() << std::endl;
}

Example 3: Writing to a DFS Table

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    conn.run(R"(dbName = 'dfs://valuedb3'
                if(exists(dbName)){
                    dropDatabase(dbName);
                }
                datetest=table(1000:0,`date`symbol`id,[DATE,SYMBOL,LONG]);
                db = database(directory=dbName, partitionType=HASH, partitionScheme=[INT, 10]);
                pt=db.createPartitionedTable(datetest,'pdatetest','id');)");

    vector<COMPRESS_METHOD> compress;
    compress.push_back(COMPRESS_LZ4);
    compress.push_back(COMPRESS_LZ4);
    compress.push_back(COMPRESS_DELTA);
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, nullptr, 10000, 1, 5, "id", &compress);
    
    ErrorCodeInfo errorInfo;
    // insert 100 rows of records with correct data types and column count
    for (int i = 0; i < 100; i++) {
        if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB", rand() % 10000) == false) {
            // this line will not be executed
            cout << "insert failed: " << errorInfo.errorInfo << endl;
            break;
        }
    }
    // An error message is returned if the inserted row has a wrong data type
    if (writer.insert(errorInfo, rand() % 10000, 222, rand() % 10000) == false) {
        cout << "insert failed: " << errorInfo.errorInfo << endl;
    }
    // An error message is returned if the inserted row has the wrong number of columns
    if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB") == false) {
        cout << "insert failed: " << errorInfo.errorInfo << endl; // insert failed: Column counts don't match 2
    }
    writer.waitForThreadCompletion();
    cout << conn.run("select count(*) from pt")->getString() << endl;
}