MultithreadedTableWriter
The Java API provides the MultithreadedTableWriter class for asynchronous data writes. It maintains a data buffer queue on the client. When the server is busy with network I/O, the client write threads can still write data continuously into this queue. Once the queue receives a task, the client considers the task completed. Writing to the queue is immediate, so the write threads avoid busy waits. Currently, MultithreadedTableWriter supports batch writing to in-memory tables and DFS tables.
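The buffering behaviour described above can be pictured with a small, self-contained sketch. It is not the library's implementation: `AsyncBufferSketch`, `enqueue`, `awaitCompletion`, and `demo` are hypothetical names, and the "send" step only counts rows. It illustrates one point only: the producer returns as soon as a row is queued, while a background thread drains the queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not the MultithreadedTableWriter implementation.
final class AsyncBufferSketch {
    private static final Object[] POISON = new Object[0]; // end-of-stream marker
    private final BlockingQueue<Object[]> queue = new LinkedBlockingQueue<>();
    private final AtomicInteger sentRows = new AtomicInteger();
    private final Thread worker;

    AsyncBufferSketch() {
        worker = new Thread(() -> {
            try {
                while (true) {
                    Object[] row = queue.take();   // blocks until a row is available
                    if (row == POISON) {
                        return;                    // every queued row has been processed
                    }
                    sentRows.incrementAndGet();    // stand-in for sending the row to a server
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
    }

    // Returns as soon as the row is queued; it does not wait for the "send".
    void enqueue(Object... row) {
        queue.add(row);
    }

    // Signals end of input, waits for the worker, and returns how many rows it processed.
    int awaitCompletion() {
        queue.add(POISON);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sentRows.get();
    }

    // Queues the given number of rows and reports how many the worker handled.
    static int demo(int rows) {
        AsyncBufferSketch writer = new AsyncBufferSketch();
        for (int i = 0; i < rows; i++) {
            writer.enqueue(i, "AAAAAAAB", 10000000L);
        }
        return writer.awaitCompletion();
    }
}
```

In this model the producer loop never blocks on the simulated send; only `awaitCompletion` waits for the background thread.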
Constructor
public MultithreadedTableWriter(String hostName, int port, String userId, String password, String dbName, String tableName, boolean useSSL, boolean enableHighAvailability, String[] highAvailabilitySites, int batchSize, float throttle, int threadCount, String partitionCol, int[] compressTypes, Mode mode, String[] pModeOption, boolean enableActualSendTime, boolean reconnect, int tryReconnectNums)
Parameters
- hostName: Host name of the server to connect to.
- port: Server port number.
- userId / password: Username and password.
- dbName: A String indicating the DFS database path. Leave it unspecified for an in-memory table.
- tableName: A String indicating the in-memory or DFS table name. Note: For Java API 1.30.17 or lower versions, when writing to an in-memory table, please specify the in-memory table name for dbName and leave tableName empty.
- useSSL: Whether to enable encrypted communication.
- enableHighAvailability: Whether to enable API high availability.
- highAvailabilitySites: A String array of ip:port of all available nodes.
- batchSize: An int indicating the number of records in a batch. The default value is 1, meaning the server processes each record as soon as it is written. If it is greater than 1, the client sends data to the server only when the number of buffered records reaches batchSize.
- throttle: A positive floating-point number indicating the waiting time (in seconds) before the server processes the incoming data if the number of records written from the client has not reached batchSize.
- threadCount: An int indicating the number of worker threads to be created. The default value is 1, indicating single-threaded processing. It must be 1 for a dimension table.
- partitionCol: A String indicating the partitioning column. It is empty by default, and only takes effect when threadCount is greater than 1.
  - For a partitioned table, it must be the partitioning column.
  - For a stream table, it must be a column name.
  - For a dimension table, the parameter does not take effect.
- compressTypes: An int array of the compression methods used for each column. If unspecified, the columns are not compressed. The compression methods include:
  - Vector.COMPRESS_LZ4: LZ4 algorithm
  - Vector.COMPRESS_DELTA: Delta-of-delta encoding
- mode: The write mode. It can be Mode.M_Upsert (call server function upsert!) or Mode.M_Append (call server function tableInsert).
- pModeOption: A String array specifying optional parameters of upsert!. Only takes effect when mode = Mode.M_Upsert.
- callbackHandler: The callback function that is invoked after successfully inserting a row. The default value is empty. Its parameter is a table where the first column is of String type, representing the ID of a row from the inserted data, and the second column is of boolean type, indicating whether the insertion of that row was successful.
- enableActualSendTime: A Boolean value that specifies whether to record the send time for each message. Note that the last column of tableName must be of NANOTIMESTAMP type.
- reconnect (optional, default false): A Boolean value indicating whether to reconnect after network disconnection.
- tryReconnectNums: Number of reconnection attempts for each connection. If not specified, reconnection attempts are not limited. If specified, limited attempts are applied in HA and non-HA modes. In HA mode, cycles through each node once per attempt, up to tryReconnectNums cycles.
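To make the interplay of batchSize and throttle more concrete, the sketch below models one plausible flush rule: a batch is sent when it holds batchSize rows, or when throttle seconds have passed since the first buffered row of that batch arrived. This is an assumption for illustration; `BatchPolicySketch` and `batchSizes` are hypothetical names, and the exact timing rule inside the API may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a batchSize/throttle flush rule; not the API's internal code.
final class BatchPolicySketch {
    // arrivalMillis must be sorted in ascending order.
    // Returns the size of each batch that this model would send.
    static List<Integer> batchSizes(long[] arrivalMillis, int batchSize, double throttleSeconds) {
        long throttleMillis = Math.round(throttleSeconds * 1000);
        List<Integer> batches = new ArrayList<>();
        int buffered = 0;
        long firstArrival = 0;
        for (long t : arrivalMillis) {
            // Assumption: the timer starts when the first row of a batch is buffered.
            if (buffered > 0 && t - firstArrival >= throttleMillis) {
                batches.add(buffered);   // throttle expired before this row arrived
                buffered = 0;
            }
            if (buffered == 0) {
                firstArrival = t;
            }
            buffered++;
            if (buffered >= batchSize) {
                batches.add(buffered);   // batch is full, send immediately
                buffered = 0;
            }
        }
        if (buffered > 0) {
            batches.add(buffered);       // remaining rows go out when the timer fires
        }
        return batches;
    }
}
```

Under this model, rows arriving at 0, 10, 20, 30, 2000 and 2010 ms with batchSize = 3 and throttle = 1 are sent as batches of 3, 1 and 2 rows. With batchSize = 1, every row forms its own batch.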
Methods
Method | Description |
---|---|
insert | Inserts a single row of data |
getUnwrittenData | Gets data that has not been written to the server |
insertUnwrittenData | Inserts data into the data table |
getStatus | Gets the current running status of the MTW object |
waitForThreadCompletion | MTW will enter a waiting state |
insert
Syntax
ErrorCodeInfo insert(Object... args)
Parameters
- args: A variable-length argument (varargs) indicating the record to be inserted.
Details
Insert a single record. Returns an ErrorCodeInfo object containing errorCode and errorInfo. If errorCode is not "", MultithreadedTableWriter has failed to insert the data, and errorInfo contains the error message.
The ErrorCodeInfo class provides the methods hasError() and succeed() to check whether the data is written properly. hasError() returns true if an error occurred, false otherwise. succeed() returns true if the data is written successfully, false otherwise.
Examples
MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter("localhost", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest",
        false, false, null, 10000, 1, 5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
ErrorCodeInfo pErrorInfo = multithreadedTableWriter_.insert(new Date(2022, 3, 23), "AAAAAAAB", 10000000L);
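The relationship between errorCode, hasError() and succeed() can be summarised with a small stand-in class. `ErrorCodeInfoSketch` and `describe` are hypothetical and only mirror the contract documented in this section (an empty errorCode means success); this is not the class shipped with the API.

```java
// Hypothetical stand-in that mirrors the documented ErrorCodeInfo contract.
final class ErrorCodeInfoSketch {
    final String errorCode;
    final String errorInfo;

    ErrorCodeInfoSketch(String errorCode, String errorInfo) {
        this.errorCode = errorCode;
        this.errorInfo = errorInfo;
    }

    // A non-empty errorCode means the insert was rejected.
    boolean hasError() {
        return !errorCode.isEmpty();
    }

    // The logical opposite of hasError().
    boolean succeed() {
        return errorCode.isEmpty();
    }

    // Typical caller-side check after an insert-style call.
    static String describe(ErrorCodeInfoSketch result) {
        return result.hasError()
                ? "insert failed: " + result.errorCode + " " + result.errorInfo
                : "insert queued";
    }
}
```

A caller would normally branch on hasError() right after each insert call and log errorInfo when it returns true.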
getUnwrittenData
List<List<Entity>> getUnwrittenData()
Details
Return a nested list representing the data that has not been written to the server. Note that MultithreadedTableWriter releases this data once it has been retrieved.
Examples
List<List<Entity>> unwrittenData = multithreadedTableWriter_.getUnwrittenData();
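Because the writer releases the data once it has been retrieved, the result should be kept in a variable rather than fetched twice. The sketch below imitates that drain behaviour with a plain queue. `UnwrittenBufferSketch`, `buffer`, and `takeUnwritten` are hypothetical stand-ins, and rows are simplified to `List<Object>` instead of `List<Entity>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in showing "retrieve once, then released" semantics.
final class UnwrittenBufferSketch {
    private final LinkedBlockingQueue<List<Object>> pending = new LinkedBlockingQueue<>();

    // Buffers one row that has not been sent yet.
    void buffer(Object... row) {
        pending.add(Arrays.asList(row));
    }

    // Moves every pending row into the returned list, leaving the buffer empty.
    List<List<Object>> takeUnwritten() {
        List<List<Object>> out = new ArrayList<>();
        pending.drainTo(out);
        return out;
    }
}
```

In this model a second call to `takeUnwritten` returns an empty list, which is why the first result must be retained if the rows are to be written again.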
insertUnwrittenData
ErrorCodeInfo insertUnwrittenData(List<List<Entity>> records)
Parameters
- records: Data that needs to be written again, obtained via the getUnwrittenData method.
Details
Insert unwritten data. The result is in the same format as that of insert. The difference is that insertUnwrittenData can insert multiple records at a time.
Examples
List<List<Entity>> unwrittenData = multithreadedTableWriter_.getUnwrittenData();
ErrorCodeInfo ret = multithreadedTableWriter_.insertUnwrittenData(unwrittenData);
getStatus
Status getStatus()
Get the current running status of the MultithreadedTableWriter object.
Return Value: Status is a class inherited from ErrorCodeInfo.
- Status attributes:
  - isExiting: Whether the writing thread is exiting.
  - errorCode: Error code.
  - errorInfo: Error information.
  - sentRows: Total number of successfully sent records.
  - unsentRows: Total number of unsent records.
  - sendFailedRows: Total number of failed records.
  - threadStatus: List of writing thread statuses.
    - threadId: Thread ID.
    - sentRows: Number of successfully sent records by this thread.
    - unsentRows: Number of unsent records by this thread.
    - sendFailedRows: Number of failed records by this thread.
- Status methods:
  - hasError(): true indicates data write errors; false indicates no errors.
  - succeed(): true indicates data write success; false indicates failure.
Examples
The following example gets the status of an MTW object.
MultithreadedTableWriter.Status writeStatus = multithreadedTableWriter_.getStatus();
The output of MultithreadedTableWriter.Status is shown below:
errorCode     : A1
errorInfo     : Data conversion error: Cannot convert double to LONG
isExiting     : True
sentRows      : 2493
unsentRows    : 0
sendFailedRows: 7507
threadStatus  :
  threadId    sentRows  unsentRows  sendFailedRows
         0           0           0            7507
3567691520         415           0               0
3489658624         831           0               0
3481265920         416           0               0
3472873216         416           0               0
3464480512         415           0               0
The output shows that an exception occurred during the MTW writing process.
- The error code is A1, with the exception information "Data conversion error: Cannot convert double to LONG."
- isExiting=True indicates that the current thread is exiting.
- sentRows=2493 shows that 2493 records have been successfully written to the DolphinDB server.
- unsentRows=0 and sendFailedRows=7507 indicate that a total of 7507 records have not been successfully written to the DolphinDB server.
- threadStatus lists the processing status of each background thread. threadId=0 represents the total count for all threads. Typically, the successful write results (sentRows) are shown separately in each thread, as seen in the sentRows column of the output above. The failed write results (unsentRows, sendFailedRows) are concentrated in the threadId=0 row, indicating the total number of failed writes throughout the process.
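As a quick consistency check on the sample output, the per-thread sentRows values should add up to the overall sentRows. The snippet below hard-codes the numbers shown above; the class and method names are hypothetical.

```java
// Sums the per-thread sentRows values from the sample output (hypothetical helper).
final class StatusTotalsSketch {
    // sentRows reported by each worker thread; the threadId=0 summary row is excluded.
    static final long[] SENT_PER_THREAD = {415, 831, 416, 416, 415};

    static long totalSent() {
        long total = 0;
        for (long rows : SENT_PER_THREAD) {
            total += rows;
        }
        return total;
    }
}
```

The sum is 2493, matching the top-level sentRows value. Together with the 7507 failed rows, the status accounts for 10000 rows in total.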
waitForThreadCompletion
waitForThreadCompletion()
Details
This method puts the MTW into a waiting state and returns once all background worker threads have completed their tasks. If you call insert or insertUnwrittenData after the execution of waitForThreadCompletion, an error "thread is exiting" will be raised.
Examples
The following example demonstrates how to wait for the Writer to complete writing.
multithreadedTableWriter_.waitForThreadCompletion();
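The "no inserts after completion" rule can be imitated with a standard thread pool, as in the sketch below. `CompletionSketch` and its methods are hypothetical; the sketch only shows the general pattern of rejecting work once shutdown has begun and does not reproduce how the API implements it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of rejecting inserts after the writer has completed.
final class CompletionSketch {
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final AtomicInteger written = new AtomicInteger();

    // Returns "" on success or an error message, loosely mirroring errorInfo.
    // The row content is ignored here; the task only counts the write.
    String insert(Object... row) {
        try {
            workers.execute(written::incrementAndGet);
            return "";
        } catch (RejectedExecutionException e) {
            return "thread is exiting";
        }
    }

    // Stops accepting work, waits for queued tasks, and returns the rows written.
    int waitForCompletion() {
        workers.shutdown();
        try {
            workers.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return written.get();
    }
}
```

Tasks queued before `waitForCompletion` still run to completion, while any later `insert` call is refused, which matches the behaviour described in this section.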
Usage Examples
Example 1
The following example demonstrates writing data to a DFS table using MultithreadedTableWriter and printing error information to the console if an error occurs.
@Test
public void testMultithreadedTableWriter() throws Exception {
    DBConnection conn = new DBConnection();
    conn.connect(HOST, PORT, "admin", "123456");
    Random random = new Random();
    // 1. Create DFS table
    String script = "dbName = 'dfs://valuedb3';" +
            "if (exists(dbName))" +
            "{" +
            "dropDatabase(dbName);" +
            "}" +
            "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" +
            "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" +
            "pt = db.createPartitionedTable(datetest,'pdatetest','id');";
    conn.run(script);
    // 2. Create MTW object
    MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
            false, false, null, 10000, 1, 5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
    ErrorCodeInfo ret;
    // 3. Write data
    try {
        // Insert 100 rows
        for (int i = 0; i < 100; ++i) {
            ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), "AAAAAAAB", random.nextInt() % 10000);
        }
    } catch (Exception e) {
        // MTW throws an exception
        System.out.println("MTW exit with exception {0}" + e.getMessage());
    }
    // Wait for MTW to complete inserting
    multithreadedTableWriter_.waitForThreadCompletion();
    MultithreadedTableWriter.Status writeStatus = multithreadedTableWriter_.getStatus();
    if (!writeStatus.errorInfo.equals("")) {
        // If an error occurred during writing
        System.out.println("error in writing !");
    }
    System.out.println("writeStatus: {0}\n" + writeStatus.toString());
    System.out.println(((BasicLong) conn.run("exec count(*) from pt")).getLong());
}
Execution result:
writeStatus: {0}
errorCode     :
errorInfo     :
isExiting     : true
sentRows      : 100
unsentRows    : 0
sendFailedRows: 0
threadStatus  :
threadId    sentRows  unsentRows  sendFailedRows
      13          30           0               0
      14          18           0               0
      15          15           0               0
      16          20           0               0
      17          17           0               0
100
The output shows the result of multi-threaded writing. The threadId and sentRows columns will vary based on the actual situation.
Example 2. Using Mode.M_Upsert
@Test(timeout = 120000)
public void test_mtw_tableUpsert_DP_updateFirst() throws Exception {
    // conn is a DBConnection that has already been connected to the server
    String script = "if(existsDatabase(\"dfs://upsert\")) {\n" +
            "dropDatabase(\"dfs://upsert\")\n" +
            "}\n" +
            "sym=\"A\" \"B\" \"C\" \"A\" \"D\" \"B\" \"A\"\n" +
            "date=take(2021.12.10,3) join take(2021.12.09, 3) join 2021.12.10\n" +
            "price=8.3 7.2 3.7 4.5 6.3 8.4 7.6\n" +
            "val=10 19 13 9 19 16 10\n" +
            "t=table(sym, date, price, val)\n" +
            "db=database(\"dfs://upsert\", VALUE,\"A\" \"B\" \"C\")\n" +
            "pt=db.createPartitionedTable(t, `pt, `sym)\n" +
            "pt.append!(t)";
    conn.run(script);
    MultithreadedTableWriter mtw = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://upsert", "pt",
            false, false, null, 1000, 1, 10, "sym", null,
            MultithreadedTableWriter.Mode.M_Upsert,
            new String[]{"ignoreNull=false", "keyColNames=`sym"});
    mtw.insert(new BasicString("A"), new BasicDate(LocalDate.of(2021, 12, 9)), new BasicDouble(11.1), new BasicInt(12));
    mtw.insert(new BasicString("B"), new BasicDate(LocalDate.of(2021, 12, 9)), new BasicDouble(10.5), new BasicInt(9));
    mtw.insert(new BasicString("E"), new BasicDate(LocalDate.of(2021, 12, 9)), new BasicDouble(6.9), new BasicInt(11));
    mtw.waitForThreadCompletion();
    BasicTable ua = (BasicTable) conn.run("select * from pt;");
    System.out.println(ua.getString());
}
Execution result:
sym date       price val
--- ---------- ----- ---
A   2021.12.09 11.1  12
A   2021.12.09 4.5   9
A   2021.12.10 7.6   10
B   2021.12.09 10.5  9
B   2021.12.09 8.4   16
C   2021.12.10 3.7   13
D   2021.12.09 6.3   19
E   2021.12.09 6.9   11
Example 3. Defining Callbacks
Implement the callback interface and override the writeCompletion method to retrieve callback data:
Callback callbackHandler = new Callback() {
    public void writeCompletion(Table callbackTable) {
        // Collect the IDs of the rows whose insertion failed
        List<String> failedIdList = new ArrayList<>();
        BasicStringVector idVec = (BasicStringVector) callbackTable.getColumn(0);
        BasicBooleanVector successVec = (BasicBooleanVector) callbackTable.getColumn(1);
        for (int i = 0; i < successVec.rows(); i++) {
            if (!successVec.getBoolean(i)) {
                failedIdList.add(idVec.getString(i));
            }
        }
    }
};
Construct the MultithreadedTableWriter object and pass in the callback object:
MultithreadedTableWriter mtw = new MultithreadedTableWriter(
        host, port, userName, password, dbName, tbName, useSSL,
        enableHighAvailability, null, 10000, 1, 1, "price", callbackHandler
);
Call the insert method of MultithreadedTableWriter and write an ID for each row in the first column:
String theme = "theme1";
for (int id = 0; id < 1000000; id++) {
    // theme + id is the ID for each row, which will be returned during the callback
    mtw.insert(theme + id, code, price);
}
Handling Exception
When calling the insert method of the MultithreadedTableWriter class to insert data, the following exceptions may occur:
Data Type Mismatch
If the data type of the inserted data does not match the column type of the table, MultithreadedTableWriter will immediately return an error message and print the stack trace:
DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
Random random = new Random();
String script = "dbName = 'dfs://valuedb3';" +
        "if (exists(dbName))" +
        "{" +
        "dropDatabase(dbName);" +
        "}" +
        "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" +
        "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" +
        "pt = db.createPartitionedTable(datetest,'pdatetest','id');";
conn.run(script);
MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
        false, false, null, 10000, 1, 5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
ErrorCodeInfo ret;
// Insert one row with an incorrect data type (an int for the SYMBOL column); MTW returns an error message immediately
ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), 222, random.nextInt() % 10000);
if (!ret.errorInfo.equals(""))
    System.out.println("insert wrong format data: {2}\n" + ret.toString());
Execution result:
java.lang.RuntimeException: Failed to insert data. Cannot convert int to DT_SYMBOL.
    at com.xxdb.data.BasicEntityFactory.createScalar(BasicEntityFactory.java:795)
    at com.xxdb.data.BasicEntityFactory.createScalar(BasicEntityFactory.java:505)
    at com.xxdb.multithreadedtablewriter.MultithreadedTableWriter.insert(MultithreadedTableWriter.java:594)
    at com.xxdb.BehaviorTest.testMul(BehaviorTest.java:89)
    at com.xxdb.BehaviorTest.main(BehaviorTest.java:168)
insert wrong format data: {2}
code=A1 info=Invalid object error java.lang.RuntimeException: Failed to insert data. Cannot convert int to DT_SYMBOL.
Column Count Mismatch
If the number of columns in the inserted data does not match the number of columns in the table, MultithreadedTableWriter will immediately return an error message:
DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
Random random = new Random();
String script = "dbName = 'dfs://valuedb3';" +
        "if (exists(dbName))" +
        "{" +
        "dropDatabase(dbName);" +
        "}" +
        "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" +
        "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" +
        "pt = db.createPartitionedTable(datetest,'pdatetest','id');";
conn.run(script);
MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
        false, false, null, 10000, 1, 5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
ErrorCodeInfo ret;
// Insert one row with two values into a three-column table; MTW returns an error message immediately
ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), random.nextInt() % 10000);
if (!ret.errorInfo.equals(""))
    System.out.println("insert wrong format data: {3}\n" + ret.toString());
Execution result:
insert wrong format data: {3}
code=A2 info=Column counts don't match.
Rewriting Data
When MultithreadedTableWriter encounters errors such as data mismatches or writing thread failures, the API will terminate all threads. You can use getUnwrittenData() to obtain the unwritten data and then rewrite it with insertUnwrittenData(). Please note that a new MTW object must be created to write the unwritten data.
// ret and writeStatus are the variables declared in the previous examples
List<List<Entity>> unwriterdata = multithreadedTableWriter_.getUnwrittenData();
System.out.println("{5} unwriterdata: " + unwriterdata.size());
// Create a new MTW object
MultithreadedTableWriter newmultithreadedTableWriter = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
        false, false, null, 10000, 1, 5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
try {
    // Write the unwritten data to the new MTW
    ret = newmultithreadedTableWriter.insertUnwrittenData(unwriterdata);
} finally {
    // Wait for the new MTW to finish, then print its status
    newmultithreadedTableWriter.waitForThreadCompletion();
    writeStatus = newmultithreadedTableWriter.getStatus();
    System.out.println("writeStatus: {6}\n" + writeStatus.toString());
}
Execution result:
{5} unwriterdata: 10
writeStatus: {6}
errorCode     :
errorInfo     :
isExiting     : true
sentRows      : 10
unsentRows    : 0
sendFailedRows: 0
threadStatus  :
threadId    sentRows  unsentRows  sendFailedRows
      23           3           0               0
      24           2           0               0
      25           1           0               0
      26           3           0               0
      27           1           0               0