WebSocket

The DolphinDB WebSocket plugin implements a WebSocket client. It connects to a WebSocket server and communicates in both directions: it can read data (real-time market data, trading data, etc.) from the server and send data (order instructions, subscription requests, etc.) to it. The plugin can also extract response headers during WebSocket communication, which helps users check the communication status and obtain related information. Currently, the plugin only supports the wss protocol.

Installation (with installPlugin)

Required server version: DolphinDB 2.00.12 or higher.

Supported OS: Linux x86-64.

Installation Steps:

(1) Use listRemotePlugins to check plugin information in the plugin repository.

Note: For plugins not included in the provided list, you can install through precompiled binaries or compile from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.

login("admin", "123456")
listRemotePlugins()

(2) Invoke installPlugin for plugin installation.

installPlugin("WebSocket")

(3) Use loadPlugin to load the plugin before using the plugin methods.

loadPlugin("WebSocket")

Method References

createSubJob

Syntax

createSubJob(url, onOpen, onMessage, onError, onClose, tag, [config])

Details

The method creates a WebSocket subscription and returns a WebSocket object. If a subscription with the same tag already exists and is running, an error is thrown; if it exists but is not running, its subscription information is overwritten.

Parameters

  • url: A STRING scalar indicating the WebSocket address.
  • onOpen: A function whose first parameter is the WebSocket object.
def onOpen(ws){
}
  • onMessage: A function whose first parameter is the WebSocket object; the second parameter is the received data, which is a BLOB type vector.
def onMessage(ws, data){
}
  • onError: A function whose first parameter is the WebSocket object; the second parameter is the error message, which is a BLOB type scalar.
def onError(ws, error){
}
  • onClose: A function whose first parameter is the WebSocket object; the second parameter is the error code, which is an INT type scalar; the third parameter is the close message, which is a BLOB type scalar.
def onClose(ws, statusCode, msg){
}
  • tag: A STRING scalar indicating the WebSocket subscription identifier.
  • config: A dictionary used to specify additional configuration parameters. The keys are STRING scalars, and the values are of ANY type. Currently supported keys are:
    • "proxy": Sets the proxy address. The value is a STRING scalar, e.g., "http://proxy.example.com:8080/".
    • "reconnectCount": Sets the number of reconnection attempts after a connection is lost, excluding cases where the user actively unsubscribes. The value is an INT scalar. The default is -1, meaning the system keeps trying to reconnect until it succeeds.

Examples

// Shared table that stores every received message as a BLOB
share table(1:0, ["data"], [BLOB]) as table1
// Send the subscription request as soon as the connection opens
def onOpen(ws){
  WebSocket::send(ws, '{"method": "SUBSCRIBE","params": ["btcusdt@aggTrade","btcusdt@depth"], "id": 1}')
}
// Append each batch of received messages to the shared table
def onMessage(ws, data){
  table1.append!(table(data as data))
}
def onError(ws, error){
  writeLog("failed to receive data: " + error)
}
def onClose(ws, statusCode, msg){
  writeLog("connection is closed, statusCode: " + string(statusCode) + ", " + msg)
}
config = dict(STRING, ANY)
config[`proxy] = "http://proxy.example.com:8080/"
url = "ws://127.0.0.1:8765"
ws = WebSocket::createSubJob(url, onOpen, onMessage, onError, onClose, "sub1", config)
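
The "reconnectCount" key described above can be set in the same way as "proxy". The following is a minimal sketch that reuses the url and callbacks from the example above; the limit of 5 and the tag "sub2" are illustrative values, not recommendations.

```dolphindb
config2 = dict(STRING, ANY)
// Hypothetical limit of 5 attempts; the default of -1 retries until reconnection succeeds
config2["reconnectCount"] = 5
ws2 = WebSocket::createSubJob(url, onOpen, onMessage, onError, onClose, "sub2", config2)
```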

send

Syntax

send(ws, data)

Details

The function sends a message to the WebSocket server. It can be called within the onOpen and onMessage callbacks passed to createSubJob. Alternatively, you can call it on the WebSocket object returned by createSubJob. The function returns a boolean value indicating whether the message was sent successfully.

Parameters

  • ws: The WebSocket object.
  • data: A BLOB type scalar indicating the data to be sent.
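
As an illustration of sending outside a callback, the sketch below assumes that ws is the object returned by an earlier createSubJob call and that the server accepts the same JSON subscription format used in the createSubJob example. The payload is a placeholder.

```dolphindb
// ws is assumed to come from an earlier WebSocket::createSubJob call
ok = WebSocket::send(ws, '{"method": "SUBSCRIBE","params": ["btcusdt@aggTrade"], "id": 2}')
// send returns a boolean, so a failed send can be logged
if (!ok) {
  writeLog("failed to send WebSocket message")
}
```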

getSubJobStat

Syntax

getSubJobStat()

Details

The function retrieves information on all WebSocket background tasks, including those canceled via cancelSubJob. Note that if different tasks have the same tag value, the information of the previous task will be overwritten by the information of the later task. The function returns a table with the following columns:

  • tag: WebSocket task identifier.
  • startTime: Task creation time.
  • endTime: Task cancellation time. Tasks can be canceled using cancelSubJob. The initial value is null, indicating the task has not been canceled or has reconnected successfully.
  • firstMsgTime: Time when the first message was received.
  • lastMsgTime: Time when the last message was received.
  • processedMsgCount: Number of successfully received messages. The initial value is 0.
  • failedNetMsgCount: Number of messages with network errors. The initial value is 0.
  • failedMsgCount: Number of messages that failed to be processed. The initial value is 0.
  • netErrCount: Number of network errors encountered during the last processing.
  • lastOnFuncErrMsg: Error message from the last callback function.
  • lastFailedTimestamp: Timestamp of the last processing failure.
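
Because the result is an ordinary table, it can be filtered with SQL. The sketch below uses only the columns listed above and keeps the tasks whose endTime is still null.

```dolphindb
stat = WebSocket::getSubJobStat()
// endTime is null for tasks that have not been canceled or that have reconnected successfully
select tag, startTime, lastMsgTime, processedMsgCount, failedMsgCount, lastOnFuncErrMsg from stat where isNull(endTime)
```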

cancelSubJob

Syntax

cancelSubJob(tag)

Details

The function cancels a WebSocket background task. The function returns a boolean value indicating whether the cancellation was successful.

Parameters

  • tag: A STRING scalar indicating the WebSocket task identifier. You can obtain the tag using getSubJobStat().

Examples

tag = (exec tag from WebSocket::getSubJobStat())[0]
WebSocket::cancelSubJob(tag)

getResponseHeader

Syntax

getResponseHeader(ws, key)

Details

The function retrieves the value of a WebSocket response header and returns it as a STRING scalar. It can be called within the onOpen and onMessage callbacks passed to createSubJob.

Parameters

  • ws: The WebSocket object.
  • key: A STRING scalar indicating the key of the header to be retrieved.

Examples

data = WebSocket::getResponseHeader(ws, "server")
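
A minimal sketch of reading a header inside a callback, where the ws object is available. Whether a given key such as "server" is present depends on the server; the log text is illustrative.

```dolphindb
def onOpen(ws){
  // Read the "server" response header once the connection is open
  server = WebSocket::getResponseHeader(ws, "server")
  writeLog("WebSocket connected, server header: " + server)
}
```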