Distributed Computing

This section describes parallel function calls, remote function calls, parallel remote calls, data sources, the map-reduce and iterative computing frameworks, and the pipeline function.

Parallel Function Call

DolphinDB divides a large task into multiple subtasks for simultaneous execution.

Parallel function calls usually use one of two higher-order functions: peach or ploop. peach and ploop are the parallel computing versions of each and loop, respectively. For the difference between each and loop, please refer to loop.

There are 3 scenarios of parallel function calls.

(1) same function with different parameters

peach(log, (1..3, 4..6));
#0 #1
0 1.386294
0.693147 1.609438
1.098612 1.791759
ploop(log, (1..3, 4..6));
([0,0.693147,1.098612],[1.386294,1.609438,1.791759])

(2) different functions with same parameters

peach(call{, 3 4 5}, (log, sum));
log sum
1.098612 12
1.386294 12
1.609438 12
ploop(call{, 3 4 5}, (log, sum));
([1.098612,1.386294,1.609438],12)

Note that in the examples above, we cannot write peach((log, sum), 3 4 5), because the first parameter of a template must be a function, not an array of functions. To call multiple functions with peach or ploop, we need to use the template call.
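
The template call applies the function passed as its first argument to the remaining arguments, so call{, 3 4 5} is a unary function that takes a function and applies it to 3 4 5. For example:

call(log, 3 4 5);    // equivalent to log(3 4 5)
// output
[1.098612,1.386294,1.609438]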

(3) different functions with different parameters

x=[log, exp];
y=[1 2 3, 4 5 6];
peach(call, x, y);
#0 #1
0 54.59815
0.693147 148.413159
1.098612 403.428793
ploop(call, x, y);
([0,0.693147,1.098612],[54.59815,148.413159,403.428793])

How Parallel Computing Works in DolphinDB

DolphinDB supports parallel computing through multi-threading. Suppose a job is divided into n subtasks and there are m local executors (for the concept of local executors, please refer to the section on distributed computing concepts). The calling thread (worker) generates the n subtasks, pushes n*m/(1 + m) of them to the local executors' task queues, and executes the remaining n/(1 + m) subtasks itself. After all n subtasks are executed, the calling thread combines the individual results to produce the final result.
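
As a rough illustration (assuming localExecutors=3 is set in the configuration file; the workload below is arbitrary), with n=8 subtasks and m=3 local executors, 8*3/(1+3)=6 subtasks are pushed to the executors' queues and the remaining 8/(1+3)=2 are executed by the calling worker itself:

job = def(i): sum(sin(rand(1.0, 10000000)))
timer each(job, 1..8);    // the 8 subtasks run one after another on the worker
timer peach(job, 1..8);   // the same subtasks are spread over the worker and the 3 local executors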

To use parallel function calls, we need to make sure the number of local executors set in the configuration file is a positive integer.

If parallel function calls are initiated within subtasks, the system throws an exception instead of allocating new subtasks to local executors, because nested parallel calls may cause a deadlock: when parallel function calls are initiated for the first time (with n > 1 + m), all of the local executors have already been assigned subtasks. Since a local executor can only process one task at a time, a nested parallel call would leave it waiting for subtasks that it is itself expected to execute, creating self-contradictory workflow priorities.

Some built-in functions, such as peach, ploop, pnodeRun, ploadText and loadText, enable parallel execution if the number of local executors set in the configuration file is a positive integer.

Remote Function Call

In DolphinDB, we can send a local function that calls other local functions (all of which can be either built-in functions or user-defined functions) to run on a remote node on the fly without compilation or deployment. The system automatically serializes the function definition and the definitions of all dependent functions together with necessary local data to remote nodes.

We must open a connection before a remote call. To open a connection, run conn=xdb(host, port), where host is the host name (IP address or domain name) of the remote node and port is its port number.

There are 3 ways to close a connection:

(1) Call the close command, e.g. close(conn).

(2) Assign NULL to the connection variable: conn=NULL.

(3) The connection will be closed automatically when the current session closes.
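
For example (the host and port below are placeholders for a remote node in your environment):

conn = xdb("localhost", 8081)    // open a connection
conn("1+2")                      // execute a script through the connection
// output
3
conn = NULL                      // close the connection; alternatively, close(conn)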

You can use remoteRun, remoteRunWithCompression and rpc for remote calls. They differ in the following aspects:

  • rpc uses existing asynchronous connections among data nodes in the cluster; remoteRun uses connections explicitly created with the xdb function.

  • The calling node and the remote node of function rpc must be located in the same cluster; there is no such limitation for remoteRun.

There are 3 ways to make a remote call:

(1) Execute script on a remote node.

Syntax: remoteRun(conn, script) or conn(script) where script must be double quoted (a string).

conn =  xdb("localhost",81);
remoteRun(conn, "x=rand(1.0,10000000); y=x pow 2; avg y");
// output
0.333254

(2) Execute a remote function on a remote node. The function is defined on the remote node, while the parameters are located on the local node.

Syntax: remoteRun(conn, "functionName", param1, param2, ...) or conn("function name", param1, param2, ...)

Note:

functionName must be quoted. The function could be either built-in or user-defined.

conn =  xdb("localhost",81);
remoteRun(conn, "avg", rand(1.0,10000000) pow 2);
// output
0.333446

(3) Execute a local function on a remote node. The function is defined on the local node. It can be a built-in or user-defined function, named or anonymous. The parameters of the function are also located on the local node.

This is the most powerful feature of remote calls in DolphinDB. We can send a local function that calls other local functions (all of which can be either built-in or user-defined) to run on a remote node on the fly, without compilation or deployment. The system automatically serializes the function definition and the definitions of all dependent functions, together with the necessary local data, to the remote node. Many other systems can only remotely call functions without any user-defined dependent functions.

Syntax of remoteRun: remoteRun(conn, functionName, param1, param2, ...) or conn(functionName, param1, param2, ...)

Note:

functionName must not be quoted. param1, param2, ... are function arguments. The function could be either a built-in function or a user-defined function on the calling node.

Syntax of rpc: rpc(nodeAlias, function, param1, param2, ...)

Note:

function must not be quoted. param1, param2, ... are function arguments. The function could be either a built-in function or a user-defined function on the calling node. Both the calling node and the remote node must be located in the same cluster. Otherwise, we need to use remoteRun function.

  • Example 1: Remotely call a user-defined function with a local dataset.

    Assume there is a table EarningsDates with 2 columns on the local node: stock ticker and date. For each of the 3 stocks in the table, we have the date when it announced earnings for the 3rd quarter of 2006. There is a table USPrices on the remote node with machine name "localhost" and port number 8081. It contains daily stock prices for all US stocks. We would like to get the stock prices from the remote node for all stocks in EarningsDates for the week after they announced earnings.

    On the remote node, we import the data file to create the table USPrices, and then share it across all nodes as sharedUSPrices.

    USPrices = loadText("c:/DolphinDB/Data/USPrices.csv");
    share USPrices as sharedUSPrices;

    When we create a connection to a remote node, the remote node creates a new session for this connection. This new session is completely isolated from other sessions on the remote node. This is convenient for development as developers don't have to worry about name conflicts. In this case, however, we would like to share data among multiple sessions on the same node. We can use the statement share to share objects. Currently only tables can be shared in DolphinDB.

    We create a table EarningsDates on the local node, and send the table along with the script to a remote node. After the execution, the result is sent back to the local node.

    EarningsDates=table(`XOM`AAPL`IBM as TICKER, 2006.10.26 2006.10.19 2006.10.17 as date)
    
    def loadDailyPrice(data){
        dateDict = dict(data.TICKER, data.date)
        return select date, TICKER, PRC from objByName("sharedUSPrices") where dateDict[TICKER]<date<=dateDict[TICKER]+7
    }
    conn = xdb("localhost",8081)
    prices = conn(loadDailyPrice, EarningsDates);
    
    prices;
    date TICKER PRC
    2006.10.27 XOM 71.46
    2006.10.30 XOM 70.84
    2006.10.31 XOM 71.42
    2006.11.01 XOM 71.06
    2006.11.02 XOM 71.19
    2006.10.18 IBM 89.82
    2006.10.19 IBM 89.86
    2006.10.20 IBM 90.48
    2006.10.23 IBM 91.56
    2006.10.24 IBM 91.49
    2006.10.20 AAPL 79.95
    2006.10.23 AAPL 81.46
    2006.10.24 AAPL 81.05
    2006.10.25 AAPL 81.68
    2006.10.26 AAPL 82.19
  • Example 2: Remotely call a built-in function (submitJob) that takes a locally defined function as an argument.

    def jobDemo(n){
        s = 0
        for (x in 1 : n) {
            s += sum(sin rand(1.0, 100000000)-0.5)
            print("iteration " + x + " " + s)
        }
        return s
    };

    Remote call with function remoteRun:

    conn = xdb("DFS_NODE2")
    conn.remoteRun(submitJob, "jobDemo", "job demo", jobDemo, 10);
    // output
    jobDemo4

    conn.remoteRun(getJobReturn, "jobDemo")
    // output
    4238.832005

    Remote call with function rpc:
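
    The same job can be submitted with rpc, which uses the cluster's existing asynchronous connections. A minimal sketch (assuming the calling node and DFS_NODE2 are in the same cluster; the job ID jobDemo5 is arbitrary):

    rpc("DFS_NODE2", submitJob, "jobDemo5", "job demo", jobDemo, 10);
    rpc("DFS_NODE2", getJobReturn, "jobDemo5")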

    Note:

    Please avoid cyclic calls with remoteRun, as they may cause a deadlock. Consider the following script:

    def testRemoteCall() {
        h=xdb("localhost", 8080)
        return h.remoteRun("1+2")
    }
    h = xdb("localhost", 8081)
    h.remoteRun(testRemoteCall)

    If we run this script on localhost:8080, the node 8080 sends the locally defined function testRemoteCall to the node 8081, which in turn sends the script "1+2" back to be executed on the node 8080. When a node receives a job, it assigns a worker thread to execute it. The remote call from 8080 to 8081 and the script "1+2" are both executed on the node 8080 and may be assigned the same worker thread. If these two jobs share the same worker, a deadlock occurs.

Parallel Remote Call

If a remote call is in blocking mode, the result will not be returned until the remote node completes the function call. To make parallel remote calls with remoteRun, use it together with ploop or peach. In the following example, the template each executes the user-defined function experimentPi first on node 8081 and then on node 8082, while the template peach executes experimentPi on nodes 8081 and 8082 simultaneously. We can see that peach saves a significant amount of time compared with each.

def simuPi(n){
    x=rand(1.0, n)
    y=rand(1.0, n)
    return 4.0 * sum(x*x + y*y<=1.0) / n
}
def experimentPi(repeat, n): avg each(simuPi, take(n, repeat));

// create 2 connections
conns = each(xdb, "localhost", 8081 8082);
conns;
("Conn[localhost:8081:1166953221]","Conn[localhost:8082:1166953221]")

timer result = each(remoteRun{, experimentPi, 10, 10000000}, conns);
// output
Time elapsed: 6579.82 ms

timer result = peach(remoteRun{, experimentPi, 10, 10000000}, conns);
// output
Time elapsed: 4122.29 ms
// parallel computing saves running time

print avg(result)
// output
3.141691

// close two connections
each(close, conns);

To use remoteRun in parallel remote call, we need to establish connections to each remote node with function xdb. To remotely call the nodes within the same cluster as the calling node, we can use pnodeRun.

Syntax: pnodeRun(function, [nodes], [addNodeToResult])

function: the local function to call. It must not be quoted and must take no parameters: either a function defined without parameters, or a partial application that wraps the original function and its arguments into a zero-parameter function. It can be a built-in function or a user-defined function.

nodes: aliases of nodes. It is an optional parameter. If it is not specified, the system will call the function on all live data nodes in the cluster.

addNodeToResult: whether to add aliases of nodes to results. It is an optional parameter. The default value is true. If the returned result from each node already contains the node alias, we can set it to false.

pnodeRun calls a local function on multiple remote nodes in parallel and then merges the results. Both the calling node and the remote node must be located in the same cluster.

In the following example, we wrap the function sum and arguments 1..10 to a partial application sum{1..10}.

pnodeRun(sum{1..10}, `DFS_NODE2`DFS_NODE3);
Output:
Node          Value
DFS_NODE2        55
DFS_NODE3        55

pnodeRun is very convenient for cluster management. For example, in a cluster with 4 nodes: "DFS_NODE1", "DFS_NODE2", "DFS_NODE3", and "DFS_NODE4", run the following script on each node:

def jobDemo(n){
    s = 0
    for (x in 1 : n) {
        s += sum(sin rand(1.0, 100000000)-0.5)
        print("iteration " + x + " " + s)
    }
    return s
};

submitJob("jobDemo1","job demo", jobDemo, 10);
submitJob("jobDemo2","job demo", jobDemo, 10);
submitJob("jobDemo3","job demo", jobDemo, 10);

Check the status of the 2 most recently completed batch jobs on each node in the cluster:

pnodeRun(getRecentJobs{2});
Node UserID JobID JobDesc ReceivedTime StartTime EndTime ErrorMsg
DFS_NODE4 root jobDemo2 job demo 2017.11.21T15:40:22.026 2017.11.21T15:40:22.027 2017.11.21T15:40:43.103
DFS_NODE4 root jobDemo3 job demo 2017.11.21T15:40:22.027 2017.11.21T15:40:22.037 2017.11.21T15:40:43.115
DFS_NODE1 root jobDemo2 job demo 2017.11.21T15:39:48.087 2017.11.21T15:39:48.088 2017.11.21T15:40:03.714
DFS_NODE1 root jobDemo3 job demo 2017.11.21T15:39:48.088 2017.11.21T15:39:48.089 2017.11.21T15:40:03.767
DFS_NODE2 root jobDemo2 job demo 2017.11.21T15:39:58.788 2017.11.21T15:39:58.788 2017.11.21T15:40:14.114
DFS_NODE2 root jobDemo3 job demo 2017.11.21T15:39:58.788 2017.11.21T15:39:58.791 2017.11.21T15:40:14.178
DFS_NODE3 root jobDemo2 job demo 2017.11.21T15:40:16.945 2017.11.21T15:40:16.945 2017.11.21T15:40:38.466
DFS_NODE3 root jobDemo3 job demo 2017.11.21T15:40:16.945 2017.11.21T15:40:16.947 2017.11.21T15:40:38.789
pnodeRun(getRecentJobs{2}, `DFS_NODE3`DFS_NODE4);
Node UserID JobID JobDesc ReceivedTime StartTime EndTime ErrorMsg
DFS_NODE3 root jobDemo2 job demo 2017.11.21T15:40:16.945 2017.11.21T15:40:16.945 2017.11.21T15:40:38.466
DFS_NODE3 root jobDemo3 job demo 2017.11.21T15:40:16.945 2017.11.21T15:40:16.947 2017.11.21T15:40:38.789
DFS_NODE4 root jobDemo2 job demo 2017.11.21T15:40:22.026 2017.11.21T15:40:22.027 2017.11.21T15:40:43.103
DFS_NODE4 root jobDemo3 job demo 2017.11.21T15:40:22.027 2017.11.21T15:40:22.037 2017.11.21T15:40:43.115

pnodeRun uses the following rules to merge the results from multiple nodes:

(1) If the function returns a scalar:

Return a table with 2 columns: node alias and function results.

Continuing with the example above:

pnodeRun(getJobReturn{`jobDemo1})
// output
Node          Value
DFS_NODE3        2,123.5508
DFS_NODE2        (42,883.5404)
DFS_NODE1        3,337.4107
DFS_NODE4        (2,267.3681)

(2) If the function returns a vector:

Return a matrix. Each column of the matrix is the function result from the node. The column labels of the matrix are the nodes.

(3) If the function returns a key-value dictionary:

Return a table with each row representing the function result from one node.

(4) If the function returns a table:

Return a table which combines individual tables from multiple nodes.

Please see the aforementioned example of pnodeRun(getRecentJobs{2}).

(5) If the function is a command (that returns nothing):

Return nothing

(6) For all other cases:

Return a dictionary. The key of the dictionary is node alias and the value is function result.
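
For instance, rule (2) can be illustrated with a function that returns a vector. A minimal sketch (assuming nodes DFS_NODE2 and DFS_NODE3 are alive; rand{1.0, 3} wraps the built-in rand into a zero-parameter partial application):

pnodeRun(rand{1.0, 3}, `DFS_NODE2`DFS_NODE3);
// returns a 3x2 matrix: each column holds one node's vector and is labeled with the node alias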

Data Source

Data source is a special type of data object that contains the following information about a data entity:

  1. Meta descriptions of a data entity. By executing a data source, we can obtain a materialized data entity such as table, matrix, vector, etc. In DolphinDB's distributed computing framework, lightweight data source objects are passed to remote sites for computing, instead of large data entities, dramatically reducing the network traffic.

  2. Execution locations. A data source could have 0, 1 or multiple locations. A data source with 0 location is a local data source. In case of multiple locations, these locations back up each other. The system randomly picks a location for distributed computing. When the data source is instructed to cache the materialized data object, the system picks the location where data were successfully retrieved last time.

  3. Attributes to instruct the system to cache data or clear cache. For iterative computing algorithms (e.g. machine learning algorithms), data caching could significantly boost computing performance. Cached data will be cleared when the system runs out of memory. If this happens, the system can recover the data since the data source contains all meta descriptions and data transforming functions.

  4. A data source object may also contain multiple data transforming functions to further process the retrieved data. These functions are executed sequentially, with the output of one function serving as the only input of the next. It is generally more efficient to include data transforming functions in the data source rather than in the core computing operation. While there is no performance difference if the retrieved data is needed only once, it makes a huge difference for iterative computing on data sources with cached data objects: if the transformations are performed in the core computing unit, they are executed in every iteration; if they are performed in the data source, they are executed only once (see the sketch after this list).
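
Data sources over a DFS table are typically created with the sqlDS function, which generates one data source per partition involved in the query; transDS! appends a transforming function to data sources. A minimal sketch (the database path, table, column names and filter are assumptions for illustration):

def positiveOnly(t){
    // drop rows with non-positive prices before any computation sees the data
    return select * from t where Price > 0
}
t = loadTable("dfs://stockDB", "stockData")
ds = sqlDS(<select TradingTime, Price from t>)    // one data source per partition involved in the query
ds = transDS!(ds, positiveOnly)                   // the transform runs when a data source is materialized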

Map-Reduce

The Map-Reduce function is the core function of DolphinDB's generic distributed computing framework.

Syntax: mr(ds, mapFunc, [reduceFunc], [finalFunc], [parallel=true])

  • ds: the list of data sources. This required parameter must be a tuple and each element of the tuple is a data source object. Even if there is only one data source, we still need a tuple to wrap the data source.

  • mapFunc: the map function. It accepts one and only one argument, which is the materialized data entity from the corresponding data source. If we would like the map function to accept more parameters in addition to the materialized data source, we can use the PartialApplication to convert a multiple-parameter function to a unary function. The number of map function calls is the number of data sources. The map function returns a regular object (scalar, pair, array, matrix, table, set, or dictionary) or a tuple (containing multiple regular objects).

  • reduceFunc: the binary reduce function that combines two map function call results. The reduce function in most cases is trivial. An example is the addition function. The reduce function is optional. If it is not specified, the system simply returns all individual map call results to the final function.

  • finalFunc: the final function accepts one and only one parameter. The output of the last reduce function call is the input of the final function. The final function is optional. If it is not specified, the system returns the individual map function call results.

  • parallel: an optional boolean flag indicating whether to execute the map function in parallel locally. The default value is true, i.e. enabling parallel computing. When there is very limited available memory and each map call needs a large amount of memory, we can disable parallel computing to prevent the out-of-memory problem. We may also disable the parallel option to ensure thread safety. For example, if multiple threads write to the same file simultaneously, errors may occur.
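
As a minimal sketch of how these pieces fit together (assuming ds is a tuple of data sources whose tables contain a numeric column x), the following sums the column across all data sources:

def sumMap(t){
    return sum(t.x)    // one map call per data source
}
totalSum = mr(ds, sumMap, +)    // the reduce function + adds up the partial sums; no final function is needed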

The following is an example of distributed linear regression. X is the matrix of independent variables and y is the dependent variable. X and y are stored in multiple data sources. To estimate the least squares parameters, we need to calculate X^T X and X^T y. We can calculate the tuple (X^T X, X^T y) for each data source, then add up the results from all data sources to obtain X^T X and X^T y for the entire dataset.

def myOLSMap(table, yColName, xColNames, intercept){
    if(intercept)
        x = matrix(take(1.0, table.rows()), table[xColNames])
    else
        x = matrix(table[xColNames])
    xt = x.transpose()
    return xt.dot(x), xt.dot(table[yColName])
}

def myOLSFinal(result){
    xtx = result[0]
    xty = result[1]
    return xtx.inv().dot(xty)[0]
}

def myOLSEx(ds, yColName, xColNames, intercept){
    return mr(ds, myOLSMap{, yColName, xColNames, intercept}, +, myOLSFinal)
}

In the example above, we define the map function and final function. In practice, we may define transformation functions for data sources as well. These functions only need to be defined in the local instance, with no need for users to compile them or deploy them to the remote instances before using. DolphinDB's distributed computing framework handles these complicated issues for end users on the fly. It is extremely easy to develop distributed analytical functions and applications in DolphinDB.

As distributed least squares linear regression is a frequently used analytical tool, it is already implemented in the DolphinDB core library. The built-in version (olsEx) provides more features.

Iterative Computing

Iterative computing is a commonly used computing methodology. Many machine learning methods and statistical models use iterative algorithms to estimate model parameters.

DolphinDB offers function imr for iterative computing based on the map-reduce methodology. Each iteration uses the result from the previous iteration and the input dataset. The input dataset for each iteration stays unchanged so that it can be cached. Iterative computing requires initial values for the model parameters and a termination criterion.

Syntax: imr(ds, initValue, mapFunc, [reduceFunc], [finalFunc], terminateFunc, [carryover=false])

  • ds: the list of data sources. It must be a tuple with each element as a data source object. Even if there is only one data source, we still need a tuple to wrap the data source. In iterative computing, data sources are automatically cached and the cache will be cleared after the last iteration.

  • initValue: the initial values of model parameter estimates. The format of the initial values must be the same as the output of the final function.

  • mapFunc: the map function. It has two arguments. The first argument is the data entity represented by the corresponding data source. The second is the output of the final function in the previous iteration, which is an updated estimate of the model parameter. For the first iteration, it is the initial values given by the user.

  • reduceFunc: the binary reduce function combines two map function call results. If there are M map calls, the reduce function will be called M-1 times. The reduce function in most cases is trivial. An example is the addition function. The reduce function is optional.

  • finalFunc: the final function in each iteration. It accepts two arguments. The first argument is the output of the final function in the previous iteration. For the first iteration, it is the initial values given by the user. The second argument is the output of the reduce function call. If the reduce function is not specified, a tuple representing the collection of individual map call results would be the second argument.

  • terminateFunc: either a function that determines whether the computation continues, or a specified number of iterations. The termination function accepts two parameters. The first is the output of the reduce function in the previous iteration and the second is the output of the reduce function in the current iteration. If the function returns a true value, the iterations will end.

  • carryover: a Boolean value indicating whether a map function call produces a carryover object to be passed to the next iteration of the map function call. The default value is false. If it is set to true, the map function has 3 arguments and the last argument is the carryover object, and the map function output is a tuple whose last element is the carryover object. In the first iteration, the carryover object is the NULL object.

Now let's use the example of distributed median calculation to illustrate the function imr. Assume the data is scattered on multiple nodes and we would like to calculate the median of a variable across all the nodes. First, for each data source, put the data into buckets and use the map function to count the number of data points in each bucket. Then use the reduce function to merge the bucket counts from multiple data sources. Locate the bucket that contains the median. In the next iteration, the chosen bucket is divided into smaller buckets. The iterations will finish when the size of the chosen bucket is no more than the specified number.

def medMap(data, range, colName){
    return bucketCount(data[colName], double(range), 1024, true)
}

def medFinal(range, result){
    x= result.cumsum()
    index = x.asof(x[1025]/2.0)
    ranges = range[1] - range[0]
    if(index == -1)
        return (range[0] - ranges*32):range[1]
    else if(index == 1024)
        return range[0]:(range[1] + ranges*32)
    else{
        interval = ranges / 1024.0
        startValue = range[0] + (index - 1) * interval
        return startValue : (startValue + interval)
    }
}

def medEx(ds, colName, range, precision){
    termFunc = def(prev, cur): cur[1] - cur[0] <= precision
    return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg()
}
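
A hypothetical call to medEx might look like the following (the database path, table, column name, initial range and precision are all assumptions for illustration):

t = loadTable("dfs://stockDB", "stockData")
ds = sqlDS(<select Price from t>)
medEx(ds, `Price, 0.0 : 1000.0, 0.001);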

The pipeline function

The pipeline function optimizes tasks that meet the following conditions through multithreading:

(1) Can be decomposed into multiple sub-tasks.

(2) Each subtask contains multiple steps.

(3) The k-th step of the i-th subtask can only be executed after the (k-1)-th step of the i-th subtask and the k-th step of the (i-1)-th subtask are completed.

In the following example, we need to convert the partitioned table stockData into a CSV file. The table contains data from 2000 to 2018 and exceeds the available memory of the system, so we cannot load the entire table into memory and then convert it into a CSV file. Instead, the task is divided into multiple subtasks, each of which consists of two steps: load one month of data into memory, and then save that data to the CSV file. Before the data of a month can be written to the CSV file, the data of that month must have been loaded into memory and the data of the previous month must have already been written to the file.

v = 2000.01M..2018.12M
def queryData(m){
    return select * from loadTable("dfs://stockDB", "stockData") where TradingTime between datetime(date(m)) : datetime(date(m+1))
}
def saveData(tb){
    tb.saveText("/hdd/hdd0/data/stockData.csv", ',', true)
}
pipeline(each(partial{queryData}, v), saveData);