ZeroMQ Plugin

ZeroMQ (also known zmq) is an asynchronous messaging library, aimed at use in distributed or concurrent applications.

Through the DolphinDB ZeroMQ plugin, users can create zmq sockets and communicate via zmq messages, which includes session establishment, message publish-subscribe and transmission.

The DolphinDB ZeroMQ plugin has the branches release 200 and release130. Each plugin version corresponds to a DolphinDB server version. If you use a different DolphinDB server version, refer to the corresponding branch of the plugin documentation.

Install the Plugin

Download Precompiled Binaries

Linux

The precompiled binaries are stored in the directory DolphinDBPlugin/zmq/bin/linux64. You can execute the following command to load the plugin in DolphinDB:

cd DolphinDB/server //Change to the directory of DolphinDB server
./dolphindb //Start DolphinDB server
loadPlugin("<PluginDir>/zmq/bin/linux64/PluginZmq.txt") //Load the plugin

Build a Plugin

You can also manually compile a zmq plugin following the instructions:

(1) compile libzmq

Download libzmq-4.3.4

cd libzmq-4.3.4
cp include/zmq.h /path/to/PluginZmq/bin/include/
mkdir build && cd build
cmake ..
make -j8
cp lib/libzmq.a /path/to/PluginZmq/bin/linux64/

(2) obtain the header file of cppzmq

Download cppzmq-4.7.1

cd cppzmq-4.7.1
cp zmq.hpp /path/to/PluginZmq/bin/include/

(3) build a plugin

mkdir build
cd build
cmake  ../
make

Note: Make sure the file libDolphinDB.so is under the GCC search path before compilation. You can add the plugin path to the library search path LD_LIBRARY_PATH or copy it to the build directory.

libPluginZmq.so and PluginZmq.txt are generated under the working directory after successful compilation.

Send

zmq::socket

Syntax

zmq::socket(type, formatter, [batchSize], [prefix])

Parameters

  • type: a STRING scalar indicating the socket type to be created. It can be "ZMQ_PUB" and "ZMQ_PUSH".
  • formatter: a function used to package published data in a specified format. Currently it supports methods createJsonFormatter and createCsvFormatter. Alternatively, you can define a formatter function which takes data of method zmq::send as the argument.
  • batchSize: an integer indicating the number of messages sent each time. For a table to be published, it can be sent in batches.
  • prefix: a STRING scalar indicating the message prefix.

Details

Create a zmq socket.

Note: When using methods connect, bind, send and close for concurrent operations, different zmq sockets must be constructed for different threads.

Example

formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)

zmq::connect

Syntax

zmq::connect(socket, addr, [prefix])

Parameters

  • socket: a zmq socket.
  • addr: the address string in the form of "protocol://interface:port", indicating the remote address to be connected to. "protocol" is the underlying transport protocol to use, including tcp, ipc, inproc, and epgm. "interface:port" is the remote IP address and port number.
  • prefix: a STRING scalar indicating the message prefix.

Details

Use socket to establish connections to zmq. Keepalive is enabled after the tcp connection is set so that it can be automatically connected.

Example

formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632", "prefix1")

zmq::bind

Syntax

zmq::bind(socket, addr, [prefix])

Parameters

  • socket: a zmq socket.
  • addr: the address string in the form of "protocol://interface:port", indicating the remote address to be connected to. "protocol" is the underlying transport protocol to use, including tcp, ipc, inproc, and epgm. "interface:port" is the remote IP address and port number.
  • prefix: a STRING scalar indicating the message prefix.

Details

Bind a socket to a specific address to accept incoming requests.

Example

formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::bind(socket, "tcp://*:55631", "prefix1")

zmq::send

Syntax

zmq::send(socket, data, [prefix])

Parameters

  • socket: a zmq socket.
  • data: the data to be sent. Its data type must match the argument passing to formatter of method zmq::socket. Otherwise, the formatting will fail and an exception will be thrown.
  • prefix: a STRING scalar indicating the message prefix.

Details

Send a zmq message. Return true if successful.

Example

formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632", "prefix1")
zmq::send(socket, table(1..10 as id))

zmq::close

Syntax

zmq::close(socket)

Details

Close a zmq socket.

Example

formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632", "prefix1")
zmq::close(socket)

Subscribe

zmq::createSubJob

Syntax

zmq::createSubJob(addr, type, isConnnect, handle, parser, [prefix])

Details

Create a zmq subscription. The subscription will automatically reconnect after network failures.

Parameters

  • addr: the address string in the form of "protocol://interface:port", indicating the remote address to be connected to. "protocol" is the underlying transport protocol to use, including tcp, ipc, inproc, and epgm. "interface:port" is the remote IP address and port number.
  • type: a STRING indicating the socket type to be created. It can be "ZMQ_SUB" and "ZMQ_PULL".
  • isConnnect: a Boolean value indicating whether to connect to addr. If false the addr is binded.
  • handle: a function or a table used to handle messages sent from zmq.
  • parser: is a function for parsing subscribed messages. Currently supported functions are createJsonParser and createCsvParser. It takes a STRING scalar as input and outputs a table.
  • prefix: a STRING indicating the message prefix.

Example

handle = streamTable(10:0, [`int], [INT])
enableTableShareAndPersistence(table=handle, tableName=`test1, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=120)
parser = zmq::createJSONParser([INT], [`bool])
zmq::createSubJob("tcp://localhost:55633", "ZMQ_SUB", true, handle, parser, "prefix1")

You can use it with a Python script:

import zmq
import time
import sys

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:55633")
msg = '[{"bool":234}]'

while True:
	socket.send(msg.encode('utf-8'))
	time.sleep(2)

zmq::getSubJobStat

Syntax

zmq::getSubJobStat()

Details

Get all zmq subscription messages.

Return a table with the following columns:

  • subscriptionId: the subscription ID.
  • addr: the subscription address.
  • prefix: the message prefix.
  • recvPackets: the number of messages received.
  • createTimestamp: the timestamp when the subscription is created.

Example

handle = streamTable(10:0, [`int], [INT])
enableTableShareAndPersistence(table=handle, tableName=`test1, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=120)
parser = zmq::createJSONParser([INT], [`bool])
zmq::createSubJob("tcp://localhost:55633", "ZMQ_SUB", handle, parser, "prefix1")
zmq::getSubJobStat()

zmq::cancelSubJob

Syntax

zmq::cancelSubJob(subscription)

Parameters

  • subscription: is the value returned by createSubJob, or the subscriptionId returned by getJobStat.

Details

Cancel a zmq subscription.

Example

zmq::cancelSubJob(sub1)
zmq::cancelSubJob(42070480)

zmq::zmqCreatepusher

Syntax

zmq::zmqCreatepusher(socket, dummyTable)

Parameters

  • socket: a zmq socket.
  • dummyTable: a table object which receives the input messages.

Details

Create a zmq pusher. The plugin offers 2 ways to send messages to the pusher to forward the messages:

  • Append data to the pusher with method append!;
  • Ingest data from the output table of a streaming engine to the pusher.

Example

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])

formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632")
pusher = zmq::createPusher(socket, output1)

engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=pusher, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23)

Formatter/Parser

createCSVFormatter

Syntax

zmq::createCSVFormatter([format], [delimiter=','], [rowDelimiter=';'])

Parameters

  • format: a vector of STRING type.
  • delimiter: the delimiter between columns, the default is ','.
  • rowDelimiter: the delimiter between rows, the default is ';'.

Details

Create a Formatter function in CSV format.

Example

MyFormat = take("", 5)
MyFormat[2] = "0.000"
f = createCSVFormatter(MyFormat, ',', ';')

createCSVParser

Syntax

zmq::createCSVParser(schema, [delimiter=','], [rowDelimiter=';'])

Parameters

  • schema: a vector indicating the data type of each column.
  • delimiter: the delimiter between columns, the default is ','.
  • rowDelimiter: the delimiter between rows, the default is ';'.

Details

Create a Parser function in CSV format.

Example

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = zmq::createCSVFormatter([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL])
s=f(t)
p = zmq::createCSVParser([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL])
p(s)

createJSONFormatter

Syntax

zmq::createJSONFormatter()

Details

Create a Formatter function in JSON format

Example

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = zmq::createJSONFormatter()
f(t)

createJSONParser

Syntax

zmq::createJSONParser(schema, colNames)

Parameters

  • schema: a vector indicating the data type of each column.
  • colNames: a vector indicating the name of each column.

Details

Create a Parser function in JSON format.

Example

def createT(n) {
    return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = zmq::createJSONFormatter()
p = createJSONParser([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL],
`bool`char`short`int`long`date`month`time`minute`second`datetime`timestamp`nanotime`nanotimestamp`float`double`string`symbol)
s=f(t)
x=p(s)

Example

loadPlugin("/home/zmx/worker/DolphinDBPlugin/zmq/cmake-build-debug/PluginZmq.txt")
go
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::bind(socket, "tcp://localhost:55632")
data = table(1..10 as id, take(now(), 10) as ts, rand(10, 10) as volume)
zmq::send(socket, data)

You can use it with a Python script:

import zmq
from zmq.sugar import socket
import json
if __name__=='__main__':
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    
    socket.setsockopt(zmq.TCP_KEEPALIVE, 1);
    socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 30);
    socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 1);
    socket.setsockopt(zmq.TCP_KEEPALIVE_CNT, 5);
    
    socket.connect("tcp://192.168.0.48:55632")
    zip_filter = ""
    socket.setsockopt(zmq.SUBSCRIBE, zip_filter.encode('ascii'))

    while True:
        recvStr = socket.recv()
        print (recvStr)