pipeline(initTasks, followers, [queueDepth=2])


initTasks is the collection of the initial steps of all tasks, each of which is represented by a zero-argument function. For example, if there are 10 tasks, initTasks is a tuple of 10 zero-argument functions.

followers is a set of unary functions, each of which represents one step of a task after the initial step. If a task consists of N steps, followers should contain N-1 unary functions. The output of the initial step is the input of the first follower, and the output of each follower is the input of the next follower. The last follower may or may not return an object. The initial step of each task is executed in the main thread (the thread that accepted the tasks), and the remaining steps are executed in separate threads. To execute tasks of N steps, the system creates N-1 threads, which are destroyed upon completion of the job.

queueDepth is the maximum depth of the queue that feeds the next step. The intermediate result of each step is stored in the queue for the next follower. When the queue is full, execution of the current step pauses until the next step consumes data from the queue. A deeper queue reduces the time the next step spends waiting, but it consumes more memory. queueDepth is optional and the default value is 2.
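The parameters above can be put together in a minimal sketch of a three-step task. The function names genData, sortData and lastValue are hypothetical and chosen only for illustration. Passing several followers as a tuple ([sortData, lastValue]) is an assumption based on the description of followers as a set of functions, and the queue depth of 4 is an arbitrary value.

```dolphindb
// Initial step (zero-argument once n is fixed): generate n random doubles
def genData(n){
    return rand(1.0, n)
}

// Follower 1: sort the vector produced by the initial step
def sortData(x){
    return sort(x)
}

// Follower 2: take the last (largest) element of the sorted vector
def lastValue(x){
    return last(x)
}

// Three tasks of three steps each, so two followers are needed (N-1 = 2)
tasks = each(partial{genData}, 1000 2000 3000)

// Assumption: multiple followers are supplied as a tuple; queueDepth is raised from the default 2 to 4
pipeline(tasks, [sortData, lastValue], 4)
```

With three steps per task, the description above implies that two threads are created for the followers while the initial step runs in the main thread.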


Optimize tasks that meet the following conditions through multithreading:
  1. Can be decomposed into multiple subtasks.

  2. Each subtask contains multiple steps.

  3. The kth step of the ith subtask can only be executed after the (k-1)th step of the ith subtask and the kth step of the (i-1)th subtask are completed.

If the last step (follower) returns an object, the pipeline function returns a tuple. Otherwise, it returns nothing.
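The return behavior can be illustrated with a small sketch that uses a single follower. The names makeVec and total are hypothetical; the call pattern each(partial{...}, ...) mirrors the one used in the example on this page.

```dolphindb
// Initial step: build the vector 1..n
def makeVec(n){
    return 1..n
}

// Last (and only) follower: returns an object, so pipeline is expected to return a tuple
def total(x){
    return sum(x)
}

// One zero-argument task per element of 1 2 3
r = pipeline(each(partial{makeVec}, 1 2 3), total)
```

Because total returns a value, r should hold one result per task. If the last follower only had a side effect, such as writing to a file, there would be nothing to assign.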


In the following example, we need to convert the partitioned table stockData into a CSV file. This table contains data from 2008 to 2018 and exceeds the available memory of the system, so we cannot load the entire table into memory and then convert it into a CSV file. The task can be divided into multiple subtasks, each of which consists of two steps: load one month of data into memory, and then store the data in the CSV file. Before the data of a month can be stored in the CSV file, the data of that month must have been loaded into memory, and the data of the previous month must have been stored in the CSV file.

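// One element per month to export
v = 2000.01M..2018.12M

// Initial step: load one month of data into memory
def loadData(m){
    return select * from loadTable("dfs://stockDB", "stockData") where TradingTime between datetime(date(m)) : datetime(date(m+1))
}

// Follower: append the loaded month to the CSV file
def saveData(tb){
    tb.saveText("/hdd/hdd0/data/stockData.csv", ',', true)
}

// Build one zero-argument loading task per month, then run the pipeline
pipeline(each(partial{loadData}, v), saveData);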