pnodeRun

Syntax

pnodeRun(function,[nodes],[addNodeAlias=true])

Arguments

function is the local function to call. It must not be quoted. It must take no parameters: it is either a function defined with no parameters, or a partial application that wraps the original function and its arguments into a function with no parameters. It can be a built-in function or a user-defined function.

nodes specifies the aliases of the nodes on which to call the function. It is an optional parameter. If it is not specified, the system calls the function on all live data nodes and compute nodes in the cluster.

addNodeAlias specifies whether to add the node aliases to the results. It is an optional parameter. The default value is true. You can set it to false if the result from each node already contains the node alias.
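
As a minimal sketch of the partial-application form described above, the script below wraps a user-defined function with two parameters into a function with no parameters. The function addTwo and the node aliases DFS_NODE1 and DFS_NODE2 are hypothetical names chosen for illustration, so replace them with names from your own cluster.

```dolphindb
// Hypothetical user-defined function with two parameters
def addTwo(a, b){
    return a + b
}

// addTwo{1, 2} fixes both arguments, leaving a function with no parameters
pnodeRun(addTwo{1, 2})

// Restrict the call to two nodes; the aliases are placeholders
pnodeRun(addTwo{1, 2}, `DFS_NODE1`DFS_NODE2)
```

Passing addTwo alone would not satisfy the no-parameter requirement, which is why both arguments are bound inside the braces.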

Details

Call a local function in parallel on all live data nodes and compute nodes in a cluster, or only on the nodes specified by nodes, and then merge the results.

Examples

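Ex. 1 Call the function getChunksMeta without arguments on all nodes. addNodeAlias is set to false because the column site in the result already identifies the node.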

pnodeRun(getChunksMeta,,false);
site chunkId path dfsPath type flag size version state versionList
local8848 bd13090e-7177-01a7-4ac4-840e1b977dcf D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190605/GOOG /compo/20190605/GOOG 1 0 0 1 0 cid : 40,pt2=>40:6729; #
local8848 b4935730-6372-b2a1-4f24-6c323037e576 e:data/CHUNKS/compo/20190605/AAPL /compo/20190605/AAPL 1 0 0 1 0 cid : 40,pt2=>40:6613; #
local8848 f8ee72c9-dad3-f49e-430e-5ddb3c61ae18 D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190604/MSFT /compo/20190604/MSFT 1 0 0 1 0 cid : 40,pt2=>40:6664; #
local8848 08e26b5a-dfac-799f-4979-0dd3902eae6e D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190604/GOOG /compo/20190604/GOOG 1 0 0 1 0 cid : 40,pt2=>40:6635; #
local8848 f9e53a3d-af3e-018d-4bfa-a2b4980f3561 D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190604/AAPL /compo/20190604/AAPL 1 0 0 1 0 cid : 40,pt2=>40:6783; #
local8848 417e49e9-5c61-cf9e-4b21-4b35f8e57273 D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190601/MSFT /compo/20190601/MSFT 1 0 0 1 0 cid : 40,pt2=>40:6602; #
local8848 3ee64942-1d72-bea7-4bc1-f720132d9288 D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190602/AAPL /compo/20190602/AAPL 1 0 0 1 0 cid : 40,pt2=>40:6749; #

Ex. 2 In the following example, the function sum and its argument 1..10 are wrapped into the partial application sum{1..10}, which is called on the nodes DFS_NODE2 and DFS_NODE3.

pnodeRun(sum{1..10}, `DFS_NODE2`DFS_NODE3);
Node Value
DFS_NODE2 55
DFS_NODE3 55

Ex. 3 pnodeRun is a convenient tool for cluster management. For example, in a cluster of 4 nodes, "DFS_NODE1", "DFS_NODE2", "DFS_NODE3", and "DFS_NODE4", run the following script on each node:

def jobDemo(n){
    s = 0
    for (x in 1 : n) {
        s += sum(sin rand(1.0, 100000000)-0.5)
        print("iteration " + x + " " + s)
    }
    return s
};

submitJob("jobDemo1","job demo", jobDemo, 10);
submitJob("jobDemo2","job demo", jobDemo, 10);
submitJob("jobDemo3","job demo", jobDemo, 10);

To check the status of the most recent 2 completed batch jobs on each of the 4 nodes in the cluster:

pnodeRun(getRecentJobs{2});
Node UserID JobID JobDesc ReceivedTime StartTime EndTime ErrorMsg
DFS_NODE1 root jobDemo2 job demo 2017.11.16T13:04:38.841 2017.11.16T13:04:38.841 2017.11.16T13:04:51.660
DFS_NODE1 root jobDemo3 job demo 2017.11.16T13:04:38.841 2017.11.16T13:04:38.843 2017.11.16T13:04:51.447
DFS_NODE2 root jobDemo2 job demo 2017.11.16T13:04:56.431 2017.11.16T13:04:56.432 2017.11.16T13:05:11.992
DFS_NODE2 root jobDemo3 job demo 2017.11.16T13:04:56.432 2017.11.16T13:04:56.434 2017.11.16T13:05:11.670
DFS_NODE3 root jobDemo2 job demo 2017.11.16T13:05:08.418 2017.11.16T13:05:08.419 2017.11.16T13:05:29.176
DFS_NODE3 root jobDemo3 job demo 2017.11.16T13:05:08.419 2017.11.16T13:05:08.421 2017.11.16T13:05:29.435
DFS_NODE4 root jobDemo2 job demo 2017.11.16T13:05:16.324 2017.11.16T13:05:16.325 2017.11.16T13:05:34.729
DFS_NODE4 root jobDemo3 job demo 2017.11.16T13:05:16.325 2017.11.16T13:05:16.328 2017.11.16T13:05:34.716

To check only the nodes DFS_NODE3 and DFS_NODE4:

pnodeRun(getRecentJobs{2}, `DFS_NODE3`DFS_NODE4);
Node UserID JobID JobDesc ReceivedTime StartTime EndTime ErrorMsg
DFS_NODE3 root jobDemo2 job demo 2017.11.16T13:05:08.418 2017.11.16T13:05:08.419 2017.11.16T13:05:29.176
DFS_NODE3 root jobDemo3 job demo 2017.11.16T13:05:08.419 2017.11.16T13:05:08.421 2017.11.16T13:05:29.435
DFS_NODE4 root jobDemo2 job demo 2017.11.16T13:05:16.324 2017.11.16T13:05:16.325 2017.11.16T13:05:34.729
DFS_NODE4 root jobDemo3 job demo 2017.11.16T13:05:16.325 2017.11.16T13:05:16.328 2017.11.16T13:05:34.716

How pnodeRun merges the results from multiple nodes:
  1. If function returns a scalar:

    Return a table with 2 columns: node alias and function results.

    Continuing with the example above:
    pnodeRun(getJobReturn{`jobDemo1});
    Node Value
    DFS_NODE3 2,123.5508
    DFS_NODE2 (42,883.5404)
    DFS_NODE1 3,337.4107
    DFS_NODE4 (2,267.3681)
  2. If function returns a vector:

    Return a matrix. Each column holds the vector returned by one node, and the column labels are the node aliases.

  3. If function returns a key-value dictionary:

    Return a table with each row representing the function return from one node.

  4. If function returns a table:

    Return a table which is the union of individual tables from multiple nodes.

  5. If function is a command (a command returns nothing):

    Return nothing.

  6. For all other cases:

    Return a dictionary. Each key is a node alias and each value is what function returned on that node.
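
The merge rules above can be tried with a few built-in functions. This is a minimal sketch, assuming a running cluster with at least two live nodes. The comments restate what the rules predict rather than show recorded output.

```dolphindb
// Rule 1: getNodeAlias returns a scalar on each node, so the merged result
// is expected to be a table with the node alias and the function result
pnodeRun(getNodeAlias)

// Rule 2: rand{10.0, 3} returns a vector of 3 values on each node, so the
// merged result is expected to be a matrix with one column per node
pnodeRun(rand{10.0, 3})

// Rule 4: getRecentJobs{2} returns a table on each node, so the merged
// result is expected to be the union of the per-node tables
pnodeRun(getRecentJobs{2})
```

Calling typestr on each result is one way to confirm which rule applied.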