{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "da1c1a7b",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import numpy as np\n",
    "import itertools\n",
    "from functools import reduce\n",
    "import os\n",
    "import multiprocessing\n",
    "import time\n",
    "import warnings\n",
    "from tqdm import tqdm\n",
    "import datetime\n",
    "\n",
    "warnings.filterwarnings(\"ignore\")\n",
    "\n",
    "pd.options.display.width = 1200\n",
    "pd.options.display.max_colwidth = 100\n",
    "pd.options.display.max_columns = 10\n",
    "pd.options.mode.chained_assignment = None"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "dc3ee62a",
   "metadata": {},
   "outputs": [],
   "source": [
    "def volumeWeightedAvgPrice(df, lag):\n",
    "    res = df[[\"TradeTime\", \"SecurityID\"]]\n",
    "    totalAmount = (df[\"OrderQty\"]*df[\"Price\"]).rolling(lag).sum()\n",
    "    totalVolume = df[\"OrderQty\"].rolling(lag).sum()\n",
    "    res[\"volumeWeightedAvgPrice\"] = totalAmount / totalVolume\n",
    "    return res"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "90fa9a47",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = pd.read_csv(\"/ssd/ssd3/data/oneStock_oneFile_TL/20230201/entrust/000001.csv\")\n",
    "t0 = time.time()\n",
    "res = volumeWeightedAvgPrice(df, 60)\n",
    "print(\"cal time: \", time.time() - t0, \"s\")\n",
    "print(res)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "5585c733",
   "metadata": {},
   "outputs": [],
   "source": [
    "def pool_func(tick_obj, trade_path_obj):\n",
    "    single_tick_res = []\n",
    "    tmp_date = trade_path_obj.split('/')[-2]\n",
    "    # print(tmp_date)\n",
    "    tmp_date = tmp_date[0:4] + \"-\" + tmp_date[4:6] + \"-\" + tmp_date[6:8]\n",
    "    # print(tmp_date)\n",
    "    for tick in tqdm(tick_obj):\n",
    "        try:\n",
    "            df = pd.read_csv(os.path.join(trade_path_obj, tick))\n",
    "            Indicator = volumeWeightedAvgPrice(df, 60)\n",
    "            single_tick_res.append(Indicator)\n",
    "            # print(Indicator)\n",
    "            # print(\"net_large_buy_ratio_after_open:\", Indicator)\n",
    "        except Exception as error:\n",
    "            continue\n",
    "\n",
    "    return pd.concat(single_tick_res)\n",
    "\n",
    "\n",
    "class multi_task_split:\n",
    "\n",
    "    def __init__(self, data, processes_to_use):\n",
    "        self.data = data\n",
    "        self.processes_to_use = processes_to_use\n",
    "\n",
    "    def num_of_jobs(self):\n",
    "        return min(len(self.data), self.processes_to_use, multiprocessing.cpu_count())\n",
    "\n",
    "    def split_args(self):\n",
    "        q, r = divmod(len(self.data), self.num_of_jobs())\n",
    "        return (self.data[i * q + min(i, r): (i + 1) * q + min(i + 1, r)] for i in range(self.num_of_jobs()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "46a9f806",
   "metadata": {},
   "outputs": [],
   "source": [
    "n_use = 24\n",
    "# Modify the path to the directory where the data is stored\n",
    "trade_path = r\"/ssd/ssd3/data/oneStock_oneFile_TL/20230201/entrust\"\n",
    "stock_pool = os.listdir(trade_path)\n",
    "processes_decided = multi_task_split(stock_pool, n_use).num_of_jobs()\n",
    "print(\"Number of processes:\", processes_decided)\n",
    "split_args_to_process = list(multi_task_split(stock_pool, n_use).split_args())\n",
    "args = [(split_args_to_process[i], trade_path) for i in range(len(split_args_to_process))]\n",
    "print(\"#\" * 50 + \"Multiprocessing Start\" + \"#\" * 50)\n",
    "t0 = time.time()\n",
    "with multiprocessing.Pool(processes=processes_decided) as pool:\n",
    "    res = tqdm(pool.starmap(pool_func, args))\n",
    "    print(\"cal time: \", time.time() - t0, \"s\")\n",
    "    res_combined = pd.concat(res, axis=0)\n",
    "    pool.close()\n",
    "    print(\"cal time: \", time.time() - t0, \"s\")\n",
    "print(res_combined)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "227d6d41",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
