MultithreadedTableWriter

The MultithreadedTableWriter (MTW) class in the DolphinDB C++ API writes data to a DolphinDB server asynchronously using multiple threads. Records are inserted one row at a time into a buffer queue. When the batching conditions set by batchSize or throttle are met, the MTW submits the buffered rows to the server in a single batch.
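The batching rule can be pictured as a small predicate. The sketch below is a simplified model, not DolphinDB code. The function name shouldFlush is hypothetical. It assumes the throttle interval is measured from when the oldest buffered row arrived, and the library's exact timing reference may differ.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>

// Seconds with a fractional part, matching the float throttle argument.
using Seconds = std::chrono::duration<double>;

// Hypothetical model: decide whether a writer thread should send its buffer now.
// bufferedRows: rows currently queued; waited: time since the oldest queued row arrived.
bool shouldFlush(std::size_t bufferedRows, std::size_t batchSize,
                 Seconds waited, Seconds throttle) {
    if (bufferedRows == 0) return false;         // nothing to send
    if (bufferedRows >= batchSize) return true;  // batch is full
    return waited >= throttle;                   // throttle interval has elapsed
}
```

With the defaults (batchSize = 1, throttle = 0.01), every row triggers a flush as soon as it arrives. Larger batches trade some latency for fewer round trips to the server.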

Constructing an MTW

The constructor declaration is as follows:

MultithreadedTableWriter(const std::string& host,int port,
                         const std::string& userId, const std::string& password,
                         const string& dbPath, const string& tableName,
                         bool useSSL,
                         bool enableHighAvailability = false,
                         const vector<string> *pHighAvailabilitySites = nullptr,
                         int batchSize = 1,
                         float throttle = 0.01f,
                         int threadCount = 1,
                         const string& partitionCol ="",
                         const vector<COMPRESS_METHOD> *pCompressMethods = nullptr,
                         Mode mode = M_Append,
                         vector<string> *pModeOption = nullptr,
                         const std::function<void(ConstantSP)> &callbackFunc = nullptr,
                         bool enableStreamTableTimestamp = false);

Arguments

  • host: The IP address of the server to connect to.
  • port: The port number of the server to connect to.
  • userId: The username for server login.
  • password: The password for server login.
  • dbPath: The DFS database path. Pass an empty string ("") for an in-memory table.
  • tableName: The in-memory or DFS table name.
  • useSSL: Whether to enable SSL.
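  • enableHighAvailability, pHighAvailabilitySites: enableHighAvailability specifies whether to enable high availability; pHighAvailabilitySites points to the list of available sites (each in "host:port" format) used when high availability is enabled.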
  • batchSize: The number of records the MTW buffers before sending them to the server. The default value 1 means each record is sent as soon as the MTW receives it from the client. If batchSize is greater than 1, the MTW buffers incoming records until batchSize records have accumulated, then sends them to the server in one batch.
  • throttle: The maximum time (in seconds) the MTW buffers incoming records before sending them to the server, even if batchSize has not been reached. It must be greater than 0.
  • threadCount: The number of writer threads to create. The default value 1 means a single writer thread. It must be 1 for tables that do not support concurrent writing.
  • partitionCol: Takes effect only when threadCount is greater than 1. For a partitioned table, it must be a partitioning column; for a stream table, it must be a column name of the table; for a dimension table, it has no effect.
  • pCompressMethods: Compression methods to apply on a per-column basis. If unspecified, no compression is used. Each element of the vector corresponds to one column, in order. Valid values are:
    • COMPRESS_LZ4: LZ4 algorithm;
    • COMPRESS_DELTA: Delta-of-delta encoding.
  • mode: The write mode. Either M_Append (default; calls the server function tableInsert) or M_Upsert (calls the server function upsert!).
  • pModeOption: Optional parameters passed to upsert!. Takes effect only when mode is M_Upsert.
  • callbackFunc: A callback function invoked after rows are written. Its parameter is a table whose first column (STRING) holds the row IDs passed as the first argument of insert, and whose second column (BOOL) indicates whether each row was written successfully.
  • enableStreamTableTimestamp: Whether to insert data into a stream table whose timestamp column is attached by the setStreamTableTimestamp function. The timestamp column must be the last column of the stream table.
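When threadCount is greater than 1, rows are distributed among the writer threads according to the value in partitionCol. The sketch below is a hypothetical model of that routing. It hashes the partition-column value with std::hash and takes the remainder modulo threadCount, so rows that share a value always land on the same thread. The helper names pickThread and routeRows, and the choice of std::hash, are assumptions, and the MTW's internal routing may use a different function.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical routing rule: same partition value -> same writer thread.
std::size_t pickThread(const std::string& partitionValue, std::size_t threadCount) {
    return std::hash<std::string>{}(partitionValue) % threadCount;
}

// Distributes the partition-column values of incoming rows across threads.
// Each inner vector holds the values assigned to one thread.
std::vector<std::vector<std::string>> routeRows(const std::vector<std::string>& values,
                                                std::size_t threadCount) {
    std::vector<std::vector<std::string>> perThread(threadCount);
    for (const auto& v : values) {
        perThread[pickThread(v, threadCount)].push_back(v);
    }
    return perThread;
}
```

Keeping each partition value on a single thread is presumably why partitionCol must be a partitioning column for partitioned tables. With that rule, two threads never write to the same partition concurrently.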

Alternatively, an MTW can be constructed from an MTWConfig object:

MultithreadedTableWriter(const MTWConfig &config);

The definition of the MTWConfig class:

enum class WriteMode {
    Append,
    Upsert,
};

enum class MTWState {
    Initializing,      // MTW is initializing. For internal use only. No callback.
    ConnectedOne,      // One connection has been established.
    ConnectedAll,      // All connections have been established.
    ReconnectingOne,   // One connection is reconnecting.
    ReconnectingAll,   // All connections are reconnecting.
    Terminated         // MTW has terminated. For internal use only. No callback.
};

class MTWConfig {
public:
  // Corresponds to the original parameters: host, port, userId, password, tableName, useSSL, enableHighAvailability, pHighAvailabilitySites
  MTWConfig(const std::shared_ptr<DBConnection> conn, const std::string &tableName);
  // Corresponds to the original parameters: batchSize, throttle
  template<typename Rep, typename Period>
  MTWConfig& setBatching(const size_t batchSize, const std::chrono::duration<Rep,Period> throttle);
  // Corresponds to the original parameter: pCompressMethods
  MTWConfig& setCompression(const std::vector<COMPRESS_METHOD> &compressMethods);
  // Corresponds to the original parameters: threadCount, partitionCol
  MTWConfig& setThreads(const size_t threadNum, const std::string& partitionColumnName);
  // Corresponds to the original parameters: mode, pModeOption
  MTWConfig& setWriteMode(const WriteMode mode, const std::vector<std::string> &option = std::vector<std::string>());
  // Corresponds to the original parameter: enableStreamTableTimestamp
  MTWConfig& setStreamTableTimestamp();
  // New interface for connection state callback
  using stateCallbackT = std::function<bool(const MTWState state, const std::string &host, const int port)>;
  MTWConfig& onConnectionStateChange(const stateCallbackT &callback);
  // Corresponds to the original parameter: callbackFunc (data write callback)
  using dataCallbackT = std::function<void(ConstantSP)>;
  MTWConfig& onDataWrite(const dataCallbackT &callback);
};

Note: The bool return value of the connection state callback (stateCallbackT) currently has no effect; always return true. Future releases will support returning false to interrupt writing.
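A connection state callback usually just logs the state and returns true. The sketch below mirrors MTWState locally so that it compiles without the DolphinDB headers. stateName and makeLoggingCallback are hypothetical helpers. In real code you would use the library's own MTWState and pass the resulting callback to MTWConfig::onConnectionStateChange.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Local mirror of the library's MTWState enum (same enumerators as above).
enum class MTWState { Initializing, ConnectedOne, ConnectedAll, ReconnectingOne, ReconnectingAll, Terminated };

// Hypothetical helper: printable name for a state.
const char* stateName(MTWState s) {
    switch (s) {
        case MTWState::Initializing:    return "Initializing";
        case MTWState::ConnectedOne:    return "ConnectedOne";
        case MTWState::ConnectedAll:    return "ConnectedAll";
        case MTWState::ReconnectingOne: return "ReconnectingOne";
        case MTWState::ReconnectingAll: return "ReconnectingAll";
        case MTWState::Terminated:      return "Terminated";
    }
    return "Unknown";
}

// Same signature as MTWConfig::stateCallbackT.
using stateCallbackT = std::function<bool(const MTWState state, const std::string &host, const int port)>;

// Hypothetical helper: returns a callback that appends "<state> <host>:<port>" to events.
stateCallbackT makeLoggingCallback(std::vector<std::string>& events) {
    return [&events](const MTWState state, const std::string& host, const int port) {
        events.push_back(std::string(stateName(state)) + " " + host + ":" + std::to_string(port));
        return true;  // currently ignored by the MTW; always return true
    };
}
```

The captured vector must outlive the callback. With the real library, the callback would be registered through config.onConnectionStateChange(cb), as in Example 4.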

Functions of MTW

insert

You can insert a single record with the insert function.

The function declaration is as follows:

template<typename... TArgs>
bool insert(ErrorCodeInfo &errorInfo, TArgs... args)

Arguments

  • errorInfo: Out-parameter. An ErrorCodeInfo object that receives the error code and error message if the insertion fails.
  • args: Variadic arguments. The record to be inserted.
    Note:
    • If callbackFunc is specified when constructing the MTW, the first argument in args must be a string specifying the ID of the inserted row.
    • insert releases any pointer passed in args after execution, so it is recommended to pass ConstantSP objects or basic C++ types.
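To make the argument rules concrete, here is a hypothetical model of the arity check. A row needs one value per table column, plus a leading ID string when callbackFunc is set. checkArity and its error text are illustrative assumptions, not the messages the MTW actually returns.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical model of the arity rule for insert(errorInfo, args...):
// one value per table column, plus a leading row ID when callbackFunc is set.
template <typename... TArgs>
bool checkArity(std::size_t tableColumns, bool hasCallback, std::string& error,
                const TArgs&...) {
    const std::size_t expected = tableColumns + (hasCallback ? 1 : 0);
    const std::size_t given = sizeof...(TArgs);  // number of values passed
    if (given != expected) {
        error = "expected " + std::to_string(expected) + " values, got " + std::to_string(given);
        return false;
    }
    error.clear();
    return true;
}
```

For the three-column table t1 used in the examples, checkArity(3, true, err, "row1", 1, 2.3, "msg") succeeds, mirroring the call in Example 2.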

Return Values

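true if the record was accepted into the MTW's buffer queue; false otherwise (for example, when the number of values or a data type does not match the table), in which case errorInfo contains the details. Because writing is asynchronous, errors that occur while sending data to the server are reported through getStatus.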

getUnwrittenData

Use getUnwrittenData to obtain the records that have not been written to the server.

The function declaration is as follows:

void getUnwrittenData(std::vector<std::vector<ConstantSP>*> &unwrittenData);

Arguments

  • unwrittenData: Out-parameter. A nested list of records that have not yet been written to the DolphinDB server. This includes records that failed to be sent and records yet to be sent.

Note: Calling getUnwrittenData() will remove those records from the MTW, releasing them from memory.

insertUnwrittenData

You can insert multiple unwritten records into the table with the insertUnwrittenData function.

The function declaration is as follows:

bool insertUnwrittenData(std::vector<std::vector<ConstantSP>*> &records, ErrorCodeInfo &errorInfo)

Arguments

  • records: The records that have not yet been written to the DolphinDB server. Often obtained by calling getUnwrittenData().
  • errorInfo: Out-parameter. An ErrorCodeInfo object that receives the error code and error message if the insertion fails.
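getUnwrittenData and insertUnwrittenData are typically used together. When an MTW fails, you take the rows it still holds and re-insert them into a new MTW. The toy model below uses hypothetical types (ToyWriter, and Record as a stand-in for std::vector<ConstantSP>) to show the hand-off of records. It does not talk to a server.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Stand-in for one record (std::vector<ConstantSP> in the real API).
using Record = std::vector<std::string>;

// Hypothetical toy writer that only tracks rows not yet written.
struct ToyWriter {
    std::deque<Record> pending;

    // Models getUnwrittenData: hands every pending row to the caller and keeps none.
    void getUnwrittenData(std::vector<Record>& out) {
        out.insert(out.end(), pending.begin(), pending.end());
        pending.clear();
    }

    // Models insertUnwrittenData: queues the given rows for writing again.
    bool insertUnwrittenData(const std::vector<Record>& records) {
        pending.insert(pending.end(), records.begin(), records.end());
        return true;
    }
};
```

Because the failed writer keeps nothing after the hand-off, the caller holds the only copy of these rows until they are re-inserted.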

getStatus

You can call the getStatus function to check the execution status of the MTW object.

The function declaration is as follows:

void getStatus(Status &status);

Arguments

  • status: Out-parameter. A MultithreadedTableWriter::Status object.

Return Values

None. The status argument is populated with the following details:

  • isExiting: Whether the writer thread(s) are exiting.
  • errorCode: The error code. Empty string if no error.
  • errorInfo: The error message. Empty string if no error.
  • sentRows: The number of records successfully sent (written) to the server.
  • unsentRows: The number of records to be sent, including the records pending conversion and the converted records pending insertion into the sending queue.
  • sendFailedRows: The number of records that failed to be sent (including records in the sending queue).
  • threadStatus: A list of status entries, one per writer thread, each including:
    • threadId: The thread ID.
    • sentRows: The number of records this thread has sent.
    • unsentRows: The number of records pending to be sent by this thread.
    • sendFailedRows: The number of records this thread has failed to send.
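When polling getStatus, a common need is to decide whether writing is still in progress, has finished, or has failed. The sketch below uses a hypothetical StatusView struct that mirrors a few of the Status fields listed above. The three-way classification is one reasonable policy, not something the API defines.

```cpp
#include <cassert>
#include <string>

// Hypothetical mirror of a few MultithreadedTableWriter::Status fields.
struct StatusView {
    std::string errorCode;    // empty if no error
    long long sentRows = 0;
    long long unsentRows = 0;
    long long sendFailedRows = 0;
};

enum class Progress { InFlight, Done, Failed };

// One possible policy for interpreting a status snapshot.
Progress classify(const StatusView& s) {
    if (!s.errorCode.empty() || s.sendFailedRows > 0) return Progress::Failed;
    if (s.unsentRows > 0) return Progress::InFlight;
    return Progress::Done;
}
```

In real code, these fields would come from the Status object populated by getStatus (status.errorCode, status.sentRows, and so on).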

waitForThreadCompletion

Call waitForThreadCompletion to block the calling thread until all writer threads of the MTW have completed their tasks.

The function declaration is as follows:

void waitForThreadCompletion();

Arguments

None
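Conceptually, waiting for thread completion means three steps: signal that no more rows will arrive, let the writer threads drain their queue, and join them. The toy model below shows this shutdown pattern with std::thread and std::condition_variable. ToyThreadedWriter is hypothetical, counts rows instead of sending them, and is not the MTW's implementation.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical toy writer: worker threads drain a shared queue of rows.
class ToyThreadedWriter {
public:
    explicit ToyThreadedWriter(std::size_t threadCount) {
        for (std::size_t i = 0; i < threadCount; ++i) {
            workers_.emplace_back([this] { run(); });
        }
    }
    ~ToyThreadedWriter() { waitForThreadCompletion(); }

    void insert(int row) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(row);
        }
        cv_.notify_one();
    }

    // Signals that no more rows will arrive, then blocks until every worker
    // has drained the queue and exited.
    void waitForThreadCompletion() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            exiting_ = true;
        }
        cv_.notify_all();
        for (auto& t : workers_) {
            if (t.joinable()) t.join();
        }
    }

    std::size_t written() const { return written_.load(); }

private:
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            cv_.wait(lock, [this] { return exiting_ || !queue_.empty(); });
            if (queue_.empty()) return;  // exiting and nothing left to send
            queue_.pop_front();
            lock.unlock();
            ++written_;                  // stand-in for sending the row to the server
            lock.lock();
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<int> queue_;
    bool exiting_ = false;
    std::atomic<std::size_t> written_{0};
    std::vector<std::thread> workers_;
};
```

Because the workers drain the queue before exiting, every row inserted before the wait call is processed by the time it returns.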

Examples

Example 1: Standard Workflow

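#include <iostream>
#include <string>
#include "DolphinDB.h"
#include "MultithreadedTableWriter.h"

using namespace dolphindb;
using namespace std;

int main(int argc, const char **argv)
{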
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    // create a table
    string script = "t = table(1000:0, `col1`col2`col3, [LONG, DOUBLE, STRING]); share t as t1;";
    conn.run(script);
    
    // construct an MTW object
    ErrorCodeInfo errorInfo;
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "", "t1", false);
    char msg[] = "123456msg";
    if(!writer.insert(errorInfo, 1, 2.3, msg)){
        std::cout << "insert fail " << errorInfo.errorInfo << std::endl;
        return -1;
    }
    writer.waitForThreadCompletion();
    // get the status of MTW
    MultithreadedTableWriter::Status status;
    writer.getStatus(status);
    if (status.hasError()) {
        std::cout << "error in writing: " << status.errorInfo << std::endl;
    }
    // check the results
    std::cout << conn.run("t1")->getString() << std::endl;
}

Example 2: With callbackFunc Specified

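#include <iostream>
#include <string>
#include "DolphinDB.h"
#include "MultithreadedTableWriter.h"

using namespace dolphindb;
using namespace std;

int main(int argc, const char **argv)
{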
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    // create a table
    string script = "t = table(1000:0, `col1`col2`col3, [LONG, DOUBLE, STRING]); share t as t1;";
    conn.run(script);

    auto cb = [](ConstantSP callbackTable)
    {
        /***
            callbackTable schema:
            column 0: id->string
            column 1: success->bool
        ***/
        std::cout << callbackTable->getString() << std::endl;
    };

    // construct an MTW object:
    // batchSize = 1, throttle = 0.1 s, threadCount = 5, partitionCol = "col1", callbackFunc = cb
    ErrorCodeInfo errorInfo;
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "", "t1", false, false, nullptr, 1, 0.1, 5, "col1", nullptr, MultithreadedTableWriter::M_Append, nullptr, cb);
    char msg[] = "123456msg";
    if(!writer.insert(errorInfo, "row1", 1, 2.3, msg)){
        std::cout << "insert fail " << errorInfo.errorInfo << std::endl;
        return -1;
    }
    writer.waitForThreadCompletion();
    // get the status of MTW
    MultithreadedTableWriter::Status status;
    writer.getStatus(status);
    if (status.hasError()) {
        std::cout << "error in writing: " << status.errorInfo << std::endl;
    }
    // check the results
    std::cout << conn.run("t1")->getString() << std::endl;
}

Example 3: Writing to a DFS Table

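#include <cstdlib>
#include <iostream>
#include <vector>
#include "DolphinDB.h"
#include "MultithreadedTableWriter.h"

using namespace dolphindb;
using namespace std;

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    conn.run(R"(dbName = 'dfs://valuedb3'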
                if(exists(dbName)){
                    dropDatabase(dbName);
                }
                datetest=table(1000:0,`date`symbol`id,[DATE,SYMBOL,LONG]);
                db = database(directory=dbName, partitionType=HASH, partitionScheme=[INT, 10]);
                pt=db.createPartitionedTable(datetest,'pdatetest','id');)");

    vector<COMPRESS_METHOD> compress;
    compress.push_back(COMPRESS_LZ4);
    compress.push_back(COMPRESS_LZ4);
    compress.push_back(COMPRESS_DELTA);
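    // batchSize = 10000, throttle = 1 s, threadCount = 5, partitionCol = "id", one compression method per column
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, nullptr, 10000, 1, 5, "id", &compress);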
    
    ErrorCodeInfo errorInfo;
    // insert 100 rows of records with correct data types and column count
    for (int i = 0; i < 100; i++) {
        if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB", rand() % 10000) == false) {
            // this line will not be executed
            cout << "insert failed: " << errorInfo.errorInfo << endl;
            break;
        }
    }
    // insert returns false with an error message if a value has the wrong data type
    if (writer.insert(errorInfo, rand() % 10000, 222, rand() % 10000) == false) {
        cout << "insert failed: " << errorInfo.errorInfo << endl;
    }
    // insert also returns false with an error message if the number of values does not match the column count
    if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB") == false) {
        cout << "insert failed: " << errorInfo.errorInfo << endl;
    }
    writer.waitForThreadCompletion();
    cout << conn.run("select count(*) from loadTable('dfs://valuedb3', 'pdatetest')")->getString() << endl;
}

Example 4: Constructing an MTW with MTWConfig

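#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include "DolphinDB.h"
#include "MultithreadedTableWriter.h"

using namespace dolphindb;
using namespace std;
using namespace std::chrono_literals;

int main(){
    // Establish a connection. Adjust the connection parameters and server checks to your environment.
    auto conn = std::make_shared<DBConnection>();
    conn->connect("localhost", 8848, "admin", "123456", "", false, {}, 7200, true);
    // Handle connection state changes as appropriate for your application.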
    auto cb = [&conn](const MTWState state, const std::string &host, const int port) {
        std::cout << "This is a callback: state " << static_cast<int>(state) << ", server " << host << ":" << port << std::endl;
        return true;
    };
    // Construct and configure the MTW. This assumes a table named myTable with a column myColumn already exists on the server.
    MTWConfig config(conn, "myTable");
    config.onConnectionStateChange(cb).setThreads(5, "myColumn");
    MultithreadedTableWriter mtw(config);

    // The remaining operations do not depend on which constructor was used; all MTW functions are supported.
    ErrorCodeInfo pErrorInfo;
    string sym[] = {"A", "B", "C", "D"};
    for (int i = 0; i < 10; i++) {
        mtw.insert(pErrorInfo, sym[i % 4], i * 12, i + 64);
        std::this_thread::sleep_for(100ms);
    }
    mtw.waitForThreadCompletion();
    return 0;
}