MQTT

The DolphinDB MQTT plugin, built upon the MQTT-C library, enables connections and interactions with MQTT servers. It not only supports subscribing to messages and parsing received messages into CSV or JSON formats but also supports publishing messages by packaging them into CSV or JSON formats.

By leveraging the high performance and stability of the MQTT-C library, the MQTT plugin provides various features, including efficient connection management, message publishing and subscribing, automatic reconnection, and secure communication. These features empower developers to focus on higher-level application logic, without worrying about the underlying communication details.

Installation (with installPlugin)

  • Required server version: DolphinDB 2.00.10 or higher
  • Supported OS: Linux x86-64 and Windows x64

Installation Steps

(1) Use listRemotePlugins to check plugin information in the plugin repository.

Note: For plugins not included in the provided list, you can install through precompiled binaries or compile from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.

login("admin", "123456")
listRemotePlugins(, "http://plugins.dolphindb.com/plugins/")

(2) Invoke installPlugin for plugin installation

installPlugin("mqtt")

(3) Load the plugin with loadPlugin

loadPlugin("mqtt")

Method References

connect

Syntax

connect(host, port,[QoS=0],[formatter],[batchSize=0],[username],[password],[sendbufSize=40960],[config])

Details

This function connects to an MQTT server/broker. It returns a connection object that can be explicitly called to close with the close function, or it can be automatically released when the reference count is 0.

Parameters

  • host: A string indicating the IP address of MQTT server/broker.

  • port: An integer indicating the port number of MQTT server/broker.

  • Qos (optional): An integer indicating the quality of service with the default value of 0.

    • 0: at most once;
    • 1: at least once;
    • 2: only once.
  • formatter: A function used to package published data in CSV or JSON formats. Currently supported functions are createJsonFormatter and createCsvFormatter.

  • batchSize: An integer indicating the number of rows sent each time. When the content to be published is a table, it can be sent in batches.

  • username: A string indicating the user name of the MQTT server/broker.

  • password: A string indicating the password of the MQTT server/broker.

  • sendbufSize: An integer indicating the size of the send buffer in bytes, with a default value of 40960.

    config: A dictionary whose keys are strings and values are of ANY type. It is used to specify the following additional options:

    • "recvBufSize": An integer indicating the size of the receive buffer in bytes, with a default value of 40960.
    • "clientID": A string specifying the publisher ID. If not specified, it will be automatically generated.

Examples

f=createJsonFormatter()
conn=connect("test.mosquitto.org",1883,0,f,50)

publish

Syntax

publish(conn,topic,obj)

Details

This function publishes messages to the MQTT server/broker.

Parameters

  • conn: An object generated by function connect.
  • topic: A string indicating the topic.
  • obj: A string or vector/in-memory table indicating the content of the message to be published.

Examples

publish(conn,"dolphindb/topic1","welcome")
publish(conn,"devStr/sensor001",["hello world","welcome"])
t=table(take(0..99,50) as hardwareId ,take(now(),
		50) as ts,rand(20..41,50) as temperature,
		rand(50,50) as humidity,rand(500..1000,
		50) as voltage)
publish(conn,"dolphindb/device",t)		

createPublisher

createPublisher(conn,topic,colNames,colTypes)

Details

This function creates an object that can publish messages by writing data to it.

Parameters

  • conn: An object generated by function connect.
  • topic: A string indicating the topic.
  • colNames: A STRING vector indicating the column names of the published schema.
  • colTypes: A vector indicating the column types of the published schema.

Examples

MyFormat = take("", 3)
MyFormat[2] = "0.000"
f = createCsvFormatter(MyFormat, ',', ';')
pubConn = connect("127.0.0.1",1883,0,f,100)

colNames = [`ts, `hardwareId, `val]
colTypes = [TIMESTAMP, SYMBOL, INT]
publisher = createPublisher(pubConn, "sensor/s001", colNames, colTypes)

// example 1: by appending data to 'publisher' to publish it, the 'tb' here must be a table
append!(publisher, tb)

// example 2: by using the 'insert into' SQL clause
insert into publisher values([2023.08.25 10:57:47.961, 2023.08.25 10:57:47.961], symbol([`bb,`cc]), [22,33])

// example 3: by setting handler=append!{publisher} when subscribing a streaming table
share streamTable(1000:0, `time`sym`val, [TIMESTAMP, SYMBOL, INT]) as trades
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{publisher}, msgAsTable=true);

insert into trades values(2018.10.08T01:01:01.785,`dd,44)
insert into trades values(2018.10.08T01:01:02.125,`ee,55)
insert into trades values(2018.10.08T01:01:10.263,`ff,66)

close

Syntax

close(conn)

Details

This function disconnects from the MQTT server/broker.

Parameters

  • conn: An object generated by function connect.

Examples

close(conn)

subscribe

Syntax

subscribe(host, port, topic, [parser], handler, [username], [password], [recvbufSize=20480],[config])

Details

This function subscribes to the MQTT server/broker and returns a connection object.

Parameters

  • host: A string indicating the IP address of the MQTT server/broker.
  • port: An integer indicating the port number of the MQTT server/broker.
  • topic: A string indicating the subscription topic.
  • parser: A function used to parse subscribed messages into CSV or JSON formats. Currently supported functions are createJsonParser and createCsvParser.
  • handler: A function or a table used to process the subscribed data from the MQTT server/broker.
  • username: A string indicating the user name of the MQTT server/broker.
  • password: A string indicating the password of the MQTT server/broker.
  • recvbufSize: An integer indicating the size of the receive buffer in bytes, with a default value of 40960.
  • config: A dictionary whose keys are strings and values are of ANY type. It is used to specify the following additional options:
    • "sendBufSize": An integer indicating the size of the send buffer in bytes, with a default value of 40960.
    • "subscribeID": A string specifying the subscriber ID. If not specified, it will be automatically generated.
    • "asyncFlag": A Boolean value specifying whether to enable the asynchronous subscription mode.

Examples

p = createCsvParser([INT, TIMESTAMP, DOUBLE, DOUBLE,DOUBLE], ',', ';' )
sensorInfoTable = table( 10000:0,`deviceID`send_time`temperature`humidity`voltage,[INT, TIMESTAMP, DOUBLE, DOUBLE,DOUBLE])
conn = subscribe("192.168.1.201",1883,"sensor/#",p,sensorInfoTable)

getSubscriberStat

getSubscriberStat()    

Details

This function gets the information on all subscriptions and returns a table with the following columns:

  • "subscriptionId": ID of a subscription.
  • "user": the session user who created the subscription.
  • "host": IP address for the MQTT server/broker.
  • "port": Port number of the MQTT server/broker.
  • "topic": the subscription topic.
  • "createTimestamp": the time when the subscription was created.
  • "receivedPackets": the number of messages received.

unsubscribe

Syntax

unsubcribe(subscription)  

Details

This function cancels the subscription to the MQTT server/broker.

Parameters

  • subscription: The value returned by the subscribe function or the subscription ID returned by getSubscriberStat.

Examples

unsubscribe(sub1) 
unsubscribe("350555232l")    

createCsvFormatter

Syntax

createCsvFormatter([format], [delimiter=','], [rowDelimiter=';'])

Details

This function creates a formatter function in CSV format.

Parameters

  • format: A STRING vector.
  • delimiter: A string indicating the separator between columns, and the default is ','.
  • rowDelimiter: A string indicating the separator between the lines, and the default is ';'.

Examples

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
MyFormat = take("", 18)
MyFormat[2] = "0.000"
MyFormat[5] = "yyyy.MM.dd"
f = createCsvFormatter(MyFormat)
f(t)

createCsvParser

Syntax

createCsvParser(schema, [delimiter=','], [rowDelimiter=';'])

Details

This function creates a parser function in CSV format.

Parameters

  • schema: A vector indicating the data types of columns.
  • delimiter: A string indicating the separator between columns, and the default is ','.
  • rowDelimiter: A string indicating the separator between the lines, and the default is ';'.

Examples

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = createCsvFormatter([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL])
s=f(t)
p = createCsvParser([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL])
p(s)

createJsonFormatter

Syntax

createJsonFormatter()

Details

This function creates a formatter function in JSON format.

Parameters

None.

Examples

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = createJsonFormatter()
f(t)

createJsonParser

createJsonParser(schema, colNames)

Details

This function creates a parser function in JSON format.

Parameters

  • schema: A vector indicating the data types of columns.
  • colNames: A string indicating the column names.

Examples

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = createJsonFormatter()
p = createJsonParser([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL],
`bool`char`short`int`long`date`month`time`minute`second`datetime`timestamp`nanotime`nanotimestamp`float`double`string`symbol)
s=f(t)
x=p(s)

Usage Examples

loadPlugin("./plugins/mqtt/bin/PluginMQTTClient.txt"); 
use mqtt; 

//***************************publish a table****************************************//
MyFormat = take("", 5)
MyFormat[2] = "0.000"
f = createCsvFormatter(MyFormat, ',', ';')

//create a record for every device
def writeData(hardwareVector){
	hardwareNumber = size(hardwareVector)
	return table(take(hardwareVector,hardwareNumber) as hardwareId ,take(now(),
		hardwareNumber) as ts,rand(20..41,hardwareNumber) as temperature,
		rand(50,hardwareNumber) as humidity,rand(500..1000,
		hardwareNumber) as voltage)
}
def publishTableData(server,topic,iterations,hardwareVector,interval,f){
    conn=connect(server,1883,0,f,100)
    for(i in 0:iterations){
	   t=writeData(hardwareVector)
	   publish(conn,topic,t)
	   sleep(interval)
    }
    close(conn)
         
}
host="192.168.1.201"
submitJob("submit_pub1", "submit_p1", publishTableData{host,"sensor/s001",10,100..149,100,f})
publishTableData(host,"sensor/s001",100,0..99,100,f)


//*******************************subscribe : handler is a table************************************************//
p = createCsvParser([INT, TIMESTAMP, DOUBLE, DOUBLE,DOUBLE], ',', ';' )
sensorInfoTable = table( 10000:0,`deviceID`send_time`temperature`humidity`voltage ,[INT, TIMESTAMP, DOUBLE, DOUBLE,DOUBLE])
conn = subscribe("192.168.1.201",1883,"sensor/#",p,sensorInfoTable)