MultithreadedTableWriter

The Java API provides the MultithreadedTableWriter (MTW) class for asynchronous writes. MTW maintains a data buffer queue on the client, so even while the server is busy with network I/O, client write threads can keep writing data into that queue. A write is considered complete as soon as the queue accepts it. Because enqueueing returns immediately, write threads never busy-wait on the server. MultithreadedTableWriter currently supports batch writes to in-memory tables and DFS tables.
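The queueing model can be made concrete with a minimal, stdlib-only sketch of a client-side buffer. insert returns as soon as a row is queued. A background thread sends rows in batches of batchSize, or sends a partial batch after waiting throttle seconds, following the batching rules described under Parameters. The class and method names (BufferedWriterSketch, sentBatchSizes, and so on) are hypothetical, and recording batch sizes stands in for sending data to a server. This illustrates the idea only. It is not how the DolphinDB API is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: NOT the DolphinDB implementation of MultithreadedTableWriter.
// Callers enqueue rows and return at once; one background thread "sends" rows in batches
// of batchSize, or sends a partial batch once the first queued row has waited throttle seconds.
class BufferedWriterSketch {
    private final BlockingQueue<Object[]> queue = new LinkedBlockingQueue<>();
    private final List<Integer> sentBatchSizes = new ArrayList<>(); // stands in for "sent to server"
    private final int batchSize;
    private final long throttleMillis;
    private final Thread worker;
    private volatile boolean exiting = false;

    BufferedWriterSketch(int batchSize, float throttleSeconds) {
        this.batchSize = batchSize;
        this.throttleMillis = Math.max(1L, (long) (throttleSeconds * 1000));
        this.worker = new Thread(this::drainLoop, "writer-sketch");
        this.worker.start();
    }

    // Non-blocking: the row only has to reach the client-side queue.
    synchronized boolean insert(Object... row) {
        if (exiting) {
            return false;              // no new rows once shutdown has begun
        }
        queue.add(row);
        return true;
    }

    // Stop accepting rows, then wait until everything already queued has been "sent".
    void waitForCompletion() {
        synchronized (this) {
            exiting = true;
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Safe to read after waitForCompletion(): join() makes the worker's writes visible.
    List<Integer> sentBatchSizes() {
        return sentBatchSizes;
    }

    private void drainLoop() {
        List<Object[]> batch = new ArrayList<>();
        long deadline = 0L;
        try {
            while (true) {
                long wait = batch.isEmpty() ? throttleMillis
                        : Math.max(1L, deadline - System.currentTimeMillis());
                Object[] row = queue.poll(wait, TimeUnit.MILLISECONDS);
                if (row != null) {
                    if (batch.isEmpty()) {
                        // The throttle timer starts with the first row of a new batch.
                        deadline = System.currentTimeMillis() + throttleMillis;
                    }
                    batch.add(row);
                }
                boolean full = batch.size() >= batchSize;
                boolean expired = !batch.isEmpty() && System.currentTimeMillis() >= deadline;
                boolean draining = exiting && queue.isEmpty();
                if (full || expired || (draining && !batch.isEmpty())) {
                    sentBatchSizes.add(batch.size()); // a real writer would send the batch here
                    batch = new ArrayList<>();
                }
                if (draining && batch.isEmpty()) {
                    return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With batchSize=4, ten quick inserts usually leave as batches of 4, 4 and 2. The exact split depends on thread timing, because a partial batch is flushed whenever its throttle time runs out.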

Constructor

public MultithreadedTableWriter(String hostName, int port, String userId, String password,
    String dbName, String tableName, boolean useSSL,
    boolean enableHighAvailability, String[] highAvailabilitySites,
    int batchSize, float throttle,
    int threadCount, String partitionCol,
    int[] compressTypes, Mode mode, String[] pModeOption,
    boolean enableActualSendTime,
    boolean reconnect,
    int tryReconnectNums)

Parameters

  • hostName: Host name of the server to connect to.
  • port: Server port number.
  • userId / password: Username and password.
  • dbName (dbPath): A String indicating the DFS database path. Leave it unspecified for an in-memory table.
  • tableName: A String indicating the in-memory or DFS table name.
    Note: For Java API 1.30.17 or lower versions, when writing to an in-memory table, please specify the in-memory table name for dbPath and leave tableName empty.
  • useSSL: Whether to enable encrypted communication.
  • enableHighAvailability: Whether to enable API high availability.
  • highAvailabilitySites: A String array of ip:port of all available nodes.
  • batchSize: An int indicating the number of records per batch. The default value is 1, meaning the server processes the data as soon as it is written. If batchSize is greater than 1, the client sends the data to the server only when the number of buffered records reaches batchSize.
  • throttle: A positive float indicating how long (in seconds) to wait before the buffered data is sent to the server for processing when the number of records written by the client has not reached batchSize.
  • threadCount: An int indicating the number of worker threads to create. The default value is 1, meaning a single worker thread. It must be 1 for a dimension table.
  • partitionCol: A String indicating the partitioning column. It is empty by default, and only takes effect when threadCount is greater than 1.
    • For a partitioned table, it must be the partitioning column.
    • For a stream table, it must be a column name.
    • For a dimension table, the parameter does not take effect.
  • compressTypes: An int array specifying the compression method for each column. If unspecified, the columns are not compressed. Supported values:
    • Vector.COMPRESS_LZ4: LZ4 algorithm
    • Vector.COMPRESS_DELTA: Delta-of-delta encoding
  • mode: The write mode. It can be Mode.M_Upsert (calls the server function upsert!) or Mode.M_Append (calls the server function tableInsert).
  • pModeOption: A String array specifying optional parameters of upsert!, e.g., new String[]{"ignoreNull=false", "keyColNames=`sym"}. Only takes effect when mode is Mode.M_Upsert.
  • callbackHandler: A callback object invoked after data is written, reporting the result for each row. The default value is empty. Its parameter is a table whose first column (String) holds the ID of each inserted row and whose second column (boolean) indicates whether that row was inserted successfully. callbackHandler does not appear in the signature above; Example 3 passes it to a constructor overload right after partitionCol.
  • enableActualSendTime: A Boolean value specifying whether to record the send time of each message. If it is true, the last column of tableName must be of NANOTIMESTAMP type.
  • reconnect (optional, default false): A Boolean value indicating whether to reconnect after a network disconnection.
  • tryReconnectNums: The number of reconnection attempts for each connection. If it is not specified, reconnection attempts are unlimited. If it is specified, the limit applies in both HA and non-HA modes. In HA mode, each attempt cycles through all nodes once, for up to tryReconnectNums cycles.
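The tryReconnectNums rule can be read as a nested loop. Each attempt is one pass over the nodes, and the loop stops after tryReconnectNums passes, or never stops if no limit is set. The stdlib-only sketch below models that reading. ReconnectPolicySketch, the tryConnect predicate, and the node addresses are hypothetical stand-ins for real connection logic. They are not part of the Java API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Illustrative sketch of the retry policy described for tryReconnectNums, NOT the API's code.
// Non-HA mode corresponds to a single site; HA mode to all highAvailabilitySites.
class ReconnectPolicySketch {
    /**
     * Each attempt (cycle) tries every site once, in order.
     * maxCycles <= 0 models "tryReconnectNums not specified" (retry without limit).
     * Returns the site that accepted the connection, or null after maxCycles failed cycles.
     */
    static String reconnect(List<String> sites, int maxCycles,
                            Predicate<String> tryConnect, List<String> attemptLog) {
        if (sites.isEmpty()) {
            return null;                       // nothing to try; avoids an endless loop
        }
        for (int cycle = 1; maxCycles <= 0 || cycle <= maxCycles; cycle++) {
            for (String site : sites) {
                attemptLog.add(site);          // record every connection attempt
                if (tryConnect.test(site)) {
                    return site;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical node addresses, for illustration only.
        List<String> sites = Arrays.asList("node1:8848", "node2:8848", "node3:8848");
        List<String> log = new ArrayList<>();
        String connected = reconnect(sites, 2, site -> false, log); // every node is down
        System.out.println(connected + " after " + log.size() + " attempts"); // null after 6 attempts
    }
}
```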

Methods

Method                     Description
insert                     Inserts a single row of data
getUnwrittenData           Gets the data that has not been written to the server
insertUnwrittenData        Re-inserts unwritten data obtained via getUnwrittenData
getStatus                  Gets the current running status of the MTW object
waitForThreadCompletion    Waits until all background worker threads have finished, then exits

insert

Syntax

ErrorCodeInfo insert(Object... args)

Parameters

  • args: A variable-length argument (varargs) indicating the record to be inserted.

Details

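Inserts a single record and returns an ErrorCodeInfo object containing errorCode and errorInfo. If errorCode is not empty (""), MultithreadedTableWriter failed to insert the data and errorInfo contains the error message. A successful return means the record was accepted into the client-side queue; errors that occur later, while the data is sent to the server, are reported by getStatus.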

The class ErrorCodeInfo provides methods hasError() and succeed() to check whether the data is written properly. hasError() returns true if an error occurred, false otherwise. succeed() returns true if the data is written successfully, false otherwise.

Examples

MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter("localhost", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest",
                false, false, null, 10000, 1,
                5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
// Values follow the column order of pdatetest: date, symbol, id
ErrorCodeInfo pErrorInfo = multithreadedTableWriter_.insert(new Date(2022, 3, 23), "AAAAAAAB", 10000000L);
if (pErrorInfo.hasError())
    System.out.println(pErrorInfo.errorInfo);
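One thing to watch in this example: java.util.Date(int, int, int) is a deprecated constructor that adds 1900 to the year and counts months from 0. As a result, new Date(2022, 3, 23) denotes April 23, 3922, not March 23, 2022. The stdlib-only check below shows the difference. How the API then converts a java.util.Date to a DATE value is not verified here. If a specific calendar date is intended, Example 2's approach, new BasicDate(LocalDate.of(...)), states it explicitly. LegacyDateCheck is a hypothetical helper name.

```java
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;

class LegacyDateCheck {
    // Converts what the deprecated constructor produces into a LocalDate for comparison.
    @SuppressWarnings("deprecation")
    static LocalDate legacyToLocal(int year, int month, int day) {
        // Date(int, int, int) treats year as (year - 1900) and month as 0-based,
        // and interprets the result as local midnight in the default time zone.
        Date legacy = new Date(year, month, day);
        return legacy.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    }

    public static void main(String[] args) {
        System.out.println(legacyToLocal(2022, 3, 23)); // 3922-04-23
        System.out.println(LocalDate.of(2022, 3, 23));  // 2022-03-23
    }
}
```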

getUnwrittenData

List<List<Entity>> getUnwrittenData()

Details

Returns a nested list of the data that has not been written to the server, where each inner list holds one row. Note that MultithreadedTableWriter releases this data once it has been retrieved.
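The following is a minimal model of these semantics. It assumes that "releases" means the buffer is emptied by the call, so a second call returns nothing new. Each inner list holds one row, and drain() hands back all pending rows at once. UnwrittenBufferSketch and its methods are hypothetical, and plain Objects stand in for the API's Entity values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model of "retrieve once, then release", NOT the API's implementation.
class UnwrittenBufferSketch {
    private final List<List<Object>> pending = new ArrayList<>();

    // One inner list per row, one element per column.
    synchronized void addRow(Object... values) {
        pending.add(new ArrayList<>(Arrays.asList(values)));
    }

    // Hands back everything still pending and clears the buffer.
    synchronized List<List<Object>> drain() {
        List<List<Object>> snapshot = new ArrayList<>(pending);
        pending.clear();
        return snapshot;
    }

    public static void main(String[] args) {
        UnwrittenBufferSketch buffer = new UnwrittenBufferSketch();
        buffer.addRow("2022.03.23", "AAAAAAAB", 1L);
        buffer.addRow("2022.03.23", "AAAAAAAB", 2L);
        System.out.println(buffer.drain().size()); // 2
        System.out.println(buffer.drain().size()); // 0: the first call released the rows
    }
}
```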

Examples

List<List<Entity>> unwrittenData = multithreadedTableWriter_.getUnwrittenData();

insertUnwrittenData

ErrorCodeInfo insertUnwrittenData(List<List<Entity>> records)

Parameters

  • records: Data that needs to be written again, obtained via the getUnwrittenData method.

Details

Inserts unwritten data and returns an ErrorCodeInfo object in the same format as insert. Unlike insert, insertUnwrittenData can insert multiple records at a time.

Examples

List<List<Entity>> unwrittenData = multithreadedTableWriter_.getUnwrittenData();
ErrorCodeInfo ret = multithreadedTableWriter_.insertUnwrittenData(unwrittenData);

getStatus

Status getStatus()

Get the current running status of the MultithreadedTableWriter object.

Return value: a Status object. Status is a subclass of ErrorCodeInfo.

  • Status attributes:

    • isExiting: Whether the writing thread is exiting.
    • errorCode: Error code.
    • errorInfo: Error information.
    • sentRows: Total number of successfully sent records.
    • unsentRows: Total number of unsent records.
    • sendFailedRows: Total number of failed records.
    • threadStatus: List of writing thread statuses. Each entry contains:
      • threadId: Thread ID.
      • sentRows: Number of successfully sent records by this thread.
      • unsentRows: Number of unsent records by this thread.
      • sendFailedRows: Number of failed records by this thread.
  • Status methods:

    • hasError(): true indicates data write errors; false indicates no errors.
    • succeed(): true indicates data write success; false indicates failure.

Examples

The following example gets the status of an MTW object.

MultithreadedTableWriter.Status writeStatus = multithreadedTableWriter_.getStatus();

A sample status output is shown below:

errorCode     : A1
errorInfo     : Data conversion error: Cannot convert double to LONG
isExiting     : True
sentRows      : 2493
unsentRows    : 0
sendFailedRows: 7507
threadStatus  :
        threadId        sentRows        unsentRows      sendFailedRows
               0               0                 0                7507
        3567691520           415                 0                   0
        3489658624           831                 0                   0
        3481265920           416                 0                   0
        3472873216           416                 0                   0
        3464480512           415                 0                   0

The output shows that an exception occurred during the MTW writing process.

  • The error code is A1, with the exception information "Data conversion error: Cannot convert double to LONG."
  • isExiting=True indicates that the writing threads are exiting.
  • sentRows=2493 shows that 2493 records have been successfully written to the DolphinDB server.
  • unsentRows=0 and sendFailedRows=7507 indicate that no records are waiting to be sent and 7507 records failed to be written to the DolphinDB server.
  • threadStatus lists the processing status of each background thread. Successful writes (sentRows) are typically reported per thread, as in the sentRows column above. Failed writes (unsentRows, sendFailedRows) are concentrated in the threadId=0 row, which gives the total number of failed writes across all threads.
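As a worked check on the sample output, the per-thread sentRows values (415 + 831 + 416 + 416 + 415) add up to the overall sentRows of 2493. Also, sentRows + unsentRows + sendFailedRows = 2493 + 0 + 7507 = 10000. Reading 10000 as the number of rows submitted is an inference from this output, not a documented guarantee. StatusArithmetic is a hypothetical helper that only does the sums.

```java
import java.util.stream.LongStream;

// Worked check of the counters in the sample status output above.
class StatusArithmetic {
    static long sum(long... values) {
        return LongStream.of(values).sum();
    }

    public static void main(String[] args) {
        long[] perThreadSentRows = {415, 831, 416, 416, 415}; // rows with threadId != 0
        long sentRows = sum(perThreadSentRows);
        long unsentRows = 0;
        long sendFailedRows = 7507;                            // row with threadId = 0
        System.out.println(sentRows);                               // 2493
        System.out.println(sentRows + unsentRows + sendFailedRows); // 10000
    }
}
```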

waitForThreadCompletion

waitForThreadCompletion()

Details

This method blocks until all background worker threads have completed their tasks, after which the MTW exits. Calling insert or insertUnwrittenData after waitForThreadCompletion has been called raises the error "thread is exiting".
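This contract resembles the shutdown-then-await pattern of the JDK's ExecutorService: stop accepting work, let queued work finish, then return. The sketch below uses that pattern purely as an analogy and does not show how MTW is implemented. ShutdownAnalogy is a hypothetical name. Submitting work after shutdown fails with RejectedExecutionException, much as insert fails with "thread is exiting" after waitForThreadCompletion.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Analogy only: a JDK ExecutorService stands in for the MTW's background worker threads.
class ShutdownAnalogy {
    // Queue some work, stop accepting more, then block until the queued work is done.
    static int runAndWait(int tasks) {
        ExecutorService workers = Executors.newFixedThreadPool(5);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            workers.submit(() -> { done.incrementAndGet(); });
        }
        workers.shutdown();                                 // no new tasks accepted from here on
        try {
            workers.awaitTermination(10, TimeUnit.SECONDS); // wait for queued tasks to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    // Work submitted after shutdown is rejected.
    static boolean submitAfterShutdownIsRejected() {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        workers.shutdown();
        try {
            workers.submit(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(100));                 // 100
        System.out.println(submitAfterShutdownIsRejected()); // true
    }
}
```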

Examples

The following example waits for the MTW to finish writing.

multithreadedTableWriter_.waitForThreadCompletion();

Usage Examples

Example 1

The following example demonstrates writing data to a DFS table using MultithreadedTableWriter and printing error information to the console if an error occurs.

@Test
public void testMultithreadedTableWriter() throws Exception {
    DBConnection conn = new DBConnection();
    conn.connect(HOST, PORT, "admin", "123456");
    Random random = new Random();
    
    // 1. Create DFS table (script statements separated by newlines)
    String script =
            "dbName = 'dfs://valuedb3'\n" +
                    "if (exists(dbName))\n" +
                    "{\n" +
                    "dropDatabase(dbName);\n" +
                    "}\n" +
                    "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);\n" +
                    "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);\n" +
                    "pt = db.createPartitionedTable(datetest,'pdatetest','id');";
    conn.run(script);
    
    // 2. Create MTW object: batchSize=10000, throttle=1s, 5 worker threads, partitionCol "id"
    MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
            false, false, null, 10000, 1,
            5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
    ErrorCodeInfo ret;
    
    // 3. Write data
    try {
        // Insert 100 rows
        for (int i = 0; i < 100; ++i) {
            ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), "AAAAAAAB", random.nextInt() % 10000);
            if (ret.hasError()) {
                System.out.println("insert failed: " + ret.errorInfo);
            }
        }
    }
    catch (Exception e) {
        // MTW throws an exception
        System.out.println("MTW exit with exception {0}" + e.getMessage());
    }

    // Wait for MTW to complete inserting
    multithreadedTableWriter_.waitForThreadCompletion();
    MultithreadedTableWriter.Status writeStatus = multithreadedTableWriter_.getStatus();
    if (writeStatus.hasError()) {
        // If an error occurred during writing
        System.out.println("error in writing !");
    }
    System.out.println("writeStatus: {0}\n" + writeStatus.toString());
    System.out.println(((BasicLong)conn.run("exec count(*) from pt")).getLong());
}

Execution result:

writeStatus: {0}
errorCode     : 
errorInfo     : 
isExiting     : true
sentRows      : 100
unsentRows    : 0
sendFailedRows: 0
threadStatus  :
        threadId        sentRows      unsentRows  sendFailedRows
              13              30               0               0
              14              18               0               0
              15              15               0               0
              16              20               0               0
              17              17               0               0

100

The output shows the result of multi-threaded writing. The values in the threadId and sentRows columns vary from run to run.

Example 2. Using Mode.M_Upsert

@Test(timeout = 120000)
public void test_mtw_tableUpsert_DP_updateFirst() throws Exception {
    String script = "if(existsDatabase(\"dfs://upsert\")) {\n" +
            "dropDatabase(\"dfs://upsert\")\n" +
            "}\n" +
            "sym=\"A\" \"B\" \"C\" \"A\" \"D\" \"B\" \"A\"\n" +
            "date=take(2021.12.10,3) join take(2021.12.09, 3) join 2021.12.10\n" +
            "price=8.3 7.2 3.7 4.5 6.3 8.4 7.6\n" +
            "val=10 19 13 9 19 16 10\n" +
            "t=table(sym, date, price, val)\n" +
            "db=database(\"dfs://upsert\", VALUE,\"A\" \"B\" \"C\")\n" +
            "pt=db.createPartitionedTable(t, `pt, `sym)\n" +
            "pt.append!(t)";
    conn.run(script);
    MultithreadedTableWriter mtw = new MultithreadedTableWriter(HOST,PORT,"admin","123456","dfs://upsert","pt",
            false,false,null,1000,1,10,"sym",null,
            MultithreadedTableWriter.Mode.M_Upsert, 
            new String[]{"ignoreNull=false", "keyColNames=`sym"});
    mtw.insert(new BasicString("A"),new BasicDate(LocalDate.of(2021,12,9)),new BasicDouble(11.1),new BasicInt(12));
    mtw.insert(new BasicString("B"),new BasicDate(LocalDate.of(2021,12,9)),new BasicDouble(10.5),new BasicInt(9));
    mtw.insert(new BasicString("E"),new BasicDate(LocalDate.of(2021,12,9)),new BasicDouble(6.9),new BasicInt(11));
    mtw.waitForThreadCompletion();
    BasicTable ua = (BasicTable) conn.run("select * from pt;");
    System.out.println(ua.getString());
}

Execution result:

sym date       price val
--- ---------- ----- ---
A   2021.12.09 11.1  12 
A   2021.12.09 4.5   9  
A   2021.12.10 7.6   10 
B   2021.12.09 10.5  9  
B   2021.12.09 8.4   16 
C   2021.12.10 3.7   13 
D   2021.12.09 6.3   19 
E   2021.12.09 6.9   11 
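In this output, the first A row (2021.12.10, 8.3, 10) and the first B row (2021.12.10, 7.2, 19) were replaced, the other A and B rows are unchanged, and E was appended. A plain-Java replay suggests one reading: with keyColNames=`sym, each incoming row overwrote only the first existing row with the same sym, and a row with no match was appended. The sketch below reproduces the output above under that assumption. It explains this one result and is not DolphinDB's upsert! implementation. UpsertFirstMatchSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Replays Example 2 in plain Java to explain its output; NOT DolphinDB's upsert! code.
class UpsertFirstMatchSketch {
    static final class Row {
        final String sym;
        final String date;
        final double price;
        final int val;

        Row(String sym, String date, double price, int val) {
            this.sym = sym;
            this.date = date;
            this.price = price;
            this.val = val;
        }

        @Override
        public String toString() {
            return sym + " " + date + " " + price + " " + val;
        }
    }

    // Key column is sym: overwrite the FIRST row with that key, otherwise append.
    static void upsertFirst(List<Row> table, Row incoming) {
        for (int i = 0; i < table.size(); i++) {
            if (table.get(i).sym.equals(incoming.sym)) {
                table.set(i, incoming);
                return;
            }
        }
        table.add(incoming);
    }

    static List<Row> example() {
        // The seven rows appended by the setup script, in insertion order.
        List<Row> pt = new ArrayList<>(Arrays.asList(
                new Row("A", "2021.12.10", 8.3, 10),
                new Row("B", "2021.12.10", 7.2, 19),
                new Row("C", "2021.12.10", 3.7, 13),
                new Row("A", "2021.12.09", 4.5, 9),
                new Row("D", "2021.12.09", 6.3, 19),
                new Row("B", "2021.12.09", 8.4, 16),
                new Row("A", "2021.12.10", 7.6, 10)));
        // The three rows written through the MTW.
        upsertFirst(pt, new Row("A", "2021.12.09", 11.1, 12));
        upsertFirst(pt, new Row("B", "2021.12.09", 10.5, 9));
        upsertFirst(pt, new Row("E", "2021.12.09", 6.9, 11));
        // The query returned rows partition by partition (VALUE-partitioned on sym);
        // a stable sort by sym mimics that order here.
        pt.sort(Comparator.comparing((Row r) -> r.sym));
        return pt;
    }

    public static void main(String[] args) {
        example().forEach(System.out::println);
    }
}
```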

Example 3. Defining Callbacks

Implement the callback interface and override the writeCompletion method to retrieve callback data:

// Thread-safe list shared with the callback, collecting the IDs of rows that failed
List<String> failedIdList = Collections.synchronizedList(new ArrayList<>());
Callback callbackHandler = new Callback() {
    @Override
    public void writeCompletion(Table callbackTable) {
        // Column 0: row IDs (String); column 1: per-row success flags (boolean)
        BasicStringVector idVec = (BasicStringVector) callbackTable.getColumn(0);
        BasicBooleanVector successVec = (BasicBooleanVector) callbackTable.getColumn(1);
        for (int i = 0; i < successVec.rows(); i++) {
            if (!successVec.getBoolean(i)) {
                failedIdList.add(idVec.getString(i));
            }
        }
    }
};

Construct the MultithreadedTableWriter object and pass in the callback object:

MultithreadedTableWriter mtw = new MultithreadedTableWriter(
    host, port, userName, password, dbName, tbName, useSSL,
    enableHighAvailability, null, 10000, 1, 1, "price", callbackHandler
);

Call the insert method of MultithreadedTableWriter and write an ID for each row in the first column:

String theme = "theme1";
for (int id = 0; id < 1000000; id++) {
    // theme + id is the ID for each row, which will be returned during the callback;
    // code and price stand for the values of the remaining columns
    mtw.insert(theme + id, code, price);
}

Handling Exceptions

When you call the insert method of MultithreadedTableWriter, the following errors may occur:

Data Type Mismatch

If the data type of the inserted data does not match the column type of the table, MultithreadedTableWriter will immediately return an error message and print the stack trace:

DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
Random random = new Random();
String script =
        "dbName = 'dfs://valuedb3'\n" +
                "if (exists(dbName))\n" +
                "{\n" +
                "dropDatabase(dbName);\n" +
                "}\n" +
                "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);\n" +
                "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);\n" +
                "pt = db.createPartitionedTable(datetest,'pdatetest','id');";
conn.run(script);
MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
        false, false, null, 10000, 1,
        5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
ErrorCodeInfo ret;
// Insert one row with incorrect data type, MTW returns an error message immediately
ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), 222, random.nextInt() % 10000);
if (!ret.errorInfo.equals(""))
    System.out.println("insert wrong format data: {2}\n" + ret.toString());

Execution result:

java.lang.RuntimeException: Failed to insert data. Cannot convert int to DT_SYMBOL.
	at com.xxdb.data.BasicEntityFactory.createScalar(BasicEntityFactory.java:795)
	at com.xxdb.data.BasicEntityFactory.createScalar(BasicEntityFactory.java:505)
	at com.xxdb.multithreadedtablewriter.MultithreadedTableWriter.insert(MultithreadedTableWriter.java:594)
	at com.xxdb.BehaviorTest.testMul(BehaviorTest.java:89)
	at com.xxdb.BehaviorTest.main(BehaviorTest.java:168)
insert wrong format data: {2}
code=A1 info=Invalid object error java.lang.RuntimeException: Failed to insert data. Cannot convert int to DT_SYMBOL.

Column Count Mismatch

If the number of columns in the inserted data does not match the number of columns in the table, MultithreadedTableWriter will immediately return an error message:

DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
Random random = new Random();
String script =
        "dbName = 'dfs://valuedb3'\n" +
                "if (exists(dbName))\n" +
                "{\n" +
                "dropDatabase(dbName);\n" +
                "}\n" +
                "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);\n" +
                "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);\n" +
                "pt = db.createPartitionedTable(datetest,'pdatetest','id');";
conn.run(script);
MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
        false, false, null, 10000, 1,
        5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
ErrorCodeInfo ret;
// Insert one row of data where the number of columns does not match the table's columns, MTW returns an error message immediately
ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), random.nextInt() % 10000);
if (!ret.errorInfo.equals(""))
    System.out.println("insert wrong format data: {3}\n" + ret.toString());

Execution result:

insert wrong format data: {3}
code=A2 info=Column counts don't match.  
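Both failures appear to be detected on the client when insert is called. The stack trace in the type-mismatch case shows the conversion happening in BasicEntityFactory.createScalar inside insert. The hypothetical sketch below illustrates that kind of pre-queue check with plain Java types. It returns A2 for a column-count mismatch and A1 for a value that does not fit its column type. The codes mirror the outputs above. The checks and accepted types are assumptions for illustration, not the API's actual conversion rules.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

// Hypothetical sketch of client-side row validation. The A1/A2 codes mirror the outputs
// above; the checks themselves are assumptions, NOT the DolphinDB API's source code.
class RowValidatorSketch {
    enum ColType { DATE, SYMBOL, LONG }

    // Returns "" when the row looks valid, otherwise an error code.
    static String validate(List<ColType> schema, Object... row) {
        if (row.length != schema.size()) {
            return "A2";                       // column count mismatch
        }
        for (int i = 0; i < row.length; i++) {
            if (!accepts(schema.get(i), row[i])) {
                return "A1";                   // value cannot be converted to the column type
            }
        }
        return "";
    }

    private static boolean accepts(ColType type, Object value) {
        switch (type) {
            case DATE:
                return value instanceof LocalDate || value instanceof Date;
            case SYMBOL:
                return value instanceof String;
            case LONG:
                // Example 1 writes int values into the LONG id column successfully.
                return value instanceof Long || value instanceof Integer;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        List<ColType> schema = Arrays.asList(ColType.DATE, ColType.SYMBOL, ColType.LONG);
        LocalDate d = LocalDate.of(2022, 3, 23);
        System.out.println(validate(schema, d, 222, 5L));                   // A1: int is not a SYMBOL
        System.out.println(validate(schema, d, 5L));                        // A2: 2 values, 3 columns
        System.out.println(validate(schema, d, "AAAAAAAB", 5L).isEmpty()); // true
    }
}
```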

Rewriting Data

When MultithreadedTableWriter encounters an error, such as a data mismatch or a writing thread failure, it terminates all writing threads. You can then call getUnwrittenData() to retrieve the unwritten data and rewrite it with insertUnwrittenData(). Note that the unwritten data must be written through a new MTW object, because the original one is exiting and rejects further inserts.

List<List<Entity>> unwrittenData = multithreadedTableWriter_.getUnwrittenData();
System.out.println("{5} unwriterdata: " + unwrittenData.size());
// Create a new MTW object; the original one is exiting
MultithreadedTableWriter newmultithreadedTableWriter = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
        false, false, null, 10000, 1,
        5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
try
{
    // Write the unwritten data through the new MTW
    ret = newmultithreadedTableWriter.insertUnwrittenData(unwrittenData);
    if (ret.hasError())
        System.out.println("insertUnwrittenData failed: " + ret.errorInfo);
}
finally
{
    // Wait for the new MTW to finish, then print its status
    newmultithreadedTableWriter.waitForThreadCompletion();
    MultithreadedTableWriter.Status writeStatus = newmultithreadedTableWriter.getStatus();
    System.out.println("writeStatus: {6}\n" + writeStatus.toString());
}

Execution result:

  {5} unwriterdata: 10
  writeStatus: {6}
  errorCode     : 
  errorInfo     : 
  isExiting     : true
  sentRows      : 10
  unsentRows    : 0
  sendFailedRows: 0
  threadStatus  :
          threadId        sentRows      unsentRows  sendFailedRows
                23               3               0               0
                24               2               0               0
                25               1               0               0
                26               3               0               0
                27               1               0               0