MultithreadedTableWriter
The MultithreadedTableWriter class (MTW) of the DolphinDB C++ API writes data to the server asynchronously using multiple threads. Records are inserted one row at a time into a buffer queue. When the batchSize or throttle condition described under Arguments is met, the MTW submits the buffered batch to the DolphinDB server for writing.
Constructing an MTW
The constructor declaration is as follows:
MultithreadedTableWriter(const std::string& host,int port,
const std::string& userId, const std::string& password,
const string& dbPath, const string& tableName,
bool useSSL,
bool enableHighAvailability = false,
const vector<string> *pHighAvailabilitySites = nullptr,
int batchSize = 1,
float throttle = 0.01f,
int threadCount = 1,
const string& partitionCol ="",
const vector<COMPRESS_METHOD> *pCompressMethods = nullptr,
Mode mode = M_Append,
vector<string> *pModeOption = nullptr,
const std::function<void(ConstantSP)> &callbackFunc = nullptr,
bool enableStreamTableTimestamp = false);
Arguments
- host: The IP address of the server to connect to.
- port: The port number of the server to connect to.
- userId: The username for server login.
- password: The password for server login.
- dbPath: The DFS database path. For an in-memory table, pass an empty string ("").
- tableName: The in-memory or DFS table name.
- useSSL: Whether to enable SSL.
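- enableHighAvailability: Whether to enable high availability. The default value is false.
- pHighAvailabilitySites: A pointer to the list of nodes (in "host:port" format) that can be used for high availability. Only takes effect when enableHighAvailability is true.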
- batchSize: The number of records the MTW buffers before sending them to the server. The default value 1 means the MTW sends each record to the server as soon as it receives the record from the client. If batchSize is greater than 1, the MTW buffers incoming records until batchSize records have accumulated and then sends them to the server in one batch.
- throttle: The maximum time (in seconds) the MTW buffers incoming records before sending them to the server, even if batchSize has not been reached. It must be greater than 0.
- threadCount: The number of writer threads to create. The default value 1 means a single writer thread. It must be 1 for tables that do not support concurrent writing.
- partitionCol: Takes effect only when threadCount is greater than 1. For a partitioned table, it must be a partitioning column; for a stream table, it must be a column name; for a dimension table, it has no effect.
- pCompressMethods: A pointer to a list of compression methods applied on a per-column basis. Each element corresponds to one column of data, in column order. If nullptr (default), no compression is used. Valid compression methods are:
  - COMPRESS_LZ4: LZ4 algorithm.
  - COMPRESS_DELTA: Delta-of-delta encoding.
- mode: The write mode. It can be M_Upsert (calls the server function upsert!) or M_Append (default; calls the server function tableInsert).
- pModeOption: The optional parameters of upsert!. Takes effect only when mode is M_Upsert.
- callbackFunc: A callback function invoked after the MTW has attempted to write rows to the server. Its parameter is a table whose first column (string type) holds the ID of each inserted row and whose second column (BOOL type) indicates whether that row was inserted successfully.
- enableStreamTableTimestamp: Whether to insert data into a stream table whose timestamp column is set by the setStreamTableTimestamp function. Note that the timestamp column must be the last column of the stream table.
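The batchSize and throttle arguments together decide when a batch leaves the buffer: whichever limit is reached first triggers the send. The following is a minimal model of that rule, not the MTW's implementation. It assumes the throttle clock starts when the first record of the current batch is buffered; FlushPolicy and shouldFlush are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model of the flush rule described by batchSize and throttle.
// Not part of the DolphinDB API.
struct FlushPolicy {
    std::size_t batchSize;  // mirrors the batchSize constructor argument
    double throttle;        // mirrors the throttle argument, in seconds

    FlushPolicy(std::size_t batch, double throttleSeconds)
        : batchSize(batch), throttle(throttleSeconds) {
        assert(batchSize >= 1);
        assert(throttle > 0.0);  // the documentation requires throttle > 0
    }

    // buffered: records currently waiting in the buffer.
    // waitedSeconds: time since the first of them was buffered (an assumption
    // about where the throttle clock starts).
    bool shouldFlush(std::size_t buffered, double waitedSeconds) const {
        if (buffered == 0) return false;         // nothing to send
        if (buffered >= batchSize) return true;  // batch is full
        return waitedSeconds >= throttle;        // time limit reached
    }
};
```

With the default batchSize of 1, the second condition is met by every single record, which matches the "send immediately" behavior described above.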
Functions of MTW
insert
You can insert a single record with the insert function.
The function declaration is as follows:
template<typename... TArgs>
bool insert(ErrorCodeInfo &errorInfo, TArgs... args)
Arguments
- errorInfo: Out-parameter. An ErrorCodeInfo object that holds the error code and error message if the insert fails.
- args: Variadic arguments. The record to be inserted. Note that if callbackFunc is specified when constructing the MTW, the first argument must be a string holding the ID of the inserted row.
Return Values
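A boolean value indicating whether the record was accepted by the MTW. If it is false, errorInfo describes the error. Because writing is asynchronous, true does not mean the record has already been written to the server; call getStatus to check for write errors.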
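When callbackFunc is set, the row ID passed as the first argument of insert is what links a record to its entry in the callback table. The sketch below keeps a set of pending IDs and resolves them as (id, success) results arrive. RowTracker is a hypothetical helper, not part of the DolphinDB API; the callback table is modelled as a vector of pairs instead of a ConstantSP, and the mutex reflects the assumption that the callback runs on one of the MTW's background threads.

```cpp
#include <cassert>
#include <mutex>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical client-side bookkeeping for an MTW constructed with callbackFunc.
class RowTracker {
public:
    // Record an ID after insert(errorInfo, id, ...) has returned true.
    void onQueued(const std::string& id) {
        std::lock_guard<std::mutex> lock(mu_);
        pending_.insert(id);
    }

    // Feed the (id, success) rows that the callback received.
    void onCallback(const std::vector<std::pair<std::string, bool>>& results) {
        std::lock_guard<std::mutex> lock(mu_);
        for (const auto& [id, ok] : results) {
            pending_.erase(id);
            if (!ok) failed_.insert(id);
        }
    }

    std::set<std::string> pending() const {
        std::lock_guard<std::mutex> lock(mu_);
        return pending_;
    }

    std::set<std::string> failed() const {
        std::lock_guard<std::mutex> lock(mu_);
        return failed_;
    }

private:
    mutable std::mutex mu_;          // the callback may run on a writer thread
    std::set<std::string> pending_;  // accepted by insert, not yet reported
    std::set<std::string> failed_;   // reported with success == false
};
```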
getUnwrittenData
You can use the getUnwrittenData function to obtain a list of the records that have not been written to the server.
The function declaration is as follows:
void getUnwrittenData(std::vector<std::vector<ConstantSP>*> &unwrittenData);
Arguments
- unwrittenData: Out-parameter. A nested list of records that have not yet been written to the DolphinDB server. This includes records that failed to be sent and records yet to be sent.
Note: Calling getUnwrittenData() removes those records from the MTW, so the MTW no longer holds them in memory.
insertUnwrittenData
You can insert multiple unwritten records into the table with the insertUnwrittenData function.
The function declaration is as follows:
bool insertUnwrittenData(std::vector<std::vector<ConstantSP>*> &records, ErrorCodeInfo &errorInfo)
Arguments
- records: The records that have not yet been written to the DolphinDB server, usually obtained by calling getUnwrittenData().
- errorInfo: Out-parameter. An ErrorCodeInfo object that holds the error code and error message if the insert fails.
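One way to use getUnwrittenData and insertUnwrittenData together is to drain the records from a writer that has failed (for example, when getStatus reports an error) and re-queue them on a newly constructed MTW. The sketch below models only that hand-off: FakeWriter is a hypothetical class, rows are vectors of strings rather than std::vector<ConstantSP>*, and no network or DolphinDB API call is involved.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <string>
#include <utility>
#include <vector>

// Stand-in for one record; the real API uses std::vector<ConstantSP>*.
using Row = std::vector<std::string>;

// Hypothetical model of an MTW's pending-record queue (not the DolphinDB class).
class FakeWriter {
public:
    void enqueue(Row row) { queue_.push_back(std::move(row)); }

    // Like getUnwrittenData(): hands every pending row to the caller and
    // removes it from this writer.
    void getUnwrittenData(std::vector<Row>& out) {
        out.insert(out.end(), std::make_move_iterator(queue_.begin()),
                   std::make_move_iterator(queue_.end()));
        queue_.clear();
    }

    // Like insertUnwrittenData(): queues the given rows for writing again.
    void insertUnwrittenData(const std::vector<Row>& records) {
        queue_.insert(queue_.end(), records.begin(), records.end());
    }

    std::size_t pendingCount() const { return queue_.size(); }

private:
    std::vector<Row> queue_;
};
```

Usage: drain `failed` into a local vector, check that `failed` is now empty, then pass the vector to the replacement writer.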
getStatus
You can call the getStatus function to check the execution status of the MTW object.
The function declaration is as follows:
void getStatus(Status &status);
Arguments
- status: Out-parameter. A MultithreadedTableWriter::Status object.
Return Values
None. The MultithreadedTableWriter::Status object passed in as status is filled with the following details:
- isExiting: Whether the writer thread(s) are exiting.
- errorCode: The error code. Empty string if no error.
- errorInfo: The error message. Empty string if no error.
- sentRows: The number of records successfully sent (written) to the server.
- unsentRows: The number of records to be sent, including the records pending conversion and the converted records pending insertion into the sending queue.
- sendFailedRows: The number of records that failed to be sent (including records in the sending queue).
- threadStatus: A list of statuses, one per writer thread, each including:
  - threadId: The thread ID.
  - sentRows: The number of records this thread has sent.
  - unsentRows: The number of records pending to be sent by this thread.
  - sendFailedRows: The number of records this thread has failed to send.
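When threadCount is greater than 1, the writer-level counters and the threadStatus entries describe the same rows from two angles. The sketch below aggregates per-thread counters into totals and derives an "everything written" check. It assumes the writer-level sentRows, unsentRows, and sendFailedRows equal the sums of the per-thread values; ThreadStats, Totals, and summarize are hypothetical types that mirror, but are not, MultithreadedTableWriter::Status.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical mirror of one threadStatus entry.
struct ThreadStats {
    std::uint64_t threadId;
    long sentRows;
    long unsentRows;
    long sendFailedRows;
};

// Hypothetical writer-level totals.
struct Totals {
    long sentRows = 0;
    long unsentRows = 0;
    long sendFailedRows = 0;

    // True once nothing is pending and nothing has failed.
    bool allWritten() const { return unsentRows == 0 && sendFailedRows == 0; }
};

// Assumes the writer-level counters are the sums of the per-thread counters.
Totals summarize(const std::vector<ThreadStats>& threads) {
    Totals t;
    for (const ThreadStats& s : threads) {
        t.sentRows += s.sentRows;
        t.unsentRows += s.unsentRows;
        t.sendFailedRows += s.sendFailedRows;
    }
    return t;
}
```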
waitForThreadCompletion
Use the waitForThreadCompletion function to block the calling thread until all of the MTW's writer threads have completed their tasks.
The function declaration is as follows:
void waitForThreadCompletion();
Arguments
None
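Conceptually, waiting for writer threads to complete resembles the usual "close the input, drain the queue, join the threads" shutdown pattern. The minimal model below illustrates that pattern with std::thread and a condition variable. WorkerPool and its members are hypothetical and do not reproduce the MTW's internals; incrementing a counter stands in for sending a row to the server.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical model: workers pop rows from a shared queue until the input is
// closed and the queue is empty.
class WorkerPool {
public:
    explicit WorkerPool(int threadCount) {
        for (int i = 0; i < threadCount; ++i)
            workers_.emplace_back([this] { run(); });
    }

    ~WorkerPool() { waitForCompletion(); }

    void push(int row) {
        {
            std::lock_guard<std::mutex> lock(mu_);
            queue_.push_back(row);
        }
        cv_.notify_one();
    }

    // Blocks the caller until every queued row is processed and all threads exit.
    void waitForCompletion() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            closed_ = true;
        }
        cv_.notify_all();
        for (std::thread& t : workers_)
            if (t.joinable()) t.join();
    }

    long written() const {
        std::lock_guard<std::mutex> lock(mu_);
        return written_;
    }

private:
    void run() {
        std::unique_lock<std::mutex> lock(mu_);
        for (;;) {
            cv_.wait(lock, [this] { return closed_ || !queue_.empty(); });
            if (queue_.empty()) return;  // input closed and queue fully drained
            queue_.pop_front();
            ++written_;                  // stands in for sending the row
        }
    }

    mutable std::mutex mu_;
    std::condition_variable cv_;
    std::deque<int> queue_;
    std::vector<std::thread> workers_;
    bool closed_ = false;
    long written_ = 0;
};
```

Because workers exit only when the input is closed and the queue is empty, every row pushed before waitForCompletion() is processed before the call returns.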
Examples
Example 1: Standard Workflow
#include <iostream>
#include <string>
#include "DolphinDB.h"
#include "MultithreadedTableWriter.h"

using namespace dolphindb;
using namespace std;

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    // create a shared in-memory table
    string script = "t = table(1000:0, `col1`col2`col3, [LONG, DOUBLE, STRING]); share t as t1;";
    conn.run(script);
    // construct an MTW object; dbPath is empty because t1 is an in-memory table
    ErrorCodeInfo errorInfo;
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "", "t1", false);
    char msg[] = "123456msg";
    if (!writer.insert(errorInfo, 1, 2.3, msg)) {
        std::cout << "insert fail " << errorInfo.errorInfo << std::endl;
        return -1;
    }
    // block until the writer thread has finished
    writer.waitForThreadCompletion();
    // get the status of MTW
    MultithreadedTableWriter::Status status;
    writer.getStatus(status);
    if (status.hasError()) {
        std::cout << "error in writing: " << status.errorInfo << std::endl;
    }
    // check the results
    std::cout << conn.run("t1")->getString() << std::endl;
    return 0;
}
Example 2: With callbackFunc Specified
#include <iostream>
#include <string>
#include "DolphinDB.h"
#include "MultithreadedTableWriter.h"

using namespace dolphindb;
using namespace std;

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    // create a shared in-memory table
    string script = "t = table(1000:0, `col1`col2`col3, [LONG, DOUBLE, STRING]); share t as t1;";
    conn.run(script);
    auto cb = [](ConstantSP callbackTable)
    {
        /***
        callbackTable schema:
        column 0: id->string
        column 1: success->bool
        ***/
        std::cout << callbackTable->getString() << std::endl;
    };
    // construct an MTW object: batchSize = 1, throttle = 0.1 s, threadCount = 5,
    // partitionCol = "col1", append mode, with cb as callbackFunc
    ErrorCodeInfo errorInfo;
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "", "t1", false, false, nullptr, 1, 0.1, 5, "col1", nullptr, MultithreadedTableWriter::M_Append, nullptr, cb);
    char msg[] = "123456msg";
    // with callbackFunc specified, the first value is the row ID
    if (!writer.insert(errorInfo, "row1", 1, 2.3, msg)) {
        std::cout << "insert fail " << errorInfo.errorInfo << std::endl;
        return -1;
    }
    writer.waitForThreadCompletion();
    // get the status of MTW
    MultithreadedTableWriter::Status status;
    writer.getStatus(status);
    if (status.hasError()) {
        std::cout << "error in writing: " << status.errorInfo << std::endl;
    }
    // check the results
    std::cout << conn.run("t1")->getString() << std::endl;
    return 0;
}
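With threadCount = 5 and partitionCol = "col1", as in Example 2, each row has to be assigned to one of five writer threads. The sketch below shows one plausible assignment rule: hash the partition-column value and take it modulo the thread count, so rows with the same value always land on the same thread. The hash-based rule and the names pickThread and route are assumptions for illustration; this section does not document the MTW's actual mapping.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical routing rule: rows sharing a partitionCol value share a thread.
// std::hash is used purely for illustration.
std::size_t pickThread(long long partitionValue, std::size_t threadCount) {
    return std::hash<long long>{}(partitionValue) % threadCount;
}

// Groups row indices by the thread that would write them.
std::vector<std::vector<std::size_t>> route(const std::vector<long long>& col1,
                                            std::size_t threadCount) {
    std::vector<std::vector<std::size_t>> perThread(threadCount);
    for (std::size_t row = 0; row < col1.size(); ++row)
        perThread[pickThread(col1[row], threadCount)].push_back(row);
    return perThread;
}
```

Keeping all rows of one partition on one thread avoids two threads writing to the same partition at the same time, which is the usual motivation for a per-column split.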
Example 3: Writing to a DFS Table
#include <cstdlib>
#include <iostream>
#include <vector>
#include "DolphinDB.h"
#include "MultithreadedTableWriter.h"

using namespace dolphindb;
using namespace std;

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    conn.run(R"(dbName = 'dfs://valuedb3'
if(exists(dbName)){
dropDatabase(dbName);
}
datetest=table(1000:0,`date`symbol`id,[DATE,SYMBOL,LONG]);
db = database(directory=dbName, partitionType=HASH, partitionScheme=[INT, 10]);
pt=db.createPartitionedTable(datetest,'pdatetest','id');)");
    // one compression method per column, in column order
    vector<COMPRESS_METHOD> compress;
    compress.push_back(COMPRESS_LZ4);
    compress.push_back(COMPRESS_LZ4);
    compress.push_back(COMPRESS_DELTA);
    // batchSize = 10000, throttle = 1 s, threadCount = 5, partitionCol = "id"
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, nullptr, 10000, 1, 5, "id", &compress);
    ErrorCodeInfo errorInfo;
    // insert 100 rows of records with correct data types and column count
    for (int i = 0; i < 100; i++) {
        if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB", rand() % 10000) == false) {
            // this line will not be executed
            cout << "insert failed: " << errorInfo.errorInfo << endl;
            break;
        }
    }
    // An error message is returned if the inserted row has a wrong data type
    if (writer.insert(errorInfo, rand() % 10000, 222, rand() % 10000) == false) {
        cout << "insert failed: " << errorInfo.errorInfo << endl;
    }
    // An error message is returned if the inserted row has the wrong number of columns
    if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB") == false) {
        cout << "insert failed: " << errorInfo.errorInfo << endl; // insert failed: Column counts don't match 2
    }
    writer.waitForThreadCompletion();
    cout << conn.run("select count(*) from pt")->getString() << endl;
    return 0;
}
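The two failing calls at the end of Example 3 are rejected by insert itself, before anything is queued, which is why errorInfo can be checked immediately. For the column-count case, the size of the variadic parameter pack is enough to detect the mismatch. The hypothetical checkColumnCount below sketches that idea with sizeof...(TArgs); it is not the library's code, and its error text is made up for the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical column-count check for a variadic insert.
// Not part of the DolphinDB API.
template <typename... TArgs>
bool checkColumnCount(std::size_t tableColumns, std::string& error, const TArgs&...) {
    constexpr std::size_t given = sizeof...(TArgs);  // number of values in this row
    if (given != tableColumns) {
        error = "expected " + std::to_string(tableColumns) +
                " values, got " + std::to_string(given);
        return false;
    }
    error.clear();
    return true;
}
```

A type check (such as the 222 passed for a SYMBOL column) needs per-column schema information and is not modelled here.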