Batch Job Management

Some jobs take a long time to execute. DolphinDB runs such jobs (called batch jobs) in a separate worker pool that is isolated from regular interactive jobs such as ad hoc queries. The maximum number of batch job workers is set by the configuration parameter maxBatchJobWorker. If the number of batch jobs exceeds this limit, incoming batch jobs wait in a queue. A batch job worker is destroyed automatically after it has been idle for more than 60 seconds.
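As a minimal sketch, the limit could be set in the node's configuration file (for example dolphindb.cfg in single-node mode). The value 4 is an arbitrary illustration, not a recommendation:

```ini
maxBatchJobWorker=4
```

With this setting, a fifth batch job submitted while four are still running would wait in the queue until a worker becomes free.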

To manage the output of batch jobs, DolphinDB creates the directory specified by the configuration parameter batchJobDir. If the parameter is not specified, the default directory is <HomeDir>/batchJobs. Each batch job generates two files, <job_id>.msg and <job_id>.obj, which store intermediate messages and the return object, respectively. In addition, each batch job adds three entries to the batch job log <BatchJobDir>/batchJob.log: one each when the system receives, starts, and completes the job.
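For illustration, the output directory could be redirected in the configuration file as follows. The path /data/ddb/batchJobs is a hypothetical location chosen for this sketch:

```ini
batchJobDir=/data/ddb/batchJobs
```

Under this assumption, a job with the ID job1_ID would write /data/ddb/batchJobs/job1_ID.msg and /data/ddb/batchJobs/job1_ID.obj, and its three log entries would go to /data/ddb/batchJobs/batchJob.log.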

Examples:

  • submit a job to the local node:

def job1(n){
    s = 0
    for (x in 1 : n) {
        s += sum(sin rand(1.0, 100000000)-0.5)
        print("iteration " + x + " " + s)
    }
    return s
};
job1_ID=submitJob("job1_ID","", job1, 100);

getJobStatus(job1_ID);
node      userID jobId   jobDesc receivedTime            startTime               endTime errorMsg
local8848 root   job1_ID job1    2018.06.16T10:44:34.066 2018.06.16T10:44:34.066

The endTime column is empty, which means the job is still running. After the job completes, rerun getJobStatus:
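Instead of checking by hand, a script can poll the job status until endTime is filled in. The loop below is a minimal sketch. It assumes the job1_ID variable from the example above, and the one-second polling interval is an arbitrary choice:

```dolphindb
// Poll once per second until endTime is populated, i.e. the job has finished.
// The 1000 ms interval is an arbitrary value chosen for illustration.
do {
    sleep(1000)
    status = getJobStatus(job1_ID)
} while (isNull(status.endTime[0]))
```

Once the loop exits, the status, messages, and return value of the job can be retrieved as shown next.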

getJobStatus(job1_ID);
node      userID jobId   jobDesc receivedTime            startTime               endTime                 errorMsg
local8848 root   job1_ID job1    2018.06.16T10:44:34.066 2018.06.16T10:44:34.066 2018.06.16T10:46:10.389

getJobMessage(job1_ID);
2018-06-16 10:44:34.066064 Start the job [job1_ID]: job1
2018-06-16 10:44:35.377095 iteration 1 1412.431451
2018-06-16 10:44:36.624907 iteration 2 2328.917258
2018-06-16 10:44:37.577107 iteration 3 5491.651822
2018-06-16 10:44:38.530187 iteration 4 6332.907608
2018-06-16 10:44:39.488295 iteration 5 8404.393113
......
2018-06-16 10:46:06.655519 iteration 95 -13124.624482
2018-06-16 10:46:07.562855 iteration 96 -14878.269863
2018-06-16 10:46:08.520555 iteration 97 -9842.451424
2018-06-16 10:46:09.456576 iteration 98 -8149.660675
2018-06-16 10:46:10.373218 iteration 99 -10639.329897
2018-06-16 10:46:10.389147 The job is done.

getJobReturn(job1_ID);
-4291.91147

  • submit a job to a remote node:

With function rpc ("DFS_NODE2" is located in the same cluster as the local node):

def jobDemo(n){
    s = 0
    for (x in 1 : n) {
        s += sum(sin rand(1.0, 100000000)-0.5)
        print("iteration " + x + " " + s)
    }
    return s
};

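// Submit jobDemo with n=10 to the remote node under the job ID "jobDemo3"
rpc("DFS_NODE2", submitJob, "jobDemo3", "", jobDemo{10});

// After the job completes, retrieve its return value from the remote node
rpc("DFS_NODE2", getJobReturn, "jobDemo3")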
//Output
-3426.577521

With function remoteRun:

conn = xdb("DFS_NODE2")
conn.remoteRun(submitJob, "job1_2", "", job1, 10);

conn.remoteRun(getJobReturn, "job1_2");
//Output
4238.832005