Executing Scripts

run Method

In the Java API, when multiple threads use the same connection to operate on the same table, the results can be unpredictable because the order in which the threads execute is not determined. For example, suppose thread A is supposed to update a table and then query it, and thread B is supposed to delete the table after thread A finishes. Without a lock, the execution order is unknown, and thread A's query may fail if thread B deletes the table in between. The run method uses a reentrant lock (ReentrantLock) to avoid such conflicts.
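The effect of such a lock can be sketched with the JDK's java.util.concurrent.locks.ReentrantLock alone. The class below is a toy stand-in and not DolphinDB client code: SharedTable, updateThenQuery, and drop are hypothetical names, and the sketch assumes that thread A's update and query are submitted as one unit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Toy model (not DolphinDB code): a "table" guarded by one ReentrantLock,
// mimicking how a shared connection serializes whole operations.
final class SharedTable {
    private final ReentrantLock lock = new ReentrantLock();
    private List<Integer> rows = new ArrayList<>();

    // Thread A: update, then query, as one locked unit.
    int updateThenQuery(int value) {
        lock.lock();
        try {
            if (rows == null) {
                throw new IllegalStateException("table already deleted");
            }
            rows.add(value);
            return rows.size();
        } finally {
            lock.unlock();
        }
    }

    // Thread B: delete the table. It cannot run in the middle of updateThenQuery.
    void drop() {
        lock.lock();
        try {
            rows = null;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SharedTable table = new SharedTable();
        Thread a = new Thread(() ->
                System.out.println("rows seen by A: " + table.updateThenQuery(42)));
        a.start();
        a.join();      // run B strictly after A to keep the demo deterministic
        table.drop();
    }
}
```

The lock only guarantees that the two operations never interleave. Which thread acquires the lock first is still up to the scheduler unless the caller orders the threads explicitly, as the join() call does here.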

Syntax

public Entity run(String script, ProgressListener listener, int priority, int parallelism, int fetchSize, boolean clearSessionMemory, String tableName, boolean enableSeqNo)

Parameters

  • script: The script to be executed.

  • listener (optional, default null): The listener that can be used during script execution. The results returned by the server can be received by listener.

  • priority (optional, default 4): An int in [0,9] specifying the job priority. A larger value indicates a higher task priority.

  • parallelism (optional, default 64): An int specifying the job parallelism, i.e., the number of threads allocated for executing subtasks on a data node. Since version 3.00.1.2, the default value has been modified from 2 to 64. If the maximum value per user is also set with setMaxJobParallelism(userId, maxParallelism) on the server side, the parallelism of an API job will be the minimum of parallelism and maxParallelism.

  • fetchSize (optional, default 0): An int specifying the block size (number of records). Its value cannot be less than 8192. When querying large tables, setting fetchSize can reduce the client memory usage.

  • clearSessionMemory (optional, default false): Whether to clear the session memory after execution. Set to true to release the memory resources occupied by variables created by the run method upon completion.

  • tableName (optional, default ""): A String specifying the table whose schema is obtained during execution.

  • enableSeqNo (optional, default 0): Whether to enable the sequence number for this execution. With sequence numbers, the server can determine whether a task has already been executed and avoid executing it again. It is mainly used for data idempotency validation.

    Before version 2.00.11.0, the run method always enabled the sequence number, a LONG integer that identifies a task for a given client. If a write task fails, the task is resubmitted. However, in cases such as writing to multiple tables at once, data loss may occur. This parameter was introduced in version 2.00.11.0 to enable or disable the sequence number feature.
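The numeric rules stated for priority, parallelism, and fetchSize can be summarized in a few lines of plain Java. RunParams and its methods are hypothetical helpers for illustration only and are not part of the DolphinDB API. The sketch also assumes that the default fetchSize of 0 means block-wise fetching is turned off, which the parameter list implies but does not state.

```java
// Hypothetical helper (not part of the DolphinDB API) that mirrors the
// parameter rules described in the list above.
final class RunParams {
    // priority must lie in [0, 9]; a larger value means a higher priority.
    static boolean isValidPriority(int priority) {
        return priority >= 0 && priority <= 9;
    }

    // Assumption: 0 (the default) disables block-wise fetching;
    // any other value must be at least 8192.
    static boolean isValidFetchSize(int fetchSize) {
        return fetchSize == 0 || fetchSize >= 8192;
    }

    // The job runs with the smaller of the requested parallelism and the
    // per-user cap set on the server with setMaxJobParallelism.
    static int effectiveParallelism(int parallelism, int maxParallelism) {
        return Math.min(parallelism, maxParallelism);
    }

    public static void main(String[] args) {
        System.out.println(isValidPriority(4));            // true
        System.out.println(isValidFetchSize(10000));       // true
        System.out.println(isValidFetchSize(100));         // false
        System.out.println(effectiveParallelism(64, 16));  // 16
    }
}
```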

Examples

Example 1. Execute a script:

@Test
public void test_run_script() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    Entity entity = dbConnection.run("x = 1; x;");
    System.out.println(entity.getString());
}
// output: 1

Example 2. Execute a function:

public Entity run(String function, List<Entity> arguments)

Use the run method to call function tableInsert().

public void test_run_function_tableInsert() throws Exception {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    int n = 5000000;
    Random rand = new Random(); // java.util.Random, used to generate test values
    List<String> colNames = new ArrayList<>();
    colNames.add("date");
    colNames.add("val");
    int[] time = new int[n*2];
    double[] val = new double[n*2];

    int baseTime = Utils.countDays(2000,1,1);
    for (int i = 0; i < n*2; i++) {
        time[i] = baseTime + (i % 15);
        val[i] = rand.nextDouble();
    }
    List<Vector> colVectors = new ArrayList<>();
    BasicDateVector dateVector = new BasicDateVector(time);
    colVectors.add(dateVector);
    dateVector.setCompressedMethod(1);
    BasicDoubleVector valVector = new BasicDoubleVector(val);
    valVector.setCompressedMethod(2);
    colVectors.add(valVector);

    BasicTable table = new BasicTable(colNames, colVectors);

    List<Entity> args = Arrays.asList(table);
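    // Create an empty shared table on the server; the two statements are separated by ";"
    dbConnection.run("t = table(100000:0,`date`val,[DATE,DOUBLE]);" +
            "share t as st");

    // Call tableInsert
    dbConnection.run("tableInsert{st}", args);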
}

Example 3. Set tableName to obtain table schema:

@Test
public void testRun() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    dbConnection.run("ts = table(1..100 as a,take(`a`b`c,100) as b)");
    BasicTableSchema tableSchema = (BasicTableSchema) dbConnection.run("select * from ts", "ts");
    System.out.println(tableSchema.getString());
}

Execution result:

cols :2
rows :100
name            typeString      typeInt         colIndex        
a               DT_INT          4               0               
b               DT_STRING       18              1  

Example 4. Set clearSessionMemory to true to reduce memory usage during execution:

@Test
public void testRun3() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    dbConnection.run("x=1;", 4, true);
    Entity entity = dbConnection.run("x;");
    System.out.println(entity.getString());
}
// output: java.io.IOException: 192.168.0.68:8848 Server response: 'Syntax Error: [line #1] Cannot recognize the token x' script: 'x;'

Since the variable x is cleared after the first run, the subsequent call run("x;") raises an exception.

Example 5. Set fetchSize for querying large tables.

In the following example, a table with 22486 records is queried from the Java client, and fetchSize is set to 10000 to retrieve the data in blocks. When fetchSize is specified, the run method returns an EntityBlockReader object, which reads the data one block at a time with the read() method.

@Test
public void TestRun() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    EntityBlockReader blockReader = (EntityBlockReader) dbConnection.run("table(1..22486 as id)", (ProgressListener) null, 4, 4, 10000);
    while (blockReader.hasNext()) {
        BasicTable t = (BasicTable) blockReader.read();
        // Print the number of records in the current block
        System.out.println(t.rows());
        // Process the retrieved data
        // ...
    }
}

Execution result:

10000
10000
2486
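The size of each block follows from simple arithmetic on the record count and fetchSize. The sketch below is plain Java and does not touch the DolphinDB API. BlockSizes is a hypothetical helper, and it assumes that every block except the last is filled to exactly fetchSize records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits totalRows into blocks of at most fetchSize records.
final class BlockSizes {
    static List<Integer> blockSizes(int totalRows, int fetchSize) {
        List<Integer> sizes = new ArrayList<>();
        for (int remaining = totalRows; remaining > 0; remaining -= fetchSize) {
            // Every block is full except possibly the last one.
            sizes.add(Math.min(fetchSize, remaining));
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(blockSizes(22486, 10000)); // [10000, 10000, 2486]
    }
}
```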

When using the segmented read method, if the data has not been fully read, the skipAll() method must be called to skip the remaining data before continuing with the subsequent code. Otherwise, data will remain in the socket buffer, causing subsequent data deserialization to fail. For example:

@Test
public void testFetchDataInt() throws IOException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    EntityBlockReader blockReader = (EntityBlockReader) dbConnection.run("table(1..22486 as id)", (ProgressListener) null, 4, 4, 10000);
    // Read 10000 records
    BasicTable data = (BasicTable) blockReader.read();

    // Skip remaining data
    blockReader.skipAll();
    // Subsequent calls can now run safely. Without skipAll(), the next run would throw an exception.
    dbConnection.run("1==1;");
}
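Why unread blocks break later calls can be illustrated with a toy byte stream. The framing below (a block count followed by int blocks) is invented for the illustration and is not DolphinDB's wire protocol. LeftoverBytesDemo and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Toy framing (NOT DolphinDB's wire protocol): each response is a block count
// followed by that many int blocks, sent back-to-back on one stream.
final class LeftoverBytesDemo {
    static DataInputStream twoResponses() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(3);    // response 1: three blocks
        out.writeInt(111);
        out.writeInt(222);
        out.writeInt(333);
        out.writeInt(1);    // response 2: one block
        out.writeInt(999);
        return new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    }

    // Reads only the first block of response 1, optionally drains the rest,
    // then reads what it believes is the first block of response 2.
    static int secondResponseValue(boolean skipRemaining) throws IOException {
        DataInputStream in = twoResponses();
        int blocks = in.readInt();
        in.readInt();                      // consume block 1 only
        if (skipRemaining) {
            for (int i = 1; i < blocks; i++) {
                in.readInt();              // plays the role of skipAll()
            }
        }
        in.readInt();                      // header of the "next" response
        return in.readInt();               // its first block
    }

    public static void main(String[] args) throws IOException {
        System.out.println(secondResponseValue(true));   // 999
        System.out.println(secondResponseValue(false));  // 333
    }
}
```

When the remaining blocks are not drained, the reader interprets leftover data from the first response as the second response and returns 333 instead of 999. A real client would typically fail to deserialize rather than return a wrong value, but the cause is the same.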

Example 6. Set a listener during script execution. When the script is executed, the server returns user-printed information.

@Test
public void testRun5() throws IOException{
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.1.167", 18921);

    ProgressListener listener = new ProgressListener() {
        @Override
        public void progress(String message) {
            System.out.println("message: \n" + message);
        }
    };
    Entity entity = dbConnection.run("t = table(1..5 as id, rand(100, 5) as price); print t; select * from t where id=4;", listener);
    System.out.println("select one record: \n" + entity.getString());
}

Execution result:

Connect to 192.168.1.167:18921.
message: 
id price
-- -----
1  9    
2  11   
3  81   
4  78   
5  1    

select one record: 
id price
-- -----
4  78   

tryRun Method

In addition, the Java API provides the tryRun method to perform a "try-to-run" operation.

This method allows users to attempt to execute a DolphinDB script. The difference is that the run method blocks and waits for the lock to be released before executing the script, whereas the tryRun method returns null if the lock is occupied by another thread instead of waiting for the lock to be released. See the tryLock method of Reentrant Lock for reference.
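The difference between the two locking styles can be sketched with ReentrantLock.lock() and ReentrantLock.tryLock() from the JDK. LockStyles, blocking, and nonBlocking are hypothetical names standing in for run and tryRun. This is a toy model of the behavior described above, not the client's actual implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Toy model (not DolphinDB code) of the two locking styles.
final class LockStyles {
    private final ReentrantLock lock = new ReentrantLock();

    // run-style: block until the lock is free, then execute the task.
    <T> T blocking(Supplier<T> task) {
        lock.lock();
        try {
            return task.get();
        } finally {
            lock.unlock();
        }
    }

    // tryRun-style: return null immediately if another thread holds the lock.
    <T> T nonBlocking(Supplier<T> task) {
        if (!lock.tryLock()) {
            return null;
        }
        try {
            return task.get();
        } finally {
            lock.unlock();
        }
    }

    // Returns {result while the lock is held elsewhere, result once it is free}.
    static String[] demo() throws InterruptedException {
        LockStyles conn = new LockStyles();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> conn.blocking(() -> {
            held.countDown();                       // signal: lock acquired
            try {
                release.await();                    // keep the lock until told to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "slow";
        }));
        holder.start();
        held.await();                               // the holder thread now owns the lock
        String whileBusy = conn.nonBlocking(() -> "fast");
        release.countDown();
        holder.join();                              // lock has been released
        String whenFree = conn.nonBlocking(() -> "fast");
        return new String[] {whileBusy, whenFree};
    }

    public static void main(String[] args) throws InterruptedException {
        String[] results = demo();
        System.out.println(results[0]);  // null
        System.out.println(results[1]);  // fast
    }
}
```

Because the lock is reentrant, tryLock() succeeds when the calling thread already holds it, so the contention has to come from a different thread, as in demo().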

Syntax

public Entity tryRun(String script, int priority, int parallelism, int fetchSize, boolean clearSessionMemory)

The parameters share the same meaning and function as in the run method.

Examples

@Test
public void testTryRun() throws IOException{
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    Entity entity = dbConnection.tryRun("x=1; x;", 1, 1, true);
    System.out.println(entity.getString());
}

Additionally, the tryRun method also supports passing a function name and arguments to execute:

public Entity tryRun(String function, List<Entity> arguments, int priority, int parallelism, int fetchSize)

For example, call the sum function to calculate the sum of all elements in the vector x:

@Test
public void testTryRun2() throws IOException{
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848);
    List<Entity> arguments = new ArrayList<>();
    dbConnection.run("x=1..10000000");
    arguments.add(dbConnection.run("x;"));
    Entity entity = dbConnection.tryRun("sum", arguments, 1, 1);
    System.out.println(entity.getString());
}
// Output: 50000005000000

run vs. tryRun

@Test
public void testTryRun3() throws IOException, InterruptedException {
    DBConnection dbConnection = new DBConnection();
    dbConnection.connect("192.168.0.68", 8848, "admin", "123456");

    // thread1: Execute a table query operation
    Thread thread1 = new Thread(() -> {
        try {
            dbConnection.run("pt1=loadTable(\"dfs://test\", \"tb1\"); sleep(2000);");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    });

    // thread2: Execute a simple script using the tryRun method
    Thread thread2 = new Thread(() -> {
        try {
            Entity entity = dbConnection.tryRun("x=1; x;");
            System.out.println(entity);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    });

    thread1.start();
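    // sleep is a static method of Thread; it pauses the current (main) thread
    // briefly so that thread1 can acquire the lock first
    Thread.sleep(1);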
    thread2.start();
}

Execution result: null

When thread 1 and thread 2 execute simultaneously, thread 2 does not receive the script execution result but returns a null value instead. This indicates that the ReentrantLock for the connection is occupied by thread 1, and thread 2 did not wait during the tryRun method execution but returned null.

If thread 2 executes alone, with no other threads competing for the lock, the execution result will be as follows (note that the result may vary, but it should not be null):

com.xxdb.data.BasicAnyVector@155967e6

If the task of thread 2 is executed with the run method instead, thread 2 blocks until thread 1 releases the lock and then obtains a result such as the following:

com.xxdb.data.BasicAnyVector@22b38b67
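Because tryRun may return null, callers usually need to check the result before using it. One possible pattern is a bounded retry, sketched here with JDK classes only. TryRunRetry and retry are hypothetical names, and the Callable passed in stands for a call such as dbConnection.tryRun(...).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: retries an attempt that signals "busy" by returning null.
final class TryRunRetry {
    // Calls attempt up to maxAttempts times, pausing between tries.
    // Returns the first non-null result, or null if every attempt was busy.
    static <T> T retry(Callable<T> attempt, int maxAttempts, long pauseMillis) throws Exception {
        for (int i = 0; i < maxAttempts; i++) {
            T result = attempt.call();
            if (result != null) {
                return result;
            }
            if (i < maxAttempts - 1) {
                Thread.sleep(pauseMillis);  // give the lock holder time to finish
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulated attempt: "busy" (null) twice, then a result on the third call.
        String result = retry(() -> calls.incrementAndGet() < 3 ? null : "done", 5, 10L);
        System.out.println(result + " after " + calls.get() + " attempts"); // done after 3 attempts
    }
}
```

If waiting is acceptable, calling run directly is simpler, since it blocks until the lock is released.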