mqtt5
The mqtt5 plugin is built on the Paho MQTT C library and enables connections to and interactions with MQTT servers. It supports connection management, message publishing and subscription, encrypted communication, and other features, so developers can focus on application logic without worrying about low-level communication details. Unlike the mqtt plugin, the mqtt5 plugin supports the MQTT 5.0 protocol and encrypted communication.
Installation
Version Requirements
DolphinDB Server: 2.00.17, 2.00.18, 3.00.5, and later, deployed on a Linux x86-64 system.
Installation Steps
-
In the DolphinDB client, use the listRemotePlugins function to view plugins available for installation.
login("admin", "123456") listRemotePlugins() -
Use the installPlugin function to install the plugin.
installPlugin("mqtt5") -
Use the loadPlugin function to load the plugin.
loadPlugin("mqtt5")
Method References
connect
Syntax
mqtt5::connect(uri, [config])
Details
Establishes a connection to the MQTT server and returns a connection handle. The
connection closes automatically when the session ends, or you can close it by
calling the close function.
Parameters
uri: A STRING scalar that specifies the URI of the MQTT server to connect to, in the format protocol://host:port.
-
protocol specifies the connection protocol. Valid values:
-
tcp or mqtt: Indicates unsecured TCP. If you omit protocol, it is equivalent to specifying tcp or mqtt.
-
ssl, tls, or mqtts: Indicates encrypted SSL/TLS.
-
ws: Indicates an unsecured WebSocket connection.
-
wss: Indicates a secure WebSocket connection.
-
-
host is the hostname of the MQTT server. port is the port number, which can be omitted. If omitted, the default port for the selected protocol is used automatically.
-
If you use an unencrypted protocol, SSL-related settings in config do not take effect.
config: Optional. A dictionary that specifies configuration items. Keys are STRING scalars, and values are of type ANY.
-
"qos": An INT scalar that specifies the message delivery guarantee level between the publisher and subscriber.
-
0 (default): Delivers the message at most once. Fire and forget. The message may be lost and is not retransmitted.
-
1: Delivers the message at least once. Ensures that the message arrives, but duplicate delivery may occur if network acknowledgment packets are lost.
-
2: Delivers the message exactly once. Ensures that the message is neither lost nor duplicated.
-
-
"username": A STRING scalar that specifies the username used to log in to the MQTT server. Defaults to empty.
-
"password": A STRING scalar that specifies the password used to log in to the MQTT server. It takes effect only when username is set. Defaults to empty.
-
"clientID": A STRING scalar that specifies the connection ID. If you do not specify it, the system generates one automatically. If a connection with the same clientID already exists, specifying that clientID may disconnect the previous connection or cause this connection attempt to fail, depending on the MQTT broker.
-
"mqttVersion": An INT scalar that specifies the MQTT protocol version.
-
0 (default): First tries version 3.1.1, then falls back to version 3.1 if that fails.
-
3: Tries version 3.1 only.
-
4: Tries version 3.1.1 only.
-
5: Tries version 5.0 only.
-
-
"useSSL": A BOOL scalar that specifies whether to use SSL encryption. If protocol is set to an encrypted protocol such as ssl, you must set useSSL to true. Defaults to false.
-
"enableServerCertAuth": A BOOL scalar that specifies whether to enable server certificate validation. Defaults to false.
-
"trustStore": Optional. A STRING scalar that specifies the path to the PEM-formatted file containing the public key certificate trusted by the client. Defaults to empty.
-
"maxBufferedMessages": An INT scalar that specifies the maximum number of published messages that can be buffered per connection. The default is 100.
Connection Error Codes
| Error Code | Cause |
|---|---|
| 5 | Unauthorized; authentication was not performed. |
| 4 | Incorrect username or password. |
| 3 | Server unavailable. |
| 2 | The client identifier (clientID) was rejected. |
| 1 | Unacceptable protocol version. |
| 0 | No error; indicates that the MQTT client operation completed successfully. |
| -1 | A general error indicating that the MQTT client operation failed. |
| -2 | MQTTAsync_PERSISTENCE_ERRORerror.
|
| -3 | The client has disconnected. |
| -4 | The maximum number of messages allowed to be in flight at the same time has been reached. |
| -5 | An invalid UTF-8 string was detected. |
| -6 | A NULL-valued parameter was provided where it is not valid. |
| -7 | The topic name was truncated by a NULL character, so the full topic cannot be accessed. |
| -8 | The eyecatcher and version number in the structure parameter are incorrect. |
| -9 | The value of the qos key is not 0, 1, or 2. |
| -10 | All 65,535 MQTT message IDs are currently in use. |
| -11 | The request was discarded before it completed. |
| -12 | No more messages can be buffered. |
| -13 | An SSL connection was attempted with a non-SSL version of the library. |
| -14 | The protocol prefix in serverURI is invalid. TLS-enabled prefixes (ssl, mqtts, wss) are valid only when the TLS version of the library is linked. |
| -15 | Do not use options intended for other MQTT versions. |
| -16 | This call is not valid for the MQTT version currently used by the client. |
| -17 | The will topic length is 0. |
| -18 | The connect or disconnect command was ignored because another connect or disconnect command is already queued at the head of the queue and waiting to be processed. Use the onSuccess/onFailure callbacks to wait for the previous connect or disconnect command to complete. |
| -19 | maxBufferedMessages in the connection options must be >= 0. |
close
Syntax
mqtt5::close(conn)
Details
Disconnects from the MQTT server.
Parameters
conn: The connection handle returned by the connect
method.
publish
Syntax
mqtt5::publish(conn,topic,obj)
Details
Publishes a message to the MQTT server.
Parameters
conn: The connection handle returned by the connect
method.
topic: A STRING scalar that specifies the topic to publish.
msg: A STRING scalar or vector that specifies the message content to publish.
subscribe
Syntax
mqtt5::subscribe(uri, topic, handler, [config])
Details
Subscribes to messages from the MQTT server and returns a subscription handle.
Parameters
uri: A STRING scalar that specifies the URI of the MQTT server to connect to, in the format protocol://host:port.
-
protocol specifies the connection protocol. Valid values:
-
tcp or mqtt: Indicates unsecured TCP. If protocol is omitted, it is equivalent to specifying tcp or mqtt.
-
ssl, tls, or mqtts: Indicates encrypted SSL/TLS.
-
ws: Indicates an unsecured WebSocket connection.
-
wss: Indicates a secure WebSocket connection.
-
-
host is the hostname of the MQTT server. port is the port number, which can be omitted. If omitted, the default port for the selected protocol is used automatically.
- If you use an unencrypted protocol, the SSL-related settings in config do not take effect.
topic: A STRING scalar that specifies the topic to subscribe to.
handler: A user-defined function that takes two parameters and processes received messages. The parameters are topic and msg, where topic is a STRING scalar that specifies the topic on which the subscribed message was received. msg is a STRING scalar that specifies the received message.
config: Optional. A dictionary that specifies configuration items. Keys are STRING scalars, and values are of type ANY.
-
"qos": An INT scalar that specifies the guaranteed delivery level between the publisher and subscriber.
-
0 (default): Delivers at most once. Fire and forget. The message may be lost and is not retransmitted.
-
1: Delivers at least once. Ensures that the message arrives, but duplicate delivery may occur if network acknowledgment packets are lost.
-
2: Delivers exactly once. Ensures that the message is neither lost nor duplicated.
-
-
"username": A STRING scalar that specifies the username used to log in to the MQTT server. Defaults to empty.
-
"password": A STRING scalar that specifies the password used to log in to the MQTT server. It takes effect only when username is set. Defaults to empty.
-
"clientID": A STRING scalar that specifies the connection ID. If you do not specify it, the system generates one automatically. If a connection with the same clientID already exists, specifying that clientID may disconnect the previous connection or cause this connection attempt to fail, depending on the MQTT broker.
-
"mqttVersion": An INT scalar that specifies the MQTT protocol version.
-
0 (default): First tries version 3.1.1, then falls back to version 3.1 if that fails.
-
3: Tries version 3.1 only.
-
4: Tries version 3.1.1 only.
-
5: Tries version 5.0 only.
-
-
"useSSL": A BOOL scalar that specifies whether to use SSL encryption. If protocol is set to an encrypted protocol such as ssl, you must set useSSL to true. Defaults to false.
-
"enableServerCertAuth": A BOOL scalar that specifies whether to enable server certificate validation. Defaults to false.
-
"trustStore": Optional. A STRING scalar that specifies the path to the PEM-formatted file containing the public key certificate trusted by the client. Defaults to empty.
unsubscribe
Syntax
mqtt5::unsubscribe(conn)
Details
Unsubscribes from messages on the MQTT server.
Parameters
conn: The subscription handle returned by subscribe or
the subscription identifier returned by getSubscriberStat.
getSubscriberStat
Syntax
mqtt5::getSubscriberStat()
Details
Queries information about all subscriptions and returns a table with six columns.
| Column Name | Description |
|---|---|
| subscriptionId | The identifier of the subscription connection. |
| user | The session user who created the subscription. |
| uri | The URI of the MQTT server. |
| topic | The subscribed topic. |
| createTimestamp | The time when the subscription was created. |
| receivedPackets | The number of message packets received by the subscription. |
Examples
loadPlugin("mqtt5")
go
// Configuration items
config = dict(STRING, ANY)
config[`useSSL] = true
config[`mqttVersion] = 5
// Create a publishing connection
pub_conn=mqtt5::connect("tls://broker.emqx.io:8883", config)
// Create a subscription connection
table = table(1000:0, [`topic, `message], [STRING, STRING])
def onMessage(tb, topic, payload) {
tableInsert(tb, topic, payload)
}
sub_conn = mqtt5::subscribe("tls://broker.emqx.io:8883", "test_topic/#", onMessage{table}, config)
// Publish
mqtt5::publish(pub_conn, "test_topic/1", "MQTT test publish1")
mqtt5::publish(pub_conn, "test_topic/2", "MQTT test publish2")
mqtt5::publish(pub_conn, "test_topic/2", "MQTT test publish3")
mqtt5::publish(pub_conn, "test_topic/3", "MQTT test publish4")
mqtt5::publish(pub_conn, "test_topic/3", "MQTT test publish5")
// View subscription results and status
sleep(1000)
print table
mqtt5::getSubscriberStat()
// Close the connection
mqtt5::close(pub_conn)
mqtt5::unsubscribe(sub_conn)
