MQTT Client Plugin

The DolphinDB MQTT Client plugin has different branches, such as release200 and release130. Each branch corresponds to a DolphinDB server version. Please make sure you are in the correct branch of the plugin documentation.

Load Precompiled Plugin

There are two options to load the MQTT client plugin in DolphinDB:

  • Using the function loadPlugin
  • Specifying the configuration parameter preloadModules

Function loadPlugin

loadPlugin("/YOUR_PATH/mqtt/PluginMQTTClient.txt"); 

Note: You can modify the path as appropriate. With a Windows OS, make sure to specify an absolute path and replace "\" with "\\" or "/".

Configuration Parameter preloadModules

Alternatively, you can preload the plugin during server startup by specifying the configuration parameter preloadModules, so you don't have to call loadPlugin afterwards. For single-machine deployment, configure the parameter in dolphindb.cfg. For cluster deployment, the plugin must be loaded on both the controller and the associated data node(s), and the simplest way is to configure the parameter in both controller.cfg and cluster.cfg.

preloadModules=plugins::mqtt,plugins::odbc

Separate plugins with a comma.

Note:

  • The configuration parameter preloadModules is only supported on server version 1.20.x and higher.
  • preloadModules is used to preload plugins and modules in DolphinDB. The directory for the plugin files are specified by the configuration parameter pluginDir. For more information, see documentation.

(Optional) Manually Compile Plugin

Linux

  • This plugin has been successfully compiled with GCC (version 5.4.0) on 64-bits Linux operating system.
  • Before compiling, install git and CMake.

For Ubuntu users, just type

$ sudo apt-get install git cmake
  • Create a 'build' directory. Under the directory, run cmake .. and make to generate 'libPluginMQTTClient.so'.
mkdir build
cd build
cmake ..
make

Windows

This plugin has been successfully compiled with MinGW-W64-builds-4.3.3 on 64-bits Windows operating system. Install cmake and MinGW on your machine. Add the "bin" directories of MinGW and cmake to your PATH on Windows.

    git clone https://github.com/dolphindb/DolphinDBPlugin.git
    cd DolphinDBPlugin/mqtt
    mkdir build
    cd build
    cmake ..
    copy /YOURPATH/libDolphinDB.dll . 
    make

Note: To specify a different path for MinGW, modify the following line in CmakeList.txt:

    set(MINGW32_LOCATION C://MinGW/MinGW/)  

The libPluginMQTTClient.dll file is generated after compilation. Next, refer to the procedure described in Section 1 "Load Precompiled Plugin" to load the compiled plugin.

Publish

Connect to a MQTT server/broker

Syntax

mqtt::connect(host, port,[QoS=0],[formatter],[batchSize=0],[username],[password])

The function connect to a MQTT server/broker. It returns a connection object which can be explicitly called to close with the close function, or it can be automatically released when the reference count is 0.

Arguments

  • 'host' is a string indicating the IP address of MQTT server/broker.

  • 'port' is an integer indicating the port number of MQTT server/broker.

  • 'Qos' is an integer indicating the quality of service. 0: at most once; 1: at least once; 2: only once. It is optional and the default value is 0.

  • 'formatter' is a function used to package published data in a format. Currently supported functions are createJsonFormatter and createCsvFormatter.

  • 'batchSize' is an integer. When the content to be published is a table, it can be sent in batches, and batchSize indicates the number of rows sent each time.

  • 'username' and 'password' are user credentials to the MQTT server/broker.

Example

f=mqtt::createJsonFormatter()
conn=connect("test.mosquitto.org",1883,0,f,50)

Publish

Syntax

mqtt::publish(conn,topic,obj)

This function posts one or more messages to the MQTT server/broker

Arguments

  • 'conn' is an object generated by function

  • 'topic' is a string indicating the subscription topic.

  • 'obj' is the content of the message to be published, which can be a table or a string or an array of strings.

Example

mqtt::publish(conn,"dolphindb/topic1","welcome")
mqtt::publish(conn,"devStr/sensor001",["hello world","welcome"])
t=table(take(0..99,50) as hardwareId ,take(now(),
		50) as ts,rand(20..41,50) as temperature,
		rand(50,50) as humidity,rand(500..1000,
		50) as voltage)
mqtt::publish(conn,"dolphindb/device",t)		

Close the connection

Syntax

mqtt::close(conn)

This function disconnects from the server/broker.

Arguments

  • 'conn' is an object generated by function.

Example

mqtt::close(conn)

Subscribe/Unsubscribe

Subscribe

Syntax

mqtt::subscribe(host, port, topic, [parser], handler,[username],[password])

Arguments

  • 'host' is a string indicating the IP address of MQTT server/broker.

  • 'port' is an integer indicating the port number of MQTT server/broker.

  • 'topic' is a string indicating the subscription topic.

  • 'parser' is a function for parsing subscribed messages. Currently supported functions are createJsonParser and createCsvParser

  • 'handler' is a function or a table to process the subscribed data.

  • 'username' and 'password' are strings indicating user credentials to the MQTT server/broker.

Details

Subscribe to a MQTT server/broker. It returns a connection object.

Example

p = createCsvParser([INT, TIMESTAMP, DOUBLE, DOUBLE,DOUBLE], ',', ';' )
sensorInfoTable = table( 10000:0,`deviceID`send_time`temperature`humidity`voltage,[INT, TIMESTAMP, DOUBLE, DOUBLE,DOUBLE])
conn = mqtt::subscribe("192.168.1.201",1883,"sensor/#",p,sensorInfoTable)

Check Subscription

mqtt::getSubscriberStat()    

Get the information on all subscriptions. Return a table with the following columns:

  • "subscriptionId" - ID of a subscription
  • "user" - the session user who created the subscription
  • "host" - IP address for the MQTT server/broker
  • "port" - Port number of the MQTT server/broker
  • "topic" - the subscription topic
  • "createTimestamp" - the time when the subscription was created
  • "receivedPackets" - the number of messages received

Unsubscribe

Syntax

mqtt::unsubcribe(subscription)  

Cancel subscription to the MQTT server/broker.

Arguments

  • 'subscription' is the value returned by the subscribe function or the subscription ID returned by getSubscriberStat.

Example

mqtt::unsubscribe(sub1) 
mqtt::unsubscribe("350555232l")    

Formatter/Parser

createCsvFormatter

Syntax

mqtt::createCsvFormatter([format], [delimiter=','], [rowDelimiter=';'])

This function creates a Formatter function in CSV format.

Arguments

  • 'format'is a string array.
  • 'delimiter'is the separator between columns, the default is ','
  • 'rowDelimiter' is the separator between the lines, the default is ';'

The return value is a function.

Example

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
MyFormat = take("", 18)
MyFormat[2] = "0.000"
MyFormat[5] = "yyyy.MM.dd"
f = mqtt::createCsvFormatter(MyFormat)
f(t)

createCsvParser

Syntax

mqtt::createCsvParser(schema, [delimiter=','], [rowDelimiter=';'])

This function creates a Parser function in CSV format.

Arguments

  • 'schema' is an array of column data types
  • 'delimiter'is the separator between columns, the default is ','
  • 'rowDelimiter' is the separator between the lines, the default is ';'

Example

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = mqtt::createCsvFormatter([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL])
s=f(t)
p = mqtt::createCsvParser([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL])
p(s)

createJsonFormatter

Syntax

mqtt::createJsonFormatter()

This function creates a Formatter function in JSON format

Arguments None

The return value is a function.

Example

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = mqtt::createJsonFormatter()
f(t)

createJsonParser

mqtt::createJsonParser(schema, colNames)

This function creates a Parser function in JSON format.

Arguments

  • 'schema' is a vector of data types for all columns.
  • 'colNames' is a column name vector

Example

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = mqtt::createJsonFormatter()
p = createJsonParser([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL],
`bool`char`short`int`long`date`month`time`minute`second`datetime`timestamp`nanotime`nanotimestamp`float`double`string`symbol)
s=f(t)
x=p(s)

An example

loadPlugin("./plugins/mqtt/bin/PluginMQTTClient.txt"); 
use mqtt; 

//***************************publish a table****************************************//
MyFormat = take("", 5)
MyFormat[2] = "0.000"
f = createCsvFormatter(MyFormat, ',', ';')

//create a record for every device
def writeData(hardwareVector){
	hardwareNumber = size(hardwareVector)
	return table(take(hardwareVector,hardwareNumber) as hardwareId ,take(now(),
		hardwareNumber) as ts,rand(20..41,hardwareNumber) as temperature,
		rand(50,hardwareNumber) as humidity,rand(500..1000,
		hardwareNumber) as voltage)
}
def publishTableData(server,topic,iterations,hardwareVector,interval,f){
    conn=connect(server,1883,0,f,100)
    for(i in 0:iterations){
	   t=writeData(hardwareVector)
	   publish(conn,topic,t)
	   sleep(interval)
    }
    close(conn)
         
}
host="192.168.1.201"
submitJob("submit_pub1", "submit_p1", publishTableData{host,"sensor/s001",10,100..149,100,f})
publishTableData(host,"sensor/s001",100,0..99,100,f)


//*******************************subscribe : handler is a table************************************************//
p = createCsvParser([INT, TIMESTAMP, DOUBLE, DOUBLE,DOUBLE], ',', ';' )
sensorInfoTable = table( 10000:0,`deviceID`send_time`temperature`humidity`voltage ,[INT, TIMESTAMP, DOUBLE, DOUBLE,DOUBLE])
conn = mqtt::subscribe("192.168.1.201",1883,"sensor/#",p,sensorInfoTable)