{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "678ddd1b",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import numpy as np\n",
    "import itertools\n",
    "from functools import reduce\n",
    "import os\n",
    "import multiprocessing\n",
    "import time\n",
    "import warnings\n",
    "from tqdm import tqdm\n",
    "import datetime\n",
    "\n",
    "# Session-wide settings: silence all warnings and configure pandas display/options.\n",
    "warnings.filterwarnings(action=\"ignore\")\n",
    "\n",
    "# Display limits used when frames are printed in this notebook.\n",
    "pd.set_option(\"display.width\", 1200)\n",
    "pd.set_option(\"display.max_colwidth\", 100)\n",
    "pd.set_option(\"display.max_columns\", 10)\n",
    "# Turn off pandas' chained-assignment check for the whole session.\n",
    "pd.set_option(\"mode.chained_assignment\", None)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "ca8da7d2",
   "metadata": {},
   "outputs": [],
   "source": [
    "def level10Diff(df, lag=20):\n",
    "    \"\"\"Rolling sum of the row-to-row change in 10-level bid amount.\n",
    "\n",
    "    For every row, the bid amount (price * quantity, missing values treated\n",
    "    as 0) of each level whose price lies inside ``[pmin, pmax]`` is summed and\n",
    "    the same sum over the previous row's levels is subtracted. ``pmin`` is the\n",
    "    larger of the two rows' lowest bid prices and ``pmax`` the larger of their\n",
    "    highest bid prices. The per-row difference is then summed over the last\n",
    "    ``lag`` rows (fewer rows at the start of the frame).\n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    df : pandas.DataFrame\n",
    "        Must contain ``TradeTime``, ``SecurityID``, ``BidPrice1..10`` and\n",
    "        ``BidOrderQty1..10``. Rows are used in their existing order and the\n",
    "        frame itself is not modified.\n",
    "    lag : int, default 20\n",
    "        Rolling window length in rows.\n",
    "\n",
    "    Returns\n",
    "    -------\n",
    "    pandas.DataFrame\n",
    "        Columns ``TradeTime``, ``SecurityID`` and ``amtDiff``; NaN is replaced by 0.\n",
    "    \"\"\"\n",
    "    levels = [str(i + 1) for i in range(10)]\n",
    "    bid_cols = [\"bid\" + index for index in levels]\n",
    "    prev_bid_cols = [\"prevbid\" + index for index in levels]\n",
    "\n",
    "    # Work on an explicit copy. Adding columns to a bare column selection is\n",
    "    # chained assignment and would depend on the warning being disabled globally.\n",
    "    temp = df[[\"TradeTime\", \"SecurityID\"]].copy()\n",
    "\n",
    "    for index in levels:\n",
    "        price = df[\"BidPrice\" + index].fillna(0)\n",
    "        temp[\"bid\" + index] = price\n",
    "        temp[\"bidAmt\" + index] = df[\"BidOrderQty\" + index].fillna(0) * price\n",
    "        # The first row has no predecessor, so its previous values become 0.\n",
    "        temp[\"prevbid\" + index] = temp[\"bid\" + index].shift(1).fillna(0)\n",
    "        temp[\"prevbidAmt\" + index] = temp[\"bidAmt\" + index].shift(1).fillna(0)\n",
    "\n",
    "    temp[\"bidMin\"] = temp[bid_cols].min(axis=1)\n",
    "    temp[\"bidMax\"] = temp[bid_cols].max(axis=1)\n",
    "    temp[\"prevbidMin\"] = temp[prev_bid_cols].min(axis=1)\n",
    "    temp[\"prevbidMax\"] = temp[prev_bid_cols].max(axis=1)\n",
    "    # NOTE(review): pmin is the larger of the two minima (max, not min).\n",
    "    # Kept exactly as in the original; confirm this is the intended bound.\n",
    "    temp[\"pmin\"] = temp[[\"bidMin\", \"prevbidMin\"]].max(axis=1)\n",
    "    temp[\"pmax\"] = temp[[\"bidMax\", \"prevbidMax\"]].max(axis=1)\n",
    "\n",
    "    temp[\"amtDiff\"] = 0.0\n",
    "    for index in levels:\n",
    "        # 1 where the level's price is inside [pmin, pmax], else 0.\n",
    "        cur_in_range = ((temp[\"bid\" + index] >= temp[\"pmin\"]) & (temp[\"bid\" + index] <= temp[\"pmax\"])).astype(int)\n",
    "        prev_in_range = ((temp[\"prevbid\" + index] >= temp[\"pmin\"]) & (temp[\"prevbid\" + index] <= temp[\"pmax\"])).astype(int)\n",
    "        temp[\"amtDiff\"] += temp[\"bidAmt\" + index] * cur_in_range - temp[\"prevbidAmt\" + index] * prev_in_range\n",
    "\n",
    "    # min_periods=1 so the first rows sum over whatever history exists.\n",
    "    temp[\"amtDiff\"] = temp[\"amtDiff\"].rolling(lag, min_periods=1).sum()\n",
    "    return temp[[\"TradeTime\", \"SecurityID\", \"amtDiff\"]].fillna(0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a198cb62",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Single-file check: load one snapshot CSV and time level10Diff on it.\n",
    "df = pd.read_csv(\"/ssd/ssd3/data/oneStock_oneFile_TL/20230201/snapshot/000001.csv\")\n",
    "# The timer starts after the CSV is loaded, so only the computation is measured.\n",
    "t0 = time.time()\n",
    "res = level10Diff(df, lag=20)\n",
    "print(\"cal time: \", time.time() - t0, \"s\")\n",
    "print(res)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "11602a9f",
   "metadata": {},
   "outputs": [],
   "source": [
    "def pool_func(tick_obj, snapshot_path_obj):\n",
    "    \"\"\"Run ``level10Diff`` on a batch of snapshot files and stack the results.\n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    tick_obj : iterable\n",
    "        File names, each joined onto ``snapshot_path_obj`` and read as CSV.\n",
    "    snapshot_path_obj : str\n",
    "        Directory that contains the files.\n",
    "\n",
    "    Returns\n",
    "    -------\n",
    "    pandas.DataFrame\n",
    "        Row-wise concatenation of the per-file results. A file that fails to\n",
    "        load or process is reported and skipped. If no file succeeds, an empty\n",
    "        frame with the result columns is returned.\n",
    "    \"\"\"\n",
    "    result_columns = [\"TradeTime\", \"SecurityID\", \"amtDiff\"]\n",
    "    single_tick_res = []\n",
    "\n",
    "    for tick in tqdm(tick_obj):\n",
    "        try:\n",
    "            df = pd.read_csv(os.path.join(snapshot_path_obj, tick))\n",
    "            single_tick_res.append(level10Diff(df, lag=20))\n",
    "        except Exception as error:\n",
    "            # Skip only the failing file. The accumulator must stay a list:\n",
    "            # replacing it with a DataFrame dropped every earlier result and\n",
    "            # made the final pd.concat fail.\n",
    "            print(f\"skip {tick}: {error!r}\")\n",
    "\n",
    "    if not single_tick_res:\n",
    "        # pd.concat raises on an empty list, so return an empty result instead.\n",
    "        return pd.DataFrame(columns=result_columns)\n",
    "    return pd.concat(single_tick_res, axis=0)\n",
    "\n",
    "\n",
    "class multi_task_split:\n",
    "    \"\"\"Split a sequence into nearly equal contiguous chunks, one per worker.\"\"\"\n",
    "\n",
    "    def __init__(self, data, processes_to_use):\n",
    "        # data: sequence supporting len() and slicing.\n",
    "        # processes_to_use: requested upper bound on the number of workers.\n",
    "        self.data = data\n",
    "        self.processes_to_use = processes_to_use\n",
    "\n",
    "    def num_of_jobs(self):\n",
    "        \"\"\"Return the smallest of item count, requested processes and CPU count.\"\"\"\n",
    "        return min(len(self.data), self.processes_to_use, multiprocessing.cpu_count())\n",
    "\n",
    "    def split_args(self):\n",
    "        \"\"\"Return an iterator over contiguous slices of ``data``, one per job.\n",
    "\n",
    "        Slice lengths differ by at most one; the first ``len(data) % n_jobs``\n",
    "        slices receive the extra item. The iterator is empty when there are no\n",
    "        jobs (empty data or a non-positive process count).\n",
    "        \"\"\"\n",
    "        n_jobs = self.num_of_jobs()\n",
    "        if n_jobs <= 0:\n",
    "            # Avoid dividing by zero when there is nothing to distribute.\n",
    "            return iter(())\n",
    "        q, r = divmod(len(self.data), n_jobs)\n",
    "        return (self.data[i * q + min(i, r): (i + 1) * q + min(i + 1, r)] for i in range(n_jobs))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "99f2c1b4",
   "metadata": {},
   "outputs": [],
   "source": [
    "n_use = 24\n",
    "# Modify the path to the directory where the data is stored\n",
    "snapshot_path =\"/ssd/ssd3/data/oneStock_oneFile_TL/20230201/snapshot/\"\n",
    "# os.listdir returns entries in arbitrary order; sort them so the chunking and\n",
    "# the row order of the combined result are the same on every run.\n",
    "stock_pool = sorted(os.listdir(snapshot_path))\n",
    "# Build the splitter once and reuse it for both the job count and the chunks.\n",
    "task_split = multi_task_split(stock_pool, n_use)\n",
    "processes_decided = task_split.num_of_jobs()\n",
    "print(\"Number of processes:\", processes_decided)\n",
    "split_args_to_process = list(task_split.split_args())\n",
    "args = [(chunk, snapshot_path) for chunk in split_args_to_process]\n",
    "print(\"#\" * 50 + \"Multiprocessing Start\" + \"#\" * 50)\n",
    "t0 = time.time()\n",
    "with multiprocessing.Pool(processes=processes_decided) as pool:\n",
    "    # starmap blocks until every chunk is done and returns a plain list, so a\n",
    "    # tqdm wrapper around it would not track any work.\n",
    "    res = pool.starmap(pool_func, args)\n",
    "    # First timing: parallel computation only.\n",
    "    print(\"cal time: \", time.time() - t0, \"s\")\n",
    "    res_combined = pd.concat(res, axis=0)\n",
    "    # Second timing: computation plus concatenation. The pool is shut down by\n",
    "    # the with-block on exit, so no explicit close() is needed.\n",
    "    print(\"cal time: \", time.time() - t0, \"s\")\n",
    "print(res_combined)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f7c524bc",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
