Methods for Asynchronous Execution and Others
This section describes the methods of DBConnectionPool. They provide three ways to execute scripts asynchronously; choose the one that best fits your needs.
- Coroutine function run: Run tasks asynchronously with coroutines. Task IDs increase sequentially.
- addTask, isFinished, getData: Submit scripts to DBConnectionPool, which handles the asynchronous execution and obtains the result. Task IDs are specified by users.
- runTaskAsync: Create an event loop in the connection pool. Tasks are executed within the event loop using the run method, and a concurrent.futures.Future object is returned. Task IDs increase sequentially.
Note: Task IDs are generated automatically in the first and third methods, but specified by users in the second method. To prevent ID conflicts, do not use the second method and first/third methods simultaneously.
run
run(script, *args, clearMemory=None, pickleTableToList=None,
    priority=None, parallelism=None, disableDecimal=None)
script: the DolphinDB script to be executed.
*args: the parameters to be passed into the functions in script.
clearMemory: bool, default True. Whether to release variables after query.
pickleTableToList: bool, default False. True means to download returned Table objects as list objects. False means to download returned Table objects as DataFrame objects. For details, see PROTOCOL_DDB and PROTOCOL_PICKLE.
priority: int, default 4. Specifies job priority - the bigger the value, the higher the priority. It is introduced in DolphinDB Python API 1.30.22.2.
parallelism: int, default 64. Specifies the parallelism of a job, i.e., the maximum number of threads that can execute the tasks of a job on a data node at the same time. It is introduced in DolphinDB Python API 1.30.22.2. Starting from 3.0.1.1, the default value is 64 (previously 2). If a per-user maximum is also set on the server side with setMaxJobParallelism(userId, maxParallelism), the parallelism of an API job is the minimum of parallelism and maxParallelism.
disableDecimal: bool, default False. Whether to convert DECIMAL columns to the DOUBLE type. If True, DECIMAL columns are returned as DOUBLE columns.
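The parallelism rule described above reduces to taking a minimum. The following is a minimal sketch of that arithmetic only; effective_parallelism is a hypothetical helper name used for illustration and is not part of the API.

```python
def effective_parallelism(parallelism, max_parallelism=None):
    """Illustrative only: mirrors the min(parallelism, maxParallelism) rule.

    max_parallelism stands for the per-user limit set on the server with
    setMaxJobParallelism; None means no per-user limit has been set.
    """
    if max_parallelism is None:
        return parallelism
    return min(parallelism, max_parallelism)

print(effective_parallelism(64, 16))  # 16
print(effective_parallelism(8, 16))   # 8
print(effective_parallelism(64))      # 64
```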
The run method in DBConnectionPool is a coroutine function that passes scripts to a thread pool for execution. run must be called in a coroutine.
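To illustrate the general pattern of a coroutine function that hands blocking work to a thread pool, here is a minimal sketch using only the Python standard library. It is not the implementation used by DBConnectionPool; blocking_call and run_in_pool are hypothetical stand-ins so that the snippet runs without a server.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a blocking call, such as executing a script
# over one connection. It is NOT part of the DolphinDB API.
def blocking_call(i):
    time.sleep(0.2)
    return 1 + i

executor = ThreadPoolExecutor(max_workers=4)

async def run_in_pool(i):
    # Hand the blocking call to a worker thread and await its result,
    # so the event loop stays free to schedule other coroutines.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_call, i)

async def main():
    # Awaiting is only possible inside a coroutine.
    return await asyncio.gather(*(run_in_pool(i) for i in (1, 3, 5, 7)))

results = asyncio.run(main())
executor.shutdown()
print(results)  # [2, 4, 6, 8]
```

asyncio.gather returns the results in the order the awaitables were passed in, regardless of which one finishes first.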
Example 1. Executing tasks with predefined logic
(1) Construct a DBConnectionPool with a max size of 8 connections. Note: Unlike a typical connection pool, when a connection of DBConnectionPool is no longer in use, it is not immediately destroyed. Instead, the connections are retained in the pool until the pool itself is destructed or explicitly closed by the method shutDown().
import dolphindb as ddb
import time
import asyncio
pool = ddb.DBConnectionPool("localhost", 8848, 8)
(2) Define an async function test_run, which sleeps 2 seconds and then returns 1+i.
async def test_run(i):
    try:
        return await pool.run(f"sleep(2000);1+{i}")
    except Exception as e:
        print(e)
(3) Create a list of 4 tasks and create an event loop. Run the event loop until all 4 coroutines complete.
tasks = [
    asyncio.ensure_future(test_run(1)),
    asyncio.ensure_future(test_run(3)),
    asyncio.ensure_future(test_run(5)),
    asyncio.ensure_future(test_run(7)),
]
loop = asyncio.get_event_loop()
try:
    time_st = time.time()
    loop.run_until_complete(asyncio.wait(tasks))
    time_ed = time.time()
except Exception as e:
    print("catch e:")
    print(e)
(4) Print the total time taken for all the tasks to complete, loop through the tasks and print their results, then shut down the connection pool.
print("time: ", time_ed-time_st)
for task in tasks:
    print(task.result())
pool.shutDown()
Expected output:
time: 2.0017542839050293
2
4
6
8
In this example, although only 1 main thread is created, we use coroutines to execute asynchronous tasks in the DBConnectionPool, enabling concurrent execution.
Note: The DolphinDB Python API uses C++ threads internally to handle each connection. If the number of submitted tasks exceeds the actual number of threads, some tasks may be delayed.
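The delay mentioned in the note can be reproduced with any fixed-size worker pool. The sketch below uses a standard-library ThreadPoolExecutor as a stand-in (an assumption for illustration; it is not how the API manages its C++ threads): with 2 workers and 4 tasks, the last 2 tasks have to wait for a free worker.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def work(i):
    time.sleep(0.2)  # stand-in for a script that takes 0.2 s
    return i

start = time.perf_counter()
# Only 2 worker threads for 4 tasks: the last 2 tasks wait for a free worker.
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(work, range(4)))
elapsed = time.perf_counter() - start

print(results)  # [0, 1, 2, 3]
print(f"elapsed: {elapsed:.2f} s")  # roughly two rounds of 0.2 s
```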
Example 2. Executing tasks which accept user-defined scripts
The following example defines a class that accepts user scripts as parameters.
import dolphindb as ddb
import time
import asyncio
import threading
# The main thread creates coroutines and schedules them to run, while offloading the actual event loop execution to another thread.
class DolphinDBHelper(object):
    pool = ddb.DBConnectionPool("localhost", 8848, 10)
    @classmethod
    async def test_run(cls,script):
        print(f"run script: [{script}]")
        return await cls.pool.run(script)
    @classmethod
    async def runTest(cls,script):
        start = time.time()
        task = loop.create_task(cls.test_run(script))
        result = await asyncio.gather(task)
        print(f"""[{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}] time: {time.time()-start} result: {result}""")
        return result
# Define a thread function to run the event loop
def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
if __name__=="__main__":
    start = time.time()
    print("In main thread",threading.current_thread())
    loop = asyncio.get_event_loop()
    # A child thread is launched to call start_thread_loop and run the event loop forever
    t = threading.Thread(target= start_thread_loop, args=(loop,))
    t.start()
    task1 = asyncio.run_coroutine_threadsafe(DolphinDBHelper.runTest("sleep(1000);1+1"),loop)
    task2 = asyncio.run_coroutine_threadsafe(DolphinDBHelper.runTest("sleep(3000);1+2"),loop)
    task3 = asyncio.run_coroutine_threadsafe(DolphinDBHelper.runTest("sleep(5000);1+3"),loop)
    task4 = asyncio.run_coroutine_threadsafe(DolphinDBHelper.runTest("sleep(1000);1+4"),loop)
    end = time.time()
    print("main thread time: ", end - start)
Output:
In main thread <_MainThread(MainThread, started 139838803788160)>
main thread time: 0.00039839744567871094
run script: [sleep(1000);1+1]
run script: [sleep(3000);1+2]
run script: [sleep(5000);1+3]
run script: [sleep(1000);1+4]
[2023-03-14 16:46:56] time: 1.0044968128204346 result: [2]
[2023-03-14 16:46:56] time: 1.0042989253997803 result: [5]
[2023-03-14 16:46:58] time: 3.0064148902893066 result: [3]
[2023-03-14 16:47:00] time: 5.005709409713745 result: [4]
In this example, the main thread creates a child thread that keeps running the event loop, then schedules 4 coroutines on that event loop. The coroutines sleep for 1s, 3s, 5s and 1s, respectively.
The output shows that the coroutines were executed concurrently:
- main thread time: 0.00039839744567871094 indicates very little time spent in the main thread, as the tasks were executed asynchronously in the event loop;
- Coroutines with sleep times of 1s (task1 and task4) finished at the same time, then the coroutine with 3s (task2) finished 2s later, and the coroutine with 5s (task3) finished 2s after that.
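In Example 2 the child thread calls loop.run_forever(), so the event loop keeps running after the tasks finish. The sketch below shows one possible way to collect the results in the main thread and then stop the loop. It uses only the standard library; fake_task is a hypothetical stand-in for a coroutine that awaits a pool call, so the snippet runs without a server.

```python
import asyncio
import threading

# Hypothetical stand-in for a coroutine that awaits a pool call.
async def fake_task(delay, value):
    await asyncio.sleep(delay)
    return value

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

loop = asyncio.new_event_loop()
t = threading.Thread(target=start_thread_loop, args=(loop,))
t.start()

# run_coroutine_threadsafe returns concurrent.futures.Future objects.
futures = [
    asyncio.run_coroutine_threadsafe(fake_task(0.1, 2), loop),
    asyncio.run_coroutine_threadsafe(fake_task(0.3, 3), loop),
]
# Block in the main thread until each scheduled coroutine is done.
results = [f.result(timeout=5) for f in futures]

# Ask the loop to stop from the main thread, then wait for the thread to exit.
loop.call_soon_threadsafe(loop.stop)
t.join()
loop.close()
print(results)  # [2, 3]
```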
addTask, isFinished, getData
addTask submits user scripts to DBConnectionPool with a user-specified task ID. Use isFinished to check whether a task in the connection pool has completed, and use getData to retrieve the task result.
addTask
addTask(script, taskId, *args, **kwargs)
- script: the DolphinDB script to be executed.
- taskId: int. The ID of the task.
- *args: the parameters to be passed into the functions in the script.
- **kwargs
  - clearMemory: bool, default True. Whether to release variables after the query.
  - pickleTableToList: bool, default False. True means to convert returned Table objects into list objects. False means to convert returned Table objects into DataFrame objects.
addTask submits a task to DBConnectionPool with the given task ID. The submitted tasks are executed using connections allocated by the connection pool.
Example
Add a task with the ID of 12 to the connection pool.
pool.addTask("sleep(1000);1+2", taskId=12)
isFinished
isFinished(taskId)
- taskId: int. The ID of the task.
isFinished checks whether the task with the given task ID has completed. Returns True if the task has completed, otherwise returns False.
Example
if pool.isFinished(taskId=12):
    print("task has done!")
getData
getData(taskId)
- taskId: int. The ID of the task.
getData queries the result of a task by task ID.
Example
res = pool.getData(taskId=12)
Note: If the result of a task is not retrieved by getData and a new task is later submitted with the same task ID, the result of the old task will be overwritten.
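One simple way to avoid reusing a task ID before its result has been retrieved is to draw IDs from a counter. This is a minimal sketch under that assumption; next_task_id is a hypothetical helper and is not part of the API.

```python
import itertools

# Hypothetical helper, not part of the API: hands out task IDs that are
# never reused within this process.
_task_ids = itertools.count(start=1)

def next_task_id():
    return next(_task_ids)

first = next_task_id()
second = next_task_id()
print(first, second)  # 1 2
```

Each submission would then pass a fresh ID, for example taskId=next_task_id() in the call to addTask.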
A Complete Example
In the following script, we first create a DBConnectionPool object and add a task with ID 12 to the connection pool. We then repeatedly check whether task 12 has finished using isFinished. Once the task completes, we get the execution result using getData and print it.
import dolphindb as ddb
import time
pool = ddb.DBConnectionPool("localhost", 8848, 8)
taskid = 12
pool.addTask("sleep(1500);1+2", taskId=taskid)
while True:
    if pool.isFinished(taskId=taskid):
        break
    time.sleep(0.01)
res = pool.getData(taskId=taskid)
print(res)
# output:
3
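The polling loop above waits indefinitely if the task never finishes. The following sketch adds a deadline to the same isFinished/getData pattern. wait_for_task is a hypothetical helper, and FakePool is a stand-in that only mimics the two methods so that the snippet runs without a server; neither is part of the API.

```python
import time

def wait_for_task(pool, task_id, timeout=10.0, interval=0.01):
    """Poll pool.isFinished(task_id), then return pool.getData(task_id).

    Hypothetical helper: raises TimeoutError if the task is still running
    after `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while not pool.isFinished(task_id):
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} not finished after {timeout} s")
        time.sleep(interval)
    return pool.getData(task_id)

# Stand-in used only to exercise the helper without a server.
class FakePool:
    def __init__(self, ready_after, value):
        self._ready_at = time.monotonic() + ready_after
        self._value = value

    def isFinished(self, taskId):
        return time.monotonic() >= self._ready_at

    def getData(self, taskId):
        return self._value

print(wait_for_task(FakePool(0.05, 3), task_id=12))  # 3
```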
runTaskAsync
runTaskAsync(script, *args, **kwargs)
- script: the DolphinDB script to be executed.
- *args: the parameters to be passed into the script.
- **kwargs
  - clearMemory: bool, default True. Whether to release variables after the query.
  - pickleTableToList: bool, default False. True means to convert returned Table objects into list objects. False means to convert returned Table objects into DataFrame objects.
Note:
- In Python API 1.30.17.4 and earlier versions, this method is known as runTaskAsyn.
- When this method is used to execute script asynchronously, make sure to call pool.shutDown() after the tasks have finished to properly destruct the connection pool.
Besides run and addTask, you can also use the runTaskAsync method of DBConnectionPool to execute scripts asynchronously.
You can add a task to the connection pool by calling runTaskAsync, which returns a concurrent.futures.Future object. Then get the task result by calling the object's result(timeout=None) method.
The timeout parameter of result sets the maximum wait time in seconds for task completion. If a timeout value is passed, result will return the task result if it completes within the timeout duration. Otherwise, result will raise a TimeoutError. The default value of timeout is None, indicating no timeout.
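The timeout behavior of result can be tried out with any concurrent.futures.Future. The sketch below obtains one from a standard-library ThreadPoolExecutor, used here as a stand-in for the connection pool; slow is a hypothetical function that simulates a long-running script.

```python
import time
import concurrent.futures

def slow():
    time.sleep(0.5)  # stand-in for a long-running script
    return 3

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow)
    try:
        future.result(timeout=0.1)  # give up waiting after 0.1 s
        timed_out = False
    except concurrent.futures.TimeoutError:
        timed_out = True
    # The timeout only ends the wait. The task keeps running, and a later
    # call without a timeout still returns its result.
    value = future.result()

print(timed_out, value)  # True 3
```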
Example
import dolphindb as ddb
import time
pool = ddb.DBConnectionPool("localhost", 8848, 10)
t1 = time.time()
task1 = pool.runTaskAsync("sleep(1000); 1+0")
task2 = pool.runTaskAsync("sleep(2000); 1+1")
task3 = pool.runTaskAsync("sleep(4000); 1+2")
task4 = pool.runTaskAsync("sleep(1000); 1+3")
t2 = time.time()
# result() blocks until the corresponding task has finished
print("Task1 Result:", task1.result())
t3 = time.time()
print("Task2 Result:", task2.result())
t4 = time.time()
print("Task4 Result:", task4.result())
t5 = time.time()
print("Task3 Result:", task3.result())
t6 = time.time()
print("Add Tasks:", t2-t1)
print("Get Task1:", t3-t1)
print("Get Task2:", t4-t1)
print("Get Task4:", t5-t1)
print("Get Task3:", t6-t1)
pool.shutDown()
Output:
Task1 Result: 1
Task2 Result: 2
Task4 Result: 4
Task3 Result: 3
Add Tasks: 0.0015881061553955078
Get Task1: 1.0128183364868164
Get Task2: 2.0117716789245605
Get Task4: 2.0118134021759033
Get Task3: 4.012163162231445
This script creates a DBConnectionPool, then runs 4 tasks (with different durations) asynchronously on the pool by calling runTaskAsync, which returns 4 concurrent.futures.Future objects. It then prints the result of each task by calling the blocking result method on each object in turn, and finally prints the time measured at various points:
- t2 - t1: Time to add all tasks to the connection pool. It is close to 0 sec, as runTaskAsync does not wait for the tasks to finish.
- t3 - t1 = 1 sec: Time taken to get the result of task1. As task1 takes 1 sec, the result is 1 sec.
- t4 - t1 = 2 sec: Time taken to get the result of task2. As task2 takes 2 sec, the result is 2 sec.
- t5 - t1 = 2 sec: Time taken to get the result of task4. task4 itself takes only 1 sec, but its result is requested after the blocking call for task2 has returned, so the result is 2 sec.
- t6 - t1 = 4 sec: Time taken to get the result of task3. As task3 takes 4 sec, the result is 4 sec.
The overall output demonstrates that all tasks are run concurrently.
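When results should be handled in completion order rather than in the order result is called, the standard-library function concurrent.futures.as_completed accepts any collection of Future objects. The sketch below demonstrates it with a ThreadPoolExecutor as a stand-in for the connection pool; task is a hypothetical function that simulates scripts of different durations.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(delay, value):
    time.sleep(delay)  # stand-in for sleep(...) in the script
    return value

with ThreadPoolExecutor(max_workers=4) as executor:
    # Submitted in the order 3, 1, 2; the slowest task goes first.
    futures = [
        executor.submit(task, 0.5, 3),
        executor.submit(task, 0.1, 1),
        executor.submit(task, 0.3, 2),
    ]
    # as_completed yields each future as soon as it finishes.
    finished = [f.result() for f in as_completed(futures)]

print(finished)  # [1, 2, 3]
```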
Other Methods
shutDown
pool.shutDown()
Use this method to shut down a DBConnectionPool that is no longer in use, terminate the event loops, and stop all asynchronous tasks. After this method is called, the connection pool can no longer be used.
Note: If you have created asynchronous tasks using runTaskAsync, make sure to call shutDown once the connection pool is no longer in use.
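One way to make sure shutDown is always called, even when an exception is raised, is to wrap the pool in a context manager. This is a minimal sketch: pool_scope is a hypothetical helper, and FakePool is a stand-in that only records the call so that the snippet runs without a server; neither is part of the API.

```python
from contextlib import contextmanager

@contextmanager
def pool_scope(pool):
    """Hypothetical helper: yield the pool and always call shutDown() afterwards."""
    try:
        yield pool
    finally:
        pool.shutDown()

# Stand-in used only to show that shutDown() is called on exit.
class FakePool:
    def __init__(self):
        self.closed = False

    def shutDown(self):
        self.closed = True

fake = FakePool()
with pool_scope(fake) as p:
    pass  # submit tasks and collect their results here

print(fake.closed)  # True
```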
getSessionId
sessionids = pool.getSessionId()
getSessionId returns the IDs of all sessions in the current connection pool.