{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "dad5a12b",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import numpy as np\n",
    "import itertools\n",
    "from functools import reduce\n",
    "import os\n",
    "import multiprocessing\n",
    "import time\n",
    "import warnings\n",
    "from tqdm import tqdm\n",
    "import datetime\n",
    "\n",
    "warnings.filterwarnings(\"ignore\")\n",
    "\n",
    "pd.options.display.width = 1200\n",
    "pd.options.display.max_colwidth = 100\n",
    "pd.options.display.max_columns = 10\n",
    "pd.options.mode.chained_assignment = None"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "4ddf81ad",
   "metadata": {},
   "outputs": [],
   "source": [
    "def priceSensitivityOrderFlowImbalance(df):  \n",
    "    deltaP = 10000*df[\"LastPrice\"].diff().fillna(0)\n",
    "    bidQty1 = df[\"BidOrderQty1\"].fillna(0)\n",
    "    askQty1 = df[\"OfferOrderQty1\"].fillna(0)\n",
    "    NVOL = bidQty1 - askQty1\n",
    "\n",
    "    # Linear regression formula linearRegression(y, x): (len(x)*sum(x*y) - sum(x)*sum(y)) / (len(x)*sum(x*x) - sum(x)*sum(x))\n",
    "    res = (len(NVOL)*sum(NVOL*deltaP) - sum(NVOL)*sum(deltaP)) / (len(NVOL)*sum(NVOL*NVOL)- sum(NVOL)*sum(NVOL))\n",
    "    return res"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b07fdc45",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = pd.read_csv(\"/ssd/ssd3/data/oneStock_oneFile_TL/20230201/snapshot/000001.csv\")\n",
    "t0 = time.time()\n",
    "res = priceSensitivityOrderFlowImbalance(df)\n",
    "print(\"cal time: \", time.time() - t0, \"s\")\n",
    "print(res)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "335a32ab",
   "metadata": {},
   "outputs": [],
   "source": [
    "def pool_func(tick_obj, trade_path_obj):\n",
    "    single_tick_res = pd.DataFrame(columns=[\"DATE\",\"priceSensitivityOrderFlowImbalance\"])\n",
    "    tmp_date = trade_path_obj.split('/')[-2]\n",
    "    # print(tmp_date)\n",
    "    tmp_date = tmp_date[0:4] + \"-\" + tmp_date[4:6] + \"-\" + tmp_date[6:8]\n",
    "    # print(tmp_date)\n",
    "    for tick in tqdm(tick_obj):\n",
    "        single_tick_res.at[tick[:6], \"DATE\"] = tmp_date\n",
    "        try:\n",
    "            df = pd.read_csv(os.path.join(trade_path_obj, tick))\n",
    "\n",
    "            Indicator = priceSensitivityOrderFlowImbalance(df)\n",
    "            # print(Indicator)\n",
    "            # print(\"net_large_buy_ratio_after_open:\", Indicator)\n",
    "            single_tick_res.at[tick[:6], \"priceSensitivityOrderFlowImbalance\"] = Indicator\n",
    "\n",
    "        except Exception as error:\n",
    "            single_tick_res.at[tick[:6], \"priceSensitivityOrderFlowImbalance\"] = np.nan\n",
    "            continue\n",
    "\n",
    "    return single_tick_res\n",
    "\n",
    "\n",
    "class multi_task_split:\n",
    "\n",
    "    def __init__(self, data, processes_to_use):\n",
    "        self.data = data\n",
    "        self.processes_to_use = processes_to_use\n",
    "\n",
    "    def num_of_jobs(self):\n",
    "        return min(len(self.data), self.processes_to_use, multiprocessing.cpu_count())\n",
    "\n",
    "    def split_args(self):\n",
    "        q, r = divmod(len(self.data), self.num_of_jobs())\n",
    "        return (self.data[i * q + min(i, r): (i + 1) * q + min(i + 1, r)] for i in range(self.num_of_jobs()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1aa125c0",
   "metadata": {},
   "outputs": [],
   "source": [
    "n_use = 24\n",
    "# Modify the path to the directory where the data is stored\n",
    "trade_path = r\"/ssd/ssd3/data/oneStock_oneFile_TL/20230201/snapshot\"\n",
    "stock_pool = os.listdir(trade_path)\n",
    "processes_decided = multi_task_split(stock_pool, n_use).num_of_jobs()\n",
    "print(\"Number of processes:\", processes_decided)\n",
    "split_args_to_process = list(multi_task_split(stock_pool, n_use).split_args())\n",
    "args = [(split_args_to_process[i], trade_path) for i in range(len(split_args_to_process))]\n",
    "print(\"#\" * 50 + \"Multiprocessing Start\" + \"#\" * 50)\n",
    "t0 = time.time()\n",
    "with multiprocessing.Pool(processes=processes_decided) as pool:\n",
    "    res = tqdm(pool.starmap(pool_func, args))\n",
    "    print(\"cal time: \", time.time() - t0, \"s\")\n",
    "    res_combined = pd.concat(res, axis=0)\n",
    "    pool.close()\n",
    "    print(\"cal time: \", time.time() - t0, \"s\")\n",
    "print(res_combined)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "012910d0",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
