udpServer
To help you send and receive data via the UDP protocol, DolphinDB provides the udpServer plugin. This plugin, built on libevent, can serve as a UDP server to listen on specified IP addresses and ports, receiving data sent from UDP clients into the DolphinDB server. It can also function as a UDP client to send UDP data to specified IP addresses and ports.
Installation
Version Requirements
The DolphinDB server must be v2.00.16 and installed on Linux x86-64.
Installation Steps
- Call the listRemotePlugins function in the DolphinDB client to display available plugins.
  Note: The returned results only display plugins supported by the current operating system and server version. If the desired plugin is not listed, refer to the appendix to compile the plugin or provide feedback in the DolphinDB user community.
  login("admin", "123456")
  listRemotePlugins()
- Call the installPlugin function to install the plugin.
  installPlugin("udpServer")
- Call the loadPlugin function to load the plugin.
  login("admin", "123456")
  loadPlugin("udpServer")
Method References
bind
Syntax
udpServer::bind(host, port, bufferSize)
Details
Binds the IP address and port to a socket and returns a handle ID. An exception will be thrown upon a failure. If the size of the UDP packet to be received exceeds the value specified by bufferSize, only the first bufferSize bytes of the packet will be read.
Parameters
host: A STRING scalar indicating the IP address to listen on.
port: An INTEGRAL scalar indicating the port number to listen on.
bufferSize: An INTEGRAL scalar indicating the maximum size (in bytes) of a UDP packet to be received. The size of UDP packets sent by the client must not exceed the value specified by bufferSize. The upper limit of bufferSize is 65535, and any value smaller than 64 will be set to 64.
Return
Returns a handle ID.
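The truncation rule described under Details matches how a UDP read behaves at the operating-system level. The following Python sketch is not plugin code. It uses only the standard socket module on the loopback interface to show what a receiver gets when a datagram is larger than its read buffer. The helper name receive_truncated is hypothetical, and the behavior shown assumes Linux, the platform the plugin requires.

```python
import socket


def receive_truncated(payload: bytes, buffer_size: int) -> bytes:
    """Send one datagram over loopback and read it with a fixed-size buffer."""
    receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Port 0 lets the OS pick a free port for this demonstration.
        receiver.bind(("127.0.0.1", 0))
        receiver.settimeout(2.0)
        sender.sendto(payload, receiver.getsockname())
        # Only the first buffer_size bytes are returned; on Linux the
        # remainder of the datagram is discarded, not queued.
        data, _addr = receiver.recvfrom(buffer_size)
        return data
    finally:
        sender.close()
        receiver.close()
```

Calling receive_truncated(b"x" * 100, 64) yields only the first 64 bytes, while the same call with a buffer of 1024 yields all 100. This is why the packets a client sends should stay within bufferSize.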
Note
- As bufferSize increases, the maximum sending rate that can be sustained with zero packet loss decreases. Therefore, it is recommended to set bufferSize to 1024 bytes.
- When bufferSize = 1024, the relationship between the packet transmission rate and the packet loss rate is as follows:
  Packet Transmission Rate (k packets/s)    Loss Rate (%)
  <100                                      0
  >=100 & <200                              <0.1
  >=200 & <500                              ≈50
  As seen from the table, the higher the transmission rate, the higher the loss rate. For optimal performance, it is recommended to limit the transmission rate to ≤ 500k packets/s per socket and to make each packet as close to the bufferSize limit as possible.
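One way to send fewer, larger packets is to batch several small records into each datagram on the client before sending. The sketch below is a client-side illustration in Python, not part of the plugin. The newline delimiter and the function name pack_records are assumptions, and the parser on the receiving side would need to split on the same delimiter.

```python
from typing import Iterable, List


def pack_records(records: Iterable[bytes], max_size: int = 1024,
                 delimiter: bytes = b"\n") -> List[bytes]:
    """Greedily join records into payloads of at most max_size bytes."""
    packets: List[bytes] = []
    current: List[bytes] = []   # records collected for the packet being built
    current_len = 0             # size of that packet, delimiters included
    for record in records:
        if len(record) > max_size:
            raise ValueError("record is larger than max_size")
        # A delimiter is only needed when the packet already holds a record.
        added = len(record) + (len(delimiter) if current else 0)
        if current and current_len + added > max_size:
            # The record does not fit: flush and start a new packet with it.
            packets.append(delimiter.join(current))
            current, current_len = [record], len(record)
        else:
            current.append(record)
            current_len += added
    if current:
        packets.append(delimiter.join(current))
    return packets
```

Each returned payload can then be sent as a single datagram, with max_size set to the bufferSize passed to bind.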
subscribe
Syntax
udpServer::subscribe(socket, topic, parser, handler)
Details
Subscribes to and processes the data received by udpServer.
Parameters
socket: The handle ID returned by the bind method.
topic: A STRING scalar indicating the topic of the subscription.
parser: A unary function that only accepts a STRING scalar as the parameter and returns a table.
handler: A table used to store the parsed data.
Note
- You can create multiple subscriptions on the same socket. In that case, udpServer copies the received data once per subscription, which may degrade performance. Therefore, avoid creating multiple subscriptions on a socket that receives a large volume of data.
- If you redefine handler, call the unsubscribe method to cancel the previous subscription and then call the subscribe method to create the subscription again.
- If you need to create multiple subscriptions that write data to the same table, call the share function to share the table first. Otherwise, a server crash may occur.
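The parser contract (one packet payload in, one table out) can be illustrated outside DolphinDB. The following Python sketch is a conceptual model only and is not plugin code. The comma-separated sym,volume payload layout and the name parse_packet are assumptions, and a column-oriented dictionary stands in for a DolphinDB table.

```python
from typing import Dict


def parse_packet(payload: str) -> Dict[str, list]:
    """Turn one packet payload into column-oriented rows."""
    table: Dict[str, list] = {"sym": [], "volume": []}
    for line in payload.splitlines():
        line = line.strip()
        if not line:
            continue  # ignore blank lines such as a trailing newline
        sym, volume = line.split(",")
        table["sym"].append(sym)
        table["volume"].append(int(volume))
    return table
```

A payload carrying two records, such as "A,10\nB,26", becomes two rows. In the plugin, the table returned by the parser is what gets written to handler.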
unsubscribe
Syntax
udpServer::unsubscribe(socket, topic)
Details
Cancels the subscription to the data stream of the specified topic.
Parameters
socket: The handle ID returned by the bind method.
topic: A STRING scalar indicating the topic of the subscription.
unbind
Syntax
udpServer::unbind(socket)
Details
Unbinds the socket from the IP address and port.
Parameters
socket: The handle ID returned by the bind method.
createPusher
Syntax
udpServer::createPusher(host, port, formatter, dummyTable)
Details
Creates a UDP pusher that sends the data injected into it. You can inject data in either of the following ways:
- Call the append! function to inject data into the pusher.
- Specify the pusher as the outputTable of a streaming engine so that the engine's output is injected into the pusher.
Parameters
host: A STRING scalar indicating the remote IP address of the target server.
port: An INTEGRAL scalar indicating the remote port of the target server.
formatter: A function used to define the packaging format of the data to be pushed. It takes a single input parameter in the form of a Table, and returns a STRING or BLOB value, which can be either a scalar or a vector.
dummyTable: A table object used to receive the injected data.
Example
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
pythonHost = "192.xxx.x.xx" // Change it to the actual host.
pythonPort = 1xxxx // Change it to the actual port.
pusher = udpServer::createPusher(pythonHost, pythonPort, toStdJson, output1)
engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=pusher, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);
insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
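To observe what the pusher in this example sends, a small UDP listener can be run on the target host. The following Python sketch assumes each datagram carries one UTF-8 encoded JSON document produced by the formatter. The exact JSON layout is not specified here, so the code decodes it generically. The function names open_listener and receive_json are hypothetical.

```python
import json
import socket


def open_listener(host: str, port: int) -> socket.socket:
    """Create a UDP socket bound to the address the pusher sends to."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((host, port))
    return sock


def receive_json(sock: socket.socket, max_bytes: int = 65535):
    """Block until one datagram arrives and decode it as JSON."""
    data, sender = sock.recvfrom(max_bytes)
    # Assumption: one datagram holds exactly one complete JSON document.
    return json.loads(data.decode("utf-8")), sender
```

On the receiving host, open_listener would be called with the same port that was passed to createPusher, followed by repeated calls to receive_json to print each document as it arrives.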
getSubscriptionInfo
Syntax
udpServer::getSubscriptionInfo()
Details
Gets the subscription topics in all sockets and their creation time.
getSocketInfo
Syntax
udpServer::getSocketInfo()
Details
Gets all sockets bound to the current udpServer.
Usage Example
The client sends data via the UDP protocol to the endpoint 127.x.x.x:30xxx. The IP address and port number shown here are placeholders; replace them with the actual address and port. The DolphinDB server uses the udpServer plugin to receive and process the data.
// Load the plugin
loadPlugin("udpServer")
// Bind a socket and set bufferSize to 1024
// Specify the actual address and port when using in practice
socket = udpServer::bind("127.x.x.x", 30xxx, 1024)
// Create the first subscription
def f1(arg){
return table([arg] as col1)
}
handler_1 = streamTable(10:0, [`wave], [BLOB])
enableTableShareAndPersistence(table=handler_1, tableName=`test01, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=120)
udpServer::subscribe(socket, "subscribetest1", f1, handler_1)
// Create the second subscription
def f2(arg){
return table([arg] as col1)
}
handler_2 = streamTable(10:0, [`wave], [BLOB])
enableTableShareAndPersistence(table=handler_2, tableName=`test02, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=120)
udpServer::subscribe(socket, "subscribetest2", f2, handler_2)
// Check the data
select * from test01
select * from test02
// Check the status
print(udpServer::getSubscriptionInfo())
print(udpServer::getSocketInfo())
// Cancel subscriptions
udpServer::unsubscribe(socket, "subscribetest1")
udpServer::unsubscribe(socket, "subscribetest2")
// Unbind the socket
udpServer::unbind(socket)
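The example above covers only the DolphinDB side. A client that feeds it could look like the following Python sketch, which uses the standard socket module. The function name send_packets, the payload contents, and the pacing interval are illustrative assumptions, and the host and port must match those passed to udpServer::bind.

```python
import socket
import time
from typing import Iterable


def send_packets(host: str, port: int, payloads: Iterable[bytes],
                 interval: float = 0.0) -> int:
    """Send each payload as one UDP datagram and return the number sent."""
    sent = 0
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        for payload in payloads:
            sock.sendto(payload, (host, port))
            sent += 1
            if interval > 0:
                # Optional pause between datagrams to cap the send rate.
                time.sleep(interval)
    return sent
```

Because UDP offers no delivery guarantee, the count returned is the number of datagrams handed to the operating system, not the number received by the server. Pacing the sender and keeping each payload within bufferSize follow the recommendations in the bind section.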
