In [1]:
import pandas as pd
import numpy as np
import itertools
from functools import reduce
import os
import multiprocessing
import time
import warnings
from tqdm import tqdm
import datetime

# Notebook-wide settings: silence every warning and tune how pandas prints frames.
warnings.filterwarnings("ignore")

# Console width, per-cell text width and number of columns shown when printing.
pd.set_option("display.width", 1200)
pd.set_option("display.max_colwidth", 100)
pd.set_option("display.max_columns", 10)
# Disable pandas' chained-assignment check.
pd.set_option("mode.chained_assignment", None)

In [2]:
def level10Diff(df, lag=20):
    """Rolling sum of the change in ten-level bid notional between consecutive rows.

    For every row the ten bid levels (``BidPrice1..10`` / ``BidOrderQty1..10``)
    are compared with the previous row.  Only levels whose price lies inside
    ``[pmin, pmax]`` are counted, where ``pmin`` is the larger of the two rows'
    lowest bid prices and ``pmax`` is the larger of their highest bid prices.
    The per-row difference (current in-range notional minus previous in-range
    notional) is then summed over a trailing window of ``lag`` rows.

    Missing prices and quantities are replaced by 0, and the first row is
    compared against an all-zero previous row.  ``df`` is never modified: all
    intermediate values live in fresh arrays and the returned frame is built
    from an explicit copy.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``TradeTime``, ``SecurityID`` and the twenty
        ``BidPrice<k>`` / ``BidOrderQty<k>`` columns for ``k`` in 1..10.
        Rows are compared in their existing order.
    lag : int, default 20
        Size of the trailing window; shorter windows at the start are still
        summed (``min_periods=1``).

    Returns
    -------
    pandas.DataFrame
        Columns ``TradeTime``, ``SecurityID`` and ``amtDiff``; any remaining
        missing value in these columns is replaced by 0.
    """
    levels = [str(i + 1) for i in range(10)]

    # Current rows as (n_rows x 10) arrays with NaN replaced by 0.
    bid = df[["BidPrice" + lv for lv in levels]].fillna(0).to_numpy(dtype=float)
    qty = df[["BidOrderQty" + lv for lv in levels]].fillna(0).to_numpy(dtype=float)
    bid_amt = qty * bid

    # Previous row's values; the first row has no predecessor and sees zeros.
    prev_bid = np.zeros_like(bid)
    prev_bid[1:] = bid[:-1]
    prev_amt = np.zeros_like(bid_amt)
    prev_amt[1:] = bid_amt[:-1]

    # Both bounds take the larger of the current and previous extremes.
    pmin = np.maximum(bid.min(axis=1), prev_bid.min(axis=1))[:, None]
    pmax = np.maximum(bid.max(axis=1), prev_bid.max(axis=1))[:, None]

    in_now = ((bid >= pmin) & (bid <= pmax)).astype(int)
    in_prev = ((prev_bid >= pmin) & (prev_bid <= pmax)).astype(int)

    # Accumulate level by level so the floating-point summation order is fixed.
    amt_diff = np.zeros(len(df))
    for k in range(len(levels)):
        amt_diff += bid_amt[:, k] * in_now[:, k] - prev_amt[:, k] * in_prev[:, k]

    out = df[["TradeTime", "SecurityID"]].copy()
    out["amtDiff"] = amt_diff
    out["amtDiff"] = out["amtDiff"].rolling(lag, min_periods=1).sum()
    return out.fillna(0)

In [None]:
# Single-file check: run the factor on one snapshot file and time the computation.
df = pd.read_csv("/ssd/ssd3/data/oneStock_oneFile_TL/20230201/snapshot/000001.csv")
# perf_counter is monotonic, so the elapsed time cannot be skewed by system clock adjustments.
t0 = time.perf_counter()
res = level10Diff(df, lag=20)
print("cal time: ", time.perf_counter() - t0, "s")
print(res)

In [4]:
def pool_func(tick_obj, snapshot_path_obj):
    """Compute ``level10Diff`` for a batch of snapshot files and stack the results.

    Parameters
    ----------
    tick_obj : iterable of str
        File names, relative to ``snapshot_path_obj``, to process.
    snapshot_path_obj : str
        Directory that contains the snapshot CSV files.

    Returns
    -------
    pandas.DataFrame
        Row-wise concatenation of the per-file results.  A file that cannot be
        read or processed is reported and skipped without affecting the other
        files.  If no file succeeds (or the batch is empty) an empty frame with
        columns ``TradeTime``, ``SecurityID`` and ``amtDiff`` is returned.
    """
    single_tick_res = []

    for tick in tqdm(tick_obj):
        try:
            df = pd.read_csv(os.path.join(snapshot_path_obj, tick))
            single_tick_res.append(level10Diff(df, lag=20))
        except Exception as error:
            # Skip only the offending file: the list of earlier results must stay
            # intact, otherwise one bad file loses the whole batch.
            # tqdm.write prints without breaking the progress bar.
            tqdm.write("skipped " + str(tick) + ": " + repr(error))

    # pd.concat raises on an empty list, so return an empty frame with the result schema.
    if not single_tick_res:
        return pd.DataFrame(columns=["TradeTime", "SecurityID", "amtDiff"])
    return pd.concat(single_tick_res, axis=0)


class multi_task_split:
    """Split a sequence into contiguous, near-equal chunks, one per worker process."""

    def __init__(self, data, processes_to_use):
        # data: sequence supporting len() and slicing.
        # processes_to_use: upper bound on the number of workers requested by the caller.
        self.data = data
        self.processes_to_use = processes_to_use

    def num_of_jobs(self):
        """Return how many chunks/workers to use.

        The count is the smallest of the number of items, the requested number
        of processes and the machine's CPU count, but never less than 1, so the
        result is always usable as a pool size and as a divisor in
        ``split_args`` (including for empty data or a non-positive request).
        """
        wanted = min(len(self.data), self.processes_to_use, multiprocessing.cpu_count())
        return max(1, wanted)

    def split_args(self):
        """Return a generator over ``num_of_jobs()`` contiguous slices of ``data``.

        The slices cover ``data`` in order; the first ``len(data) % jobs``
        slices hold one extra item, so sizes differ by at most one.  Empty data
        yields a single empty slice.
        """
        jobs = self.num_of_jobs()  # evaluate once so every slice uses the same count
        q, r = divmod(len(self.data), jobs)
        return (self.data[i * q + min(i, r): (i + 1) * q + min(i + 1, r)] for i in range(jobs))

In [None]:
n_use = 24
# Modify the path to the directory where the data is stored
snapshot_path ="/ssd/ssd3/data/oneStock_oneFile_TL/20230201/snapshot/"
# os.listdir returns names in arbitrary order; sorting keeps chunking and output order reproducible.
stock_pool = sorted(os.listdir(snapshot_path))
# Build the splitter once and reuse it for both the worker count and the chunks.
splitter = multi_task_split(stock_pool, n_use)
processes_decided = splitter.num_of_jobs()
print("Number of processes:", processes_decided)
split_args_to_process = list(splitter.split_args())
args = [(chunk, snapshot_path) for chunk in split_args_to_process]
print("#" * 50 + "Multiprocessing Start" + "#" * 50)
# perf_counter is monotonic, so the elapsed time cannot be skewed by system clock adjustments.
t0 = time.perf_counter()
with multiprocessing.Pool(processes=processes_decided) as pool:
    # starmap blocks until every chunk is done and returns a plain list, so a
    # tqdm wrapper around it would show no real progress. Leaving the `with`
    # block shuts the pool down; no explicit close() is needed.
    res = pool.starmap(pool_func, args)
# First timing: parallel computation only.
print("cal time: ", time.perf_counter() - t0, "s")
res_combined = pd.concat(res, axis=0)
# Second timing: computation plus concatenation of the per-chunk results.
print("cal time: ", time.perf_counter() - t0, "s")
print(res_combined)