Executing Scripts
run Method
In the Java API, when multiple threads use the same connection to operate on the same table, unexpected results may occur because the order of thread execution is indeterminate. For example, if thread A is supposed to update a table and then query it, and thread B is supposed to delete the table after thread A's operations, the order of execution without locks is unknown. Thread A's query may fail if thread B deletes the table in between. The run method uses a ReentrantLock to avoid such conflicts.
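The locking idea can be illustrated with a minimal sketch that uses only the Java standard library. The class `LockedConnectionSketch` and its `run` method are hypothetical stand-ins written for this illustration; they are not the DolphinDB implementation and only assume that each call holds a `ReentrantLock` for its whole duration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a shared connection; not part of the DolphinDB Java API.
class LockedConnectionSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final List<String> log = new ArrayList<>();

    // Executes one "script" while holding the lock, so that calls made by
    // different threads cannot interleave inside a single run.
    String run(String script) {
        lock.lock();
        try {
            log.add("begin " + script);
            log.add("end " + script);
            return "done: " + script;
        } finally {
            lock.unlock();
        }
    }

    // Returns a copy of the log, taken under the same lock.
    List<String> log() {
        lock.lock();
        try {
            return new ArrayList<>(log);
        } finally {
            lock.unlock();
        }
    }

    // Lets two threads share one connection and returns the resulting log.
    static List<String> demo() {
        LockedConnectionSketch conn = new LockedConnectionSketch();
        Thread a = new Thread(() -> conn.run("update and query"));
        Thread b = new Thread(() -> conn.run("drop table"));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return conn.log();
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch every "begin" entry is immediately followed by its matching "end" entry. The lock keeps one call from being interleaved with another, but which thread goes first is still decided by the scheduler.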
Syntax
public Entity run(String script, ProgressListener listener, int priority, int parallelism, int fetchSize, boolean clearSessionMemory, String tableName, boolean enableSeqNo)
Parameters
- script: The script to be executed.
- listener (optional, default null): The listener that can be used during script execution. It receives the results returned by the server.
- priority (optional, default 4): An int in [0,9] specifying the job priority. A larger value indicates a higher task priority.
- parallelism (optional, default 64): An int specifying the job parallelism, i.e., the number of threads allocated for executing subtasks on a data node. Since version 3.00.1.2, the default value has been changed from 2 to 64. If a per-user maximum is also set on the server side with setMaxJobParallelism(userId, maxParallelism), the parallelism of an API job is the minimum of parallelism and maxParallelism.
- fetchSize (optional, default 0): An int specifying the block size (number of records). Its value cannot be less than 8192. When querying large tables, setting fetchSize can reduce the client memory usage.
- clearSessionMemory (optional, default false): Whether to clear the session memory after execution. Set to true to release the memory resources occupied by variables created by the run method upon completion.
- tableName (optional, default ""): A String specifying the table whose schema is obtained during execution.
- enableSeqNo (optional, default 0): The sequence number of this execution. The server determines whether a task has been executed or not to avoid repeated executions. It is mainly used for data idempotency validation.
  Before version 2.00.11.0, the run method automatically enabled the sequence number. The number is a LONG integer that represents the task sequence number for a client. If a write task fails, the task is resubmitted. However, in cases like writing multiple tables at once, data loss may occur. This parameter was introduced in version 2.00.11.0 to enable or disable the sequence number feature.
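The numeric constraints in the parameter list can be restated as a small sketch. The helper class `RunOptionsSketch` and its method names are hypothetical and exist only for this illustration. Treating a fetchSize of 0 as "block fetching disabled" is an assumption based on 0 being the default value.

```java
// Hypothetical helper; these names are illustrative and not part of the Java API.
class RunOptionsSketch {
    // Effective parallelism when a per-user maximum is configured on the server:
    // the smaller of the requested value and the configured maximum.
    static int effectiveParallelism(int parallelism, int maxParallelism) {
        return Math.min(parallelism, maxParallelism);
    }

    // The priority must lie in the closed range [0, 9].
    static boolean isValidPriority(int priority) {
        return priority >= 0 && priority <= 9;
    }

    // Assumption: 0 (the default) means no block fetching;
    // any other value must be at least 8192.
    static boolean isValidFetchSize(int fetchSize) {
        return fetchSize == 0 || fetchSize >= 8192;
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(64, 16)); // prints 16
        System.out.println(isValidPriority(4));           // prints true
        System.out.println(isValidFetchSize(1000));       // prints false
    }
}
```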
Examples
Example 1. Execute a script:
@Test
public void test_run_script() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    Entity entity = dbConnection.run("x = 1; x;");
    System.out.println(entity.getString());
}
// output: 1
Example 2. Execute a function:
public Entity run(String function, List<Entity> arguments)
Use the run method to call the function tableInsert().
public void test_run_function_tableInsert() throws Exception {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    int n = 5000000;
    Random rand = new Random();
    // Column names of the table to insert
    List<String> colNames = new ArrayList<>();
    colNames.add("date");
    colNames.add("val");
    // Generate 2*n rows of sample data
    int[] time = new int[n * 2];
    double[] val = new double[n * 2];
    int baseTime = Utils.countDays(2000, 1, 1);
    for (int i = 0; i < n * 2; i++) {
        time[i] = baseTime + (i % 15);
        val[i] = rand.nextDouble();
    }
    List<Vector> colVectors = new ArrayList<>();
    BasicDateVector dateVector = new BasicDateVector(time);
    colVectors.add(dateVector);
    dateVector.setCompressedMethod(1);
    BasicDoubleVector valVector = new BasicDoubleVector(val);
    valVector.setCompressedMethod(2);
    colVectors.add(valVector);
    BasicTable table = new BasicTable(colNames, colVectors);
    List<Entity> args = Arrays.asList(table);
    // Create the target table on the server and share it as st
    dbConnection.run("t = table(100000:0,`date`val,[DATE,DOUBLE]);" + "share t as st");
    // Call tableInsert
    dbConnection.run("tableInsert{st}", args);
}
Example 3. Set tableName to obtain table schema:
@Test
public void testRun() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    dbConnection.run("ts = table(1..100 as a,take(`a`b`c,100) as b)");
    BasicTableSchema tableSchema = (BasicTableSchema) dbConnection.run("select * from ts", "ts");
    System.out.println(tableSchema.getString());
}
Execution result:
cols :2
rows :100
name  typeString  typeInt  colIndex
a     DT_INT      4        0
b     DT_STRING   18       1
Example 4. Set clearSessionMemory to true to reduce memory usage during execution:
@Test
public void testRun3() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    dbConnection.run("x=1;", 4, true);
    Entity entity = dbConnection.run("x;");
    System.out.println(entity.getString());
}
// output: java.io.IOException: 192.168.0.68:8848 Server response: 'Syntax Error: [line #1] Cannot recognize the token x' script: 'x;'
Since the variable x is cleared after the first run, calling run("x;") raises an exception.
Example 5. Set fetchSize for querying large tables.
In the following example, a table with 22486 rows is created, and fetchSize is set to 10000 to retrieve the data in blocks. When fetchSize is specified, the run method returns an EntityBlockReader object, which allows the data to be read block by block with the read() method.
@Test
public void TestRun() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    EntityBlockReader blockReader = (EntityBlockReader) dbConnection.run("table(1..22486 as id)", (ProgressListener) null, 4, 4, 10000);
    while (blockReader.hasNext()) {
        // Read the next block before using it
        BasicTable data = (BasicTable) blockReader.read();
        System.out.println(data.rows());
        // Process the data that was read
        // ...
    }
}
Execution result:
10000
10000
2486
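The block arithmetic can be checked with a short standalone sketch. It assumes that every block except the last one holds exactly fetchSize rows. The class `BlockSizesSketch` is hypothetical and does not call the DolphinDB API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that only reproduces the row-count arithmetic.
class BlockSizesSketch {
    // Splits totalRows into consecutive blocks of at most fetchSize rows.
    static List<Integer> blockSizes(int totalRows, int fetchSize) {
        List<Integer> sizes = new ArrayList<>();
        for (int remaining = totalRows; remaining > 0; remaining -= fetchSize) {
            sizes.add(Math.min(fetchSize, remaining));
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(blockSizes(22486, 10000)); // prints [10000, 10000, 2486]
    }
}
```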
When reading in blocks, if the data has not been fully read, the skipAll() method must be called to skip the remaining data before continuing with the subsequent code. Otherwise, data will remain in the socket buffer, causing subsequent data deserialization to fail. For example:
@Test
public void testFetchDataInt() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    EntityBlockReader blockReader = (EntityBlockReader) dbConnection.run("table(1..22486 as id)", (ProgressListener) null, 4, 4, 10000);
    // Read 10000 records
    BasicTable data = (BasicTable) blockReader.read();
    // Skip remaining data
    blockReader.skipAll();
    // Follow-up code can now run; without skipAll() it would throw an exception.
    dbConnection.run("1==1;");
}
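Why leftover data breaks later reads can be shown with a generic sketch built on plain Java streams. The length-prefixed layout used here is an assumption made for illustration and is not the actual DolphinDB wire format. `LeftoverDataSketch` is a hypothetical class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration of unread data left in a shared input buffer.
class LeftoverDataSketch {
    // Two length-prefixed responses written back to back:
    // response 1 = [3 | 10 20 30], response 2 = [1 | 99].
    static byte[] twoResponses() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(3); out.writeInt(10); out.writeInt(20); out.writeInt(30);
            out.writeInt(1); out.writeInt(99);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads only the first value of response 1, optionally skips the rest,
    // and returns what the reader then takes to be the header of response 2.
    static int nextHeader(boolean skipRemaining) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(twoResponses()));
            int count = in.readInt(); // header of response 1
            in.readInt();             // consume the first value only
            if (skipRemaining) {
                for (int i = 1; i < count; i++) {
                    in.readInt();     // discard the unread values
                }
            }
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(nextHeader(true));  // prints 1, the real header
        System.out.println(nextHeader(false)); // prints 20, leftover data misread as a header
    }
}
```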
Example 6. Set a listener during script execution. When the script is executed, the server returns user-printed information.
@Test
public void testRun5() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.1.167", 18921);
    ProgressListener listener = new ProgressListener() {
        @Override
        public void progress(String message) {
            System.out.println("message: \n" + message);
        }
    };
    Entity entity = dbConnection.run("t = table(1..5 as id, rand(100, 5) as price); print t; select * from t where id=4;", listener);
    System.out.println("select one record: \n" + entity.getString());
}
Execution result:
Connect to 192.168.1.167:18921.
message:
id price
-- -----
1  9
2  11
3  81
4  78
5  1

select one record:
id price
-- -----
4  78
tryRun Method
In addition, the Java API provides the tryRun method to perform a "try-to-run" operation. Like run, this method attempts to execute a DolphinDB script. The difference is that the run method blocks and waits for the lock to be released before executing the script, whereas the tryRun method returns null if the lock is held by another thread instead of waiting for it to be released. See the tryLock method of ReentrantLock for reference.
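The non-blocking behavior can be sketched with `ReentrantLock.tryLock()` from the Java standard library. The class `TryRunSketch` and its methods are hypothetical stand-ins. They only assume that a "try" variant gives up immediately when another thread holds the lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a connection with a non-blocking run variant.
class TryRunSketch {
    private final ReentrantLock lock = new ReentrantLock();

    // Returns null at once if another thread holds the lock; otherwise
    // executes the "script" under the lock and returns its result.
    String tryRun(String script) {
        if (!lock.tryLock()) {
            return null;
        }
        try {
            return "done: " + script;
        } finally {
            lock.unlock();
        }
    }

    // Calls tryRun while a helper thread is holding the connection lock.
    static String tryRunWhileBusy() {
        TryRunSketch conn = new TryRunSketch();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            conn.lock.lock();
            try {
                held.countDown();  // signal that the lock is now held
                release.await();   // keep holding it until told to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                conn.lock.unlock();
            }
        });
        holder.start();
        try {
            held.await();
            return conn.tryRun("x=1; x;");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryRunWhileBusy());                    // prints null
        System.out.println(new TryRunSketch().tryRun("x=1; x;")); // prints done: x=1; x;
    }
}
```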
Syntax
public Entity tryRun(String script, int priority, int parallelism, int fetchSize, boolean clearSessionMemory)
The parameters have the same meaning and function as in the run method.
Examples
@Test
public void testTryRun() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    Entity entity = dbConnection.tryRun("x=1; x;", 1, 1, true);
    System.out.println(entity.getString());
}
The tryRun method also supports passing a function name and arguments to execute:
public Entity tryRun(String function, List<Entity> arguments, int priority, int parallelism, int fetchSize)
For example, call the sum function to calculate the sum of all elements in the vector x:
@Test
public void testTryRun2() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    List<Entity> arguments = new ArrayList<>();
    dbConnection.run("x=1..10000000");
    arguments.add(dbConnection.run("x;"));
    Entity entity = dbConnection.tryRun("sum", arguments, 1, 1);
    System.out.println(entity.getString());
}
// Output: 50000005000000
run vs. tryRun
@Test
public void testTryRun3() throws IOException, InterruptedException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848, "admin", "123456");
    // thread1: load a table, then sleep for 2 seconds on the server
    Thread thread1 = new Thread(() -> {
        try {
            dbConnection.run("pt1=loadTable(\"dfs://test\", \"tb1\"); sleep(2000);");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    });
    // thread2: execute a simple script using the tryRun method
    Thread thread2 = new Thread(() -> {
        try {
            Entity entity = dbConnection.tryRun("x=1; x;");
            System.out.println(entity);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    });
    thread1.start();
    // Thread.sleep is static; give thread1 time to acquire the lock first
    Thread.sleep(1);
    thread2.start();
    // Wait for both threads so that the test does not end before they finish
    thread1.join();
    thread2.join();
}
Execution result: null
When thread 1 and thread 2 execute simultaneously, thread 2 does not receive the script execution result but gets null instead. This indicates that the ReentrantLock for the connection is held by thread 1, and that thread 2 did not wait inside the tryRun method but returned null.
If thread 2 executes alone, with no other threads competing for the lock, the execution result will be as follows (note that the result may vary, but it should not be null):
com.xxdb.data.BasicAnyVector@155967e6
If thread 2 calls the run method instead of tryRun, it blocks and waits for thread 1 to release the lock, and a result like the following is obtained:
com.xxdb.data.BasicAnyVector@22b38b67
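Because a "try" call can return null while the lock is busy, a caller may want to check for null and retry. The following is one possible sketch using only the Java standard library. `RetrySketch.retryUntilNonNull` is a hypothetical helper, not part of the Java API, and the supplier passed to it stands in for a call such as tryRun.

```java
import java.util.function.Supplier;

// Hypothetical retry helper for calls that signal "busy" by returning null.
class RetrySketch {
    // Invokes attempt up to maxAttempts times, pausing pauseMillis between
    // tries, and returns the first non-null result (or null if none arrives).
    static <T> T retryUntilNonNull(Supplier<T> attempt, int maxAttempts, long pauseMillis) {
        for (int i = 0; i < maxAttempts; i++) {
            T result = attempt.get();
            if (result != null) {
                return result;
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null; // stop retrying when interrupted
            }
        }
        return null;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The supplier returns null on the first two calls, then a value.
        String result = retryUntilNonNull(() -> ++calls[0] < 3 ? null : "ok", 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints ok after 3 attempts
    }
}
```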