pnodeRun

Syntax

pnodeRun(function,[nodes],[addNodeAlias=true])

Details

Call a local function on specified nodes in a cluster in parallel and then merge the results.

  • If nodes is specified, the function is called on specified nodes.
  • If nodes is not specified,
    • When pnodeRun is called on a compute node within a compute group, function is executed on all compute nodes within the group.
    • Otherwise, it is executed on all data nodes and on all compute nodes that do not belong to any compute group.
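
As a minimal sketch of the two calling modes, the script below runs the built-in getNodeAlias with and without the nodes argument. The aliases DFS_NODE1 and DFS_NODE2 are placeholders; substitute the aliases configured in your own cluster.

```dolphindb
// nodes omitted: the function runs on the default set of nodes described above.
pnodeRun(getNodeAlias);

// nodes given: the function runs only on the listed nodes.
// DFS_NODE1 and DFS_NODE2 are placeholder aliases, not real node names.
pnodeRun(getNodeAlias, `DFS_NODE1`DFS_NODE2);
```

Because getNodeAlias returns a scalar, each call should produce a table with one row per node that executed the function.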

Parameters

function is the local function to call. It must not be quoted. It can be a built-in function or a user-defined function. It must either be defined with no parameters or be a partial application that wraps the original function and its arguments into a function with no parameters.
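
The following sketch illustrates both ways of supplying function. The user-defined function addTwo is hypothetical and exists only for this illustration.

```dolphindb
// Hypothetical user-defined function with two parameters.
def addTwo(a, b){
    return a + b
}

// Fix both arguments with a partial application so that no parameters remain.
pnodeRun(addTwo{1, 2});

// A built-in function that can be called without arguments is passed by name, unquoted.
pnodeRun(getNodeAlias);
```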

nodes (optional) is a STRING scalar or vector indicating node alias(es). If not specified, the system calls the function on the active nodes selected by the rules in Details.

addNodeAlias (optional) is a Boolean value specifying whether to add node aliases in results. The default value is true. Set to false if node results already contain aliases.
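
A minimal sketch of a case where addNodeAlias=false is useful: the function already returns the node alias in its own column, so an extra alias column would be redundant. The function nodeClock and its column names site and ts are assumptions made for this illustration.

```dolphindb
// Hypothetical function whose result already carries the node alias in the column site.
def nodeClock(){
    return table([getNodeAlias()] as site, [now()] as ts)
}

// nodes is left empty; addNodeAlias=false asks pnodeRun not to add a node alias column.
pnodeRun(nodeClock, , false);
```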

Returns

  • If the called function returns a scalar, dictionary, or table, a table is returned.

  • If the called function returns a vector, a matrix is returned.

  • If the called function is a command (with no return value), null is returned.

  • In other cases, a dictionary is returned.

Examples

Ex. 1 Call function getChunksMeta without specifying parameters

pnodeRun(getChunksMeta,,false);
site chunkId path dfsPath type flag size version state versionList
local8848 bd13090e-7177-01a7-4ac4-840e1b977dcf D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190605/GOOG /compo/20190605/GOOG 1 0 0 1 0 cid : 40,pt2=>40:6729; #
local8848 b4935730-6372-b2a1-4f24-6c323037e576 e:data/CHUNKS/compo/20190605/AAPL /compo/20190605/AAPL 1 0 0 1 0 cid : 40,pt2=>40:6613; #
local8848 f8ee72c9-dad3-f49e-430e-5ddb3c61ae18 D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190604/MSFT /compo/20190604/MSFT 1 0 0 1 0 cid : 40,pt2=>40:6664; #
local8848 08e26b5a-dfac-799f-4979-0dd3902eae6e D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190604/GOOG /compo/20190604/GOOG 1 0 0 1 0 cid : 40,pt2=>40:6635; #
local8848 f9e53a3d-af3e-018d-4bfa-a2b4980f3561 D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190604/AAPL /compo/20190604/AAPL 1 0 0 1 0 cid : 40,pt2=>40:6783; #
local8848 417e49e9-5c61-cf9e-4b21-4b35f8e57273 D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190601/MSFT /compo/20190601/MSFT 1 0 0 1 0 cid : 40,pt2=>40:6602; #
local8848 3ee64942-1d72-bea7-4bc1-f720132d9288 D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190602/AAPL /compo/20190602/AAPL 1 0 0 1 0 cid : 40,pt2=>40:6749; #

Ex. 2 In the following example, the function sum and arguments 1..10 are wrapped into a partial application sum{1..10}.

pnodeRun(sum{1..10}, `DFS_NODE2`DFS_NODE3);
Node Value
DFS_NODE2 55
DFS_NODE3 55

Ex. 3 pnodeRun is a convenient tool for cluster management. For example, in a cluster of 4 nodes, "DFS_NODE1", "DFS_NODE2", "DFS_NODE3", and "DFS_NODE4", run the following script on each node:

def jobDemo(n){
    s = 0
    for (x in 1 : n) {
        s += sum(sin rand(1.0, 100000000)-0.5)
        print("iteration " + x + " " + s)
    }
    return s
};

submitJob("jobDemo1","job demo", jobDemo, 10);
submitJob("jobDemo2","job demo", jobDemo, 10);
submitJob("jobDemo3","job demo", jobDemo, 10);

To check the status of the most recent 2 completed batch jobs on each of the 4 nodes in the cluster:

pnodeRun(getRecentJobs{2});
Node UserID JobID JobDesc ReceivedTime StartTime EndTime ErrorMsg
DFS_NODE1 root jobDemo2 job demo 2017.11.16T13:04:38.841 2017.11.16T13:04:38.841 2017.11.16T13:04:51.660
DFS_NODE1 root jobDemo3 job demo 2017.11.16T13:04:38.841 2017.11.16T13:04:38.843 2017.11.16T13:04:51.447
DFS_NODE2 root jobDemo2 job demo 2017.11.16T13:04:56.431 2017.11.16T13:04:56.432 2017.11.16T13:05:11.992
DFS_NODE2 root jobDemo3 job demo 2017.11.16T13:04:56.432 2017.11.16T13:04:56.434 2017.11.16T13:05:11.670
DFS_NODE3 root jobDemo2 job demo 2017.11.16T13:05:08.418 2017.11.16T13:05:08.419 2017.11.16T13:05:29.176
DFS_NODE3 root jobDemo3 job demo 2017.11.16T13:05:08.419 2017.11.16T13:05:08.421 2017.11.16T13:05:29.435
DFS_NODE4 root jobDemo2 job demo 2017.11.16T13:05:16.324 2017.11.16T13:05:16.325 2017.11.16T13:05:34.729
DFS_NODE4 root jobDemo3 job demo 2017.11.16T13:05:16.325 2017.11.16T13:05:16.328 2017.11.16T13:05:34.716

To check only the nodes "DFS_NODE3" and "DFS_NODE4":

pnodeRun(getRecentJobs{2}, `DFS_NODE3`DFS_NODE4);
Node UserID JobID JobDesc ReceivedTime StartTime EndTime ErrorMsg
DFS_NODE3 root jobDemo2 job demo 2017.11.16T13:05:08.418 2017.11.16T13:05:08.419 2017.11.16T13:05:29.176
DFS_NODE3 root jobDemo3 job demo 2017.11.16T13:05:08.419 2017.11.16T13:05:08.421 2017.11.16T13:05:29.435
DFS_NODE4 root jobDemo2 job demo 2017.11.16T13:05:16.324 2017.11.16T13:05:16.325 2017.11.16T13:05:34.729
DFS_NODE4 root jobDemo3 job demo 2017.11.16T13:05:16.325 2017.11.16T13:05:16.328 2017.11.16T13:05:34.716

How pnodeRun merges the results from multiple nodes:
  1. If function returns a scalar:

    Return a table with 2 columns: node alias and function results.

    Continuing with the example above:
    pnodeRun(getJobReturn{`jobDemo1});
    Node Value
    DFS_NODE3 2,123.5508
    DFS_NODE2 (42,883.5404)
    DFS_NODE1 3,337.4107
    DFS_NODE4 (2,267.3681)
  2. If function returns a vector:

    Return a matrix. Each column holds the vector returned by one node, and the column labels are the node aliases.

  3. If function returns a key-value dictionary:

    Return a table with each row representing the function return from one node.

  4. If function returns a table:

    Return a table which is the union of individual tables from multiple nodes.

  5. If function is a command (a command has no return value):

    Return nothing.

  6. For all other cases:

    Return a dictionary. The key is node alias and the value is the function return.
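
The vector, dictionary, and command cases listed above can be sketched with built-in functions. This is an illustration under the stated merge rules rather than captured output; the variable names m and t are arbitrary.

```dolphindb
// Vector result: rand(100, 3) returns a vector of 3 random integers,
// so the merged result should be a matrix with one column per node.
m = pnodeRun(rand{100, 3});

// Dictionary result: getPerf returns a dictionary of performance metrics for the local node,
// so the merged result should be a table with one row per node.
t = pnodeRun(getPerf);

// Command: clearAllCache has no return value, so there is nothing to merge.
pnodeRun(clearAllCache);
```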