tableInsert / insert into
This section explains how to insert data using DolphinDB's tableInsert
function and the insert into
statement within the C++ API. This
synchronized write method supports both single and batch writes.
Writing to an In-Memory Table
Create a shared in-memory table with the following DolphinDB script.
t = table(100:0, `name` date`price, [STRING, DATE, DOUBLE]);
share t as tglobal;
Inserting Records with insert into
Insert a single record:
std::string assembleScript(const std::string& name, int date, double price){
return string("insert into tglobal values(`").append(name).append(",").append(std::to_string(date)).append(",").append(std::to_string(price).append(")"));
}
int main(int argc, const char **argv)
{
DBConnection conn;
conn.connect("127.0.0.1", 8848, "admin", "123456");
conn.run(assembleScript("Tom", 0, 1.2));
conn.run(assembleScript("Lily", 1, 2.2));
conn.run(assembleScript("Lucy", 2, 3.2));
std::cout << conn.run("tglobal")->getString() << std::endl;
}
Insert multiple records:
int main(int argc, const char **argv)
{
DBConnection conn;
conn.connect("127.0.0.1", 8848, "admin", "123456");
string script;
int rowNum=10000, indexCapacity=10000;
VectorSP names = Util::createVector(DT_STRING, rowNum, indexCapacity);
VectorSP dates = Util::createVector(DT_DATE, rowNum, indexCapacity);
VectorSP prices = Util::createVector(DT_DOUBLE, rowNum, indexCapacity);
int array_dt_buf[Util:: BUF_SIZE]; // buffer for the date column
double array_db_buf[Util:: BUF_SIZE]; // buffer for the price column
int start = 0;
int no=0;
while (start < rowNum) {
size_t len = std::min(Util::BUF_SIZE, rowNum - start);
int *dtp = dates->getIntBuffer(start, len, array_dt_buf); //dtp points to the start of the buffer returned by `getIntBuffer`
double *dbp = prices->getDoubleBuffer(start, len, array_db_buf); //dbp points to the start of the buffer returned by `getDoubleBuffer`
for (size_t i = 0; i < len; i++) {
names->setString(i+start, "name_"+std::to_string(++no)); // direct assignment to the name column of string type
dtp[i] = 17898+i;
dbp[i] = (rand()%100)/3.0;
}
dates->setInt(start, len, dtp); // use `setInt` to update the date column
prices->setDouble(start, len, dbp); //use `setDouble` to update the price column
start += len;
}
vector<string> allnames = {"names", "dates", "prices"};
vector<ConstantSP> allcols = {names, dates, prices};
conn.upload(allnames, allcols);
script += "insert into tglobal values(names, dates, prices); tglobal";
TableSP table = conn.run(script);
}
Inserting a Table with tableInsert
Writing to a DFS Table
Inserting With a Single Thread
(1) Creating a DolphinDB DFS table
dbPath = "dfs://SAMPLE_TRDDB";
tableName = `demoTable
db = database(dbPath, VALUE, 2010.01.01..2010.01.30)
pt=db.createPartitionedTable(table(1000000:0, `name`date `price, [STRING,DATE,DOUBLE]), tableName, `date)
(2) Writing with tableInsert
TableSP createDemoTable(){
vector<string> colNames = {"name", "date","price"};
vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE};
int colNum = 3, rowNum = 10, indexCapacity = 10;
ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
vector<VectorSP> columnVecs;
for(int i = 0; i < colNum; ++i)
columnVecs.push_back(table->getColumn(i));
for(int i = 0; i < rowNum; ++i){
columnVecs[0]->set(i, Util::createString("name_"+std::to_string(i)));
columnVecs[1]->set(i, Util::createDate(2010, 1, i+1));
columnVecs[2]->set(i, Util::createDouble((rand()%100)/3.0));
}
return table;
}
int main(int argc, const char **argv)
{
DBConnection conn;
conn.connect("127.0.0.1", 8848, "admin", "123456");
TableSP table = createDemoTable();
vector<ConstantSP> args;
args.push_back(table);
conn.run("tableInsert{loadTable('dfs://SAMPLE_TRDDB', `demoTable)}", args);
}
Inserting With Multi-thread
(1) Creating a DolphinDB DFS table
Create a COMPO-partitioned (VALUE-HASH-HASH) database "dfs://natlog" and a DFS table "natlogrecords".
dbName="dfs://natlog"
tableName="natlogrecords"
db1 = database("", VALUE, datehour(2019.09.11T00:00:00)..datehour(2019.09.13T00:00:00) )//starttime, newValuePartitionPolicy=add
db2 = database("", HASH, [IPADDR, 5]) //source_address
db3 = database("", HASH, [IPADDR, 5]) //destination_address
db = database(dbName, COMPO, [db1,db2,db3])
data = table(1:0, ["fwname","filename","source_address","source_port","destination_address","destination_port","nat_source_address","nat_source_port","starttime","stoptime","elapsed_time"], [SYMBOL,STRING,IPADDR,INT,IPADDR,INT,IPADDR,INT,DATETIME,DATETIME,INT])
db.createPartitionedTable(data,tableName,`starttime`source_address`destination_address)
(2) Writing with tableInsert
DolphinDB does not permit multiple writers to write data to the same partition simultaneously. Therefore, when writing data in parallel on the client side, ensure that each thread writes to a different partition independently.
When working with DFS tables partitioned by hash value, the DolphinDB C++ API
offers the getHash
function to retrieve the hash value of the
data. In designing concurrent writing of DFS tables on the client side, you can
group the data based on the hash value of the hash-partitioned fields. Then,
assign a dedicated write thread to each group, ensuring that each thread writes
data to a different hash partition concurrently.
#include "Concurrent.h"
#include "DolphinDB.h"
#include "Util.h"
#include <iostream>
#include <sstream>
#include <string>
#include <sys/time.h>
#include <thread>
#include <arpa/inet.h>
using namespace dolphindb;
using namespace std;
#define BUCKETS 50
#define MAX_THREAD_NUM BUCKETS
DBConnection conn[MAX_THREAD_NUM];
SmartPointer<BoundedBlockingQueue<TableSP>> tableQueue[MAX_THREAD_NUM];
// create a structure for parameters
struct parameter {
int index;
int count;
long cLong;
long nLong;
long nTime;
long nStarttime;
};
void showUsage() {
cout << "DolpinDB Multi-threaded performance test program" << endl;
cout << "Usage example:--h=127.0.0.1 --p=8921 --c=1000 --n=5 --t=5 --s=1579080800000" << endl;
cout << "Options :" << endl;
cout << " --h=127.0.0.1 Mandatory,dolphindb host, Multiple hosts separated by commas" << endl;
cout << " --p=8921 Mandatory,dolphindb port, Multiple ports separated by commas.The number of ports should be the same of hosts!" << endl;
cout << " --c=1000 Mandatory,The number of records inserted per thread" << endl;
cout << " --n=5 Optional,Batches insertions per thread,default is 1" << endl;
cout << " --t=5 Optional,Threads number,default is 1,max is " << BUCKETS << endl;
cout << " --s=1574380800 Optional,start time,default is " << Util::getEpochTime() / 1000 << endl;
cout << " --help Print this help." << endl;
}
// create tables for each partition
TableSP createDemoTable(long rows, long startPartition, long partitionCount, long startTime, int timeInc) {
vector<string> colNames = {"fwname", "filename", "source_address", "source_port", "destination_address", "destination_port",
"nat_source_address", "nat_source_port", "starttime", "stoptime", "elapsed_time"};
vector<DATA_TYPE> colTypes = {DT_SYMBOL, DT_STRING, DT_IP, DT_INT, DT_IP, DT_INT, DT_IP, DT_INT, DT_DATETIME, DT_DATETIME, DT_INT};
int colNum = 11, rowNum = rows, indexCapacity = rows;
ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
vector<VectorSP> columnVecs;
for (int i = 0; i < colNum; i++)
columnVecs.push_back(table->getColumn(i));
unsigned char sip[16] = {0};
sip[3] = 192;
sip[2] = startPartition;
sip[1] = partitionCount;
ConstantSP spIP = Util::createConstant(DT_IP);
for (int j = 1; j < 255; j++) {
sip[0] = j;
spIP->setBinary(0, 16, sip);
// partitioned by hash values
if (spIP->getHash(BUCKETS) >= startPartition && spIP->getHash(BUCKETS) < startPartition + partitionCount) {
break;
}
}
unsigned char ip[16] = {0};
for (int i = 0; i < rowNum; i++) {
columnVecs[0]->setString(i, "10.189.45.2:9000");
columnVecs[1]->setString(i, std::to_string(startPartition));
columnVecs[2]->setBinary(i, 16, sip);
columnVecs[3]->setInt(i, 1 * i);
memcpy(ip, (unsigned char *)&i, 4);
columnVecs[4]->setBinary(i, 16, ip);
columnVecs[5]->setInt(i, 2 * i);
columnVecs[6]->set(i, Util::parseConstant(DT_IP, "192.168.1.1"));
columnVecs[7]->setInt(i, 3 * i);
columnVecs[8]->setLong(i, startTime + timeInc);
columnVecs[9]->setLong(i, i + startTime + 100);
columnVecs[10]->setInt(i, i);
}
return table;
}
// write with tableInsert
void *writeData(void *arg) {
struct parameter *pParam;
pParam = (struct parameter *)arg;
TableSP table;
for (unsigned int i = 0; i < pParam->nLong; i++) {
tableQueue[pParam->index]->pop(table);
long long startTime = Util::getEpochTime();
vector<ConstantSP> args;
args.push_back(table);
conn[pParam->index].run("tableInsert{loadTable('dfs://natlog', `natlogrecords)}", args);
pParam->nTime += Util::getEpochTime() - startTime;
}
printf("Thread %d,insert %ld rows %ld times, used %ld ms.\n", pParam->index, pParam->cLong, pParam->nLong, pParam->nTime);
return NULL;
}
// generate sample data
void *genData(void *arg) {
struct parameter *pParam;
pParam = (struct parameter *)arg;
long partitionCount = BUCKETS / pParam->count;
for (unsigned int i = 0; i < pParam->nLong; i++) {
TableSP table = createDemoTable(pParam->cLong, partitionCount * pParam->index, partitionCount, pParam->nStarttime, i * 5);
tableQueue[pParam->index]->push(table);
}
return NULL;
}
int main(int argc, char *argv[]) {
if (argc < 2) {
cout << "No arguments, you MUST give an argument at least!" << endl;
showUsage();
return -1;
}
// parse the parameter
int nOptionIndex = 1;
string cString, nString, hString, pString, tString, sString;
stringstream cSS, nSS, pSS, tSS, sSS;
long cLong, nLong, pLong, tLong, sLong;
vector<string> vHost, vPort;
while (nOptionIndex < argc) {
if (strncmp(argv[nOptionIndex], "--c=", 4) == 0) { // get records number per threads
cString = &argv[nOptionIndex][4];
} else if (strncmp(argv[nOptionIndex], "--h=", 4) == 0) { // get host
hString = &argv[nOptionIndex][4];
} else if (strncmp(argv[nOptionIndex], "--p=", 4) == 0) { // get port
pString = &argv[nOptionIndex][4];
} else if (strncmp(argv[nOptionIndex], "--n=", 4) == 0) { // get batches
nString = &argv[nOptionIndex][4];
} else if (strncmp(argv[nOptionIndex], "--t=", 4) == 0) { // get thread
tString = &argv[nOptionIndex][4];
} else if (strncmp(argv[nOptionIndex], "--s=", 4) == 0) { // get start time
sString = &argv[nOptionIndex][4];
} else if (strncmp(argv[nOptionIndex], "--help", 6) == 0) { // help
showUsage();
return 0;
} else {
cout << "Options '" << argv[nOptionIndex] << "' not valid. Run '" << argv[0] << "' for details." << endl;
return -1;
}
nOptionIndex++;
}
if (cString.empty()) {
cout << "--c is required" << endl;
showUsage();
return -1;
} else {
cSS << cString;
cSS >> cLong;
}
if (pString.empty()) {
cout << "--p is required" << endl;
showUsage();
return -1;
} else {
vPort = Util::split(pString, ',');
}
if (hString.empty()) {
cout << "--h is required" << endl;
showUsage();
return -1;
} else {
vHost = Util::split(hString, ',');
}
if (nString.empty()) {
nLong = 1;
} else {
nSS << nString;
nSS >> nLong;
}
if (tString.empty()) {
tLong = 1;
} else {
tSS << tString;
tSS >> tLong;
}
if (sString.empty()) {
sLong = Util::getEpochTime() / 1000; // 1574380800;
cout << "starttime=" << sLong << endl;
} else {
sSS << sString;
sSS >> sLong;
}
if (tLong > BUCKETS) {
cout << "The number of threads must be less than " << BUCKETS << endl;
showUsage();
return -1;
}
if (vHost.size() != vPort.size()) {
cout << "The number of host and port must be the same! " << vHost.size() << ":" << vPort.size() << endl;
showUsage();
return -1;
}
try {
for (int i = 0; i < tLong; ++i) {
hString = vHost[i % vHost.size()];
pLong = std::stol(vPort[i % vPort.size()]);
bool ret = conn[i].connect(hString, pLong, "admin", "123456");
if (!ret) {
cout << "Failed to connect to the server" << endl;
return 0;
}
tableQueue[i] = new BoundedBlockingQueue<TableSP>(2);
}
} catch (exception &ex) {
cout << "Failed to connect with error: " << ex.what();
return -1;
}
cout << "Please waiting..." << endl;
// start threads for generating and writing data
long long startTime = Util::getEpochTime();
struct parameter arg[tLong];
std::thread genThreads[tLong];
std::thread writeThreads[tLong];
for (int i = 0; i < tLong; ++i) {
arg[i].index = i;
arg[i].count = tLong;
arg[i].nLong = nLong;
arg[i].cLong = cLong;
arg[i].nTime = 0;
arg[i].nStarttime = sLong;
genThreads[i] = std::thread(genData, (void *)&arg[i]);
writeThreads[i] = std::thread(writeData, (void *)&arg[i]);
}
for (int i = 0; i < tLong; ++i) {
genThreads[i].join();
writeThreads[i].join();
}
// check the elapsed time
long long endTime = Util::getEpochTime();
long long rowCount = cLong * nLong * tLong;
cout << "Inserted " << rowCount << " rows, took a total of " + std::to_string(endTime - startTime) + " ms. "
<< rowCount / (endTime - startTime) * 1000 / 10000 << " w/s " << endl;
long timeSum = arg[0].nTime;
for (int i = 1; i < tLong; ++i) {
timeSum += arg[i].nTime;
}
cout << "Total time minus data preparation time: " << std::to_string(timeSum / (double)tLong) + " ms" << endl;
return 0;
}