Python API 接入数据
实时流数据接入是指将数据从数据源实时写入 DolphinDB 中,供后续进行清洗、计算等使用。本文档介绍如何使用 Python API 把数据写入 DolphinDB。
按照上游发布端数据产生频率大小推荐不同的写入接口:
-
当数据实时写入频率小于 100 条每秒时,推荐使用
tableInsert
同步写入接口,其写入延时相对其它接口是最低的。 -
当数据实时写入频率大于 100 条每秒时,推荐使用
MultithreadedTableWriter
异步写入接口,其吞吐量和写入延时综合效益最高。
下面将介绍两种场景下推荐函数的使用方法和基准性能。
如果本文档中遇到 DolphinDB Python API 使用相关问题,参考:Python API。
tableInsert 同步写入接口
当数据实时写入频率小于 100 条每秒时,推荐使用 tableInsert
同步接口写入数据。本文档提供 10
列数据的写入示例和基准性能。
step1:创建共享普通流数据表
import dolphindb as ddb
import numpy as np
import pandas as pd
import random
s = ddb.session()
s.connect(host="localhost", port=8848, userid="admin", password="123456")
script = """
name = `c1Time`c2Symbol`c3Int`c4Int`c5Long`c6Long`c7Float`c8Float`c9Double`c10ArrayInt
type = [TIMESTAMP, SYMBOL, INT, INT, LONG, LONG, FLOAT, FLOAT, DOUBLE, INT[]]
share(table=streamTable(20000:0, name, type), sharedName="sharedMinBar")
"""
s.run(script)
注意:生产环境推荐使用共享持久化流数据表。
step2:tableInsert 写入数据
def createData():
c1Time = [pd.Timestamp.now()]
c2Symbol = ['000001']
c3Int = [random.randint(0, 10)]
c4Int = [random.randint(0, 10)]
c5Long = [random.randint(0, 1000)]
c6Long = [random.randint(0, 1000)]
c7Float = [random.uniform(0, 1)]
c8Float = [random.uniform(0, 1)]
c9Double = [random.uniform(0, 1)]
c10ArrayInt = [np.random.randint(10,size=10)]
return [c1Time, c2Symbol, c3Int, c4Int, c5Long, c6Long, c7Float, c8Float, c9Double, c10ArrayInt]
if __name__ == "__main__":
inertRows = 10000
for i in range(inertRows):
insertData = createData()
s.run("tableInsert{sharedMinBar}",insertData)
step3:查询表中数据
pd = s.run("select * from sharedMinBar limit 10")
step4:删除流数据表并关闭当前会话
script = """
dropStreamTable(tableName="sharedMinBar")
"""
s.run(script)
s.close()
基准性能
tableInsert
同步写入接口基准性能测试代码请参考附录 B,测试结果如下:
指标 |
性能(毫秒) |
---|---|
最小单次插入耗时 | 0.31 |
最大单次插入耗时 | 1.24 |
平均单次插入耗时 | 0.69 |
单次插入耗时统计图如下:

基准性能测试结果显示,DolphinDB Python API 的 tableInsert
同步写入接口具有以下特点:
-
实时逐条写入共享流数据表的平均耗时是 0.69 毫秒。
-
稳定性较好,波动较小,平均单条数据写入的最大耗时小于 2 毫秒。
MultithreadedTableWriter 异步写入接口
当数据写入频率大于 100 条每秒时,推荐使用 MultithreadedTableWriter
(MTW) 函数写入数据。
如下图所示,MultithreadedTableWriter
函数把 Python 端实时产生的数据先通过 insert 线程写入
MTW 消息队列,同时 append 线程会批量取出消息队列的数据,自动转换数据类型后把数据写入 DolphinDB。

下面为 MultithreadedTableWriter
使用示例。
-
逐笔成交表
import numpy as np import pandas as pd import dolphindb as ddb import random import datetime s = ddb.session() s.connect(host="localhost", port=8848, userid="admin", password="123456") writer = ddb.MultithreadedTableWriter("localhost", 8848, "admin", "123456","","tglobal",False,False,[],10,0,2,"SecurityID") start = datetime.datetime.now() for i in range(100): res = writer.insert(random.randint(1,100),random.randint(1,100),str(i),random.randint(1,100),random.randint(1,100),str(i),str(i),random.random(),random.randint(1,100),str(i),np.datetime64('2021-01-04T09:30:02.000'),9,random.randint(1,100),random.randint(1,100),random.random(),str(i),random.randint(1,100),str(i),str(i)) writer.waitForThreadCompletion() print(writer.getStatus()) // output ''' errorCode : None errorInfo : isExiting : True sentRows : 10000 unsentRows : 0 sendFailedRows: 0 threadStatus : threadId sentRows unsentRows sendFailedRows 0 0 0 0 2474632960 4994 0 0 2483025664 5006 0 0 <dolphindb.session.MultithreadedTableWriterStatus object at 0x7f0ca9f0e8d0> '''
-
arrayVector 类型快照表
writer = ddb.MultithreadedTableWriter("localhost", 8892, "admin", "123456","","tglobal",False,False,[],10,0,2,"SecurityID") for i in range(10000): res = writer.insert(str(i),np.datetime64('2021-01-04T09:30:02.000'),random.random(),random.random(),random.random(),random.random(),random.random(),random.randint(1,100),random.random(),"OCALL",np.random.rand(10),np.random.randint(10,size=10),np.random.randint(10,size=10),np.random.randint(10,size=10),np.random.rand(10),np.random.randint(10,size=10),np.random.randint(10,size=10),np.random.randint(10,size=10),random.randint(1,100),random.random(),random.randint(1,100),random.randint(1,100),random.random(),random.random(),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.random(),random.randint(1,100),random.randint(1,100),random.random(),random.randint(1,100),random.randint(1,100),random.random(),random.randint(1,100),random.randint(1,100),random.random()) writer.waitForThreadCompletion() print(writer.getStatus()) // output ''' errorCode : None errorInfo : isExiting : True sentRows : 10000 unsentRows : 0 sendFailedRows: 0 threadStatus : threadId sentRows unsentRows sendFailedRows 0 0 0 0 2474632960 4994 0 0 2466240256 5006 0 0 <dolphindb.session.MultithreadedTableWriterStatus object at 0x7f0cb032e190> '''
-
multicolumn 类型快照表
writer = ddb.MultithreadedTableWriter("localhost", 8892, "admin", "123456","","tglobal",False,False,[],10,0,2,"SecurityID") for i in range(10000): res = writer.insert(str(i),np.datetime64('2021-01-04T09:30:02.000'),random.random(),random.random(),random.random(),random.random(),random.random(),random.randint(1,100),random.random(),str(i),random.random(),random.random(),random.random(),random.random(),random.random(),random.random(),random.random(),random.random(),random.random(),random.random(),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.random(),random.random(),random.random(),random.random(),random.random(),random.random(),random.random(),random.random(),random.random(),random.random(),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.random(),random.randint(1,100),random.randint(1,100),random.random(),random.random(),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.randint(1,100),random.random(),random.randint(1,100),random.randint(1,100),random.random(),random.randint(1,100),random.randint(1,100),random.random(),random.randint(1,100),random.randint(1,100),random.random()) writer.waitForThreadCompletion() print(writer.getStatus()) // output ''' errorCode : None errorInfo : isExiting : True sentRows : 10000 unsentRows : 0 sendFailedRows: 0 threadStatus : threadId sentRows unsentRows sendFailedRows 0 0 0 0 2474632960 4994 0 0 2466240256 5006 0 0 <dolphindb.session.MultithreadedTableWriterStatus object at 0x7f0ca9b37fd0> '''
附录 A-测试环境
硬件名称 | 配置信息 |
---|---|
CPU | Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz |
内存 | 32 GB |
磁盘 | SSD 500 GB |
软件名称 | 版本信息 |
---|---|
操作系统 | CentOS Linux 7 (Core) |
内存 | 3.10.0-1160.el7.x86_64 |
DolphinDB server | 2.00.10.9 2023.12.09 |
DolphinDB Python API | 1.30.22.6 |
Python | 3.11.5 |
Numpy | 1.24.4 |
Pandas | 2.1.1 |
附录 B-tableInsert 同步写入接口基准性能测试代码
import dolphindb as ddb
import numpy as np
import pandas as pd
import random
#模拟数据
def createData():
c1Time = [pd.Timestamp.now()]
c2Symbol = ['000001']
c3Int = [random.randint(0, 10)]
c4Int = [random.randint(0, 10)]
c5Long = [random.randint(0, 1000)]
c6Long = [random.randint(0, 1000)]
c7Float = [random.uniform(0, 1)]
c8Float = [random.uniform(0, 1)]
c9Double = [random.uniform(0, 1)]
c10ArrayInt = [np.random.randint(10,size=10)]
return [c1Time, c2Symbol, c3Int, c4Int, c5Long, c6Long, c7Float, c8Float, c9Double, c10ArrayInt]
if __name__ == "__main__":
#与DolphinDB建立会话
s = ddb.session()
s.connect(host="localhost", port=8200, userid="admin", password="123456")
#创建共享普通流数据表
script = """
name = `c1Time`c2Symbol`c3Int`c4Int`c5Long`c6Long`c7Float`c8Float`c9Double`c10ArrayInt
type = [TIMESTAMP, SYMBOL, INT, INT, LONG, LONG, FLOAT, FLOAT, DOUBLE, INT[]]
share(table=streamTable(20000:0, name, type), sharedName="sharedMinBar")
"""
s.run(script)
#测试写入数据行数
inertRows = 10000
#延时统计
minCostTime = 1000.0
maxCostTime = 0.0
totalCostTime = 0.0
count = 0
for i in range(inertRows):
insertData = createData()
startTime = pd.Timestamp.now()
s.run("tableInsert{sharedMinBar}",insertData)
delta = (pd.Timestamp.now() - startTime) / np.timedelta64(1, 'ms')
if(delta < minCostTime):
minCostTime = delta
if(delta > maxCostTime):
maxCostTime = delta
totalCostTime += delta
count += 1
#每隔一段时间打印写入性能测试结果
if(count %10000 == 0):
print(f"min: {minCostTime} ms")
print(f"max: {maxCostTime} ms")
print("count:", count)
print(f"totalCostTime: {totalCostTime} ms")
print(f"avg: {totalCostTime / count} ms")
#删除流数据表
script = """
dropStreamTable(tableName="sharedMinBar")
"""
s.run(script)
#关闭当前会话
s.close()