RocketMQ

The DolphinDB RocketMQ plugin supports sending data messages to RocketMQ clusters and receiving data from topics in RocketMQ clusters.

Installation (with installPlugin)

Required server version: DolphinDB 2.00.10 or higher.

Supported OS: Linux x64.

Installation Steps:

(1) Use listRemotePlugins to check plugin information in the plugin repository.

Note: For plugins not included in the provided list, you can install through precompiled binaries or compile from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.

login("admin", "123456")
listRemotePlugins(, "http://plugins.dolphindb.com/plugins/")

(2) Invoke installPlugin to install the plugin.

installPlugin("RocketMQ")

(3) Use loadPlugin to load the plugin before using the plugin methods.

loadPlugin("RocketMQ")

Method References

createProducer

Syntax

createProducer(namesrv, groupName, [producerConfig])

Details

The method creates a RocketMQ producer, which sends data to RocketMQ through the send method. It returns a RocketMQ producer object.

Parameters

  • namesrv: A STRING scalar indicating the address of the RocketMQ cluster's NameServer in "ip:port" format.
  • groupName: A STRING scalar indicating the producer group.
  • producerConfig (optional): A dictionary with STRING keys and ANY values indicating producer configurations, including:
    • sessionCredentials: A STRING vector indicating user credentials (accessKey, secretKey, accessChannel).
    • namesrvDomain: A STRING scalar for the domain to request Name Server addresses.
    • nameSpace: A STRING scalar for creating logical partitions within a RocketMQ cluster, each with its own Topic, Producer, and Consumer.
    • instanceName: A STRING scalar for uniquely identifying the RocketMQ client instance.
    • unitName: A STRING scalar identifying a specific business unit.
    • sendMsgTimeout: An INT scalar indicating the timeout of a single send attempt, in milliseconds. The default value is 3000. If a send has not succeeded within sendMsgTimeout milliseconds, the system resends the data until the retryTimes limit is reached.
    • retryTimes: An INT scalar indicating the number of send retries. The default value is 5.
    • compressMsgBodyOverHowmuch: An INT scalar for message body compression threshold in bytes. Must be greater than 0. The default value is 4096.
    • compressLevel: An INT scalar indicating the compression level (1-9). The default value is 4.
    • maxMessageSize: An INT scalar indicating maximum message length in bytes. Must be greater than 0. The default value is 133632.
    • tcpTransportConnectTimeout: An INT scalar indicating connection timeout in milliseconds. Must be greater than 0. The default value is 3000.
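
As a minimal sketch of how the optional producerConfig might be passed, the script below builds a dictionary with STRING keys and ANY values and hands it to createProducer. The NameServer address, group name, and all configuration values are hypothetical; only the key names come from the list above.

```
// Hypothetical values; the keys are taken from the producerConfig list above.
producerConfig = dict(STRING, ANY)
producerConfig["sendMsgTimeout"] = 5000   // wait up to 5 s per send attempt
producerConfig["retryTimes"] = 3          // resend at most 3 times
producerConfig["compressLevel"] = 5       // must be within 1-9

// Assumes a NameServer is reachable at this (example) address.
producer = RocketMQ::createProducer("192.168.1.38:9876", "group1", producerConfig)
```

Keys that are left out of the dictionary keep the defaults listed above.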

send

Syntax

send(producer, data, topic, [tag = ""])

Details

The method sends data to RocketMQ and returns a boolean value indicating whether the data was sent successfully.

Parameters

  • producer: A producer object created by createProducer.
  • data: A STRING or BLOB scalar or vector of data to be sent.
  • topic: A STRING scalar indicating the topic of the data to be sent.
  • tag (optional): A STRING scalar indicating the message tag. The default value is an empty string.

Examples

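producer = RocketMQ::createProducer("192.168.1.38:9876", "group1")
// send a single message; "topic1" is an example topic name
msg = "msg1"
RocketMQ::send(producer, msg, "topic1")
// send a vector of messages to the same topic
msg2 = ["msg1", "msg2"]
RocketMQ::send(producer, msg2, "topic1")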
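
Because send returns a boolean, a caller may want to check the result and attach a tag. The following is a sketch only: the address, group, topic, and tag names are hypothetical, and it assumes the plugin has been loaded and the cluster is reachable.

```
// Hypothetical address, group, topic, and tag names.
producer = RocketMQ::createProducer("192.168.1.38:9876", "group1")

// The optional fourth argument sets the message tag.
ok = RocketMQ::send(producer, "msg1", "topic1", "tagA")
if (!ok) {
    print("send to topic1 failed")
}
```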

createSubJob

Syntax

createSubJob(namesrv, groupName, topic, handler, [subExpression = "*"], [consumerConfig])

Details

The method creates a RocketMQ subscription job for receiving data in the background and returns a task ID of type STRING.

Parameters

  • namesrv: A STRING scalar indicating the address of the RocketMQ cluster's NameServer in "ip:port" format.
  • groupName: A STRING scalar indicating the consumer group.
  • topic: A STRING scalar indicating the topic.
  • handler: A unary function used to process the subscribed data. Its argument is a table containing a BLOB column named msg.
  • subExpression (optional): A STRING scalar used for message filtering. The default value is "*".
  • consumerConfig (optional): A dictionary with STRING keys and ANY values indicating consumer configurations, including:
    • sessionCredentials: A STRING vector indicating user credentials (accessKey, secretKey, accessChannel).
    • messageModel: A STRING scalar indicating the consumption model:
      • "CLUSTERING" (default): Cluster consumption.
      • "BROADCASTING": Broadcast consumption.
    • consumeFromWhere: A STRING scalar indicating where the consumer starts consuming data:
      • "CONSUME_FROM_LAST_OFFSET" (default): Initially begins consumption from the end of the queue. On subsequent runs, consumption resumes from the last consumed position.
      • "CONSUME_FROM_FIRST_OFFSET": Initially begins consumption from the beginning of the queue. On subsequent runs, consumption resumes from the last consumed position.
    • namesrvDomain: A STRING scalar for the domain to request Name Server addresses.
    • instanceName: A STRING scalar for uniquely identifying the RocketMQ client instance.
    • unitName: A STRING scalar identifying a specific business unit.
    • nameSpace: A STRING scalar for creating logical partitions within a RocketMQ cluster, each with its own topic, producer, and consumer.
    • consumeThreadCount: An INT scalar indicating the number of consumer threads. The default value is 1.
    • pullMsgThreadPoolCount: An INT scalar indicating the number of threads for pulling RocketMQ data. Must be greater than 0. The default value is 1.
    • maxReconsumeTimes: An INT scalar indicating the maximum number of retry attempts when message consumption fails. Must be greater than 0. If not set, failed messages are not retried. Each retry places the message at the end of the queue, waiting for other consumers.
    • tcpTransportConnectTimeout: An INT scalar indicating the connection timeout in milliseconds. Must be greater than 0. The default value is 3000.
    • asyncPull: A BOOL scalar indicating whether to execute the handler asynchronously after receiving data. The default value is false, ensuring no data loss.
    • batchSize: An INT scalar indicating the number of unprocessed messages before the handler processes them. Must be greater than 0. The default value is 10000. Effective only when asyncPull is true.
    • throttle: A FLOAT scalar indicating the time in milliseconds to wait after the last handler execution if batchSize is not met. Must be greater than 0. The default value is 1. Effective only when asyncPull is true.

Examples

def appendData(table1, data){
  table1.append!(data)
}
table1 = table(1:0, ["msg"], [BLOB])
RocketMQ::createSubJob("192.168.1.38:9876", "group1", "topic1", appendData{table1})
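
A subscription with a consumerConfig dictionary could look like the sketch below. All values, the table name msgTable, and the handler name appendMsgs are hypothetical; the key names come from the consumerConfig list above. Since consumerConfig follows subExpression in the signature, the default "*" is passed explicitly here.

```
// Hypothetical handler and target table for the received msg column.
def appendMsgs(t, data){
  t.append!(data)
}
msgTable = table(1:0, ["msg"], [BLOB])

// Hypothetical values; keys are from the consumerConfig list above.
consumerConfig = dict(STRING, ANY)
consumerConfig["consumeFromWhere"] = "CONSUME_FROM_FIRST_OFFSET"
consumerConfig["asyncPull"] = true        // batchSize and throttle only apply when this is true
consumerConfig["batchSize"] = 1000
consumerConfig["throttle"] = float(10)    // FLOAT scalar, in milliseconds

// The returned STRING job ID can later be passed to cancelSubJob.
jobId = RocketMQ::createSubJob("192.168.1.38:9876", "group1", "topic1", appendMsgs{msgTable}, "*", consumerConfig)
```

Note that the documentation describes asyncPull = false as the setting that ensures no data loss, so enabling it as in this sketch trades that guarantee for batched handler calls.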

cancelSubJob

Syntax

cancelSubJob(JobId)

Details

The method cancels a background subscription job to RocketMQ and returns a boolean value indicating whether the cancellation was successful.

Parameters

  • JobId: A STRING scalar indicating the subscription task ID returned by createSubJob or getSubJobStat.
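
Examples

The sketch below cancels every subscription job currently reported by getSubJobStat. It assumes the plugin is loaded and relies only on the jobID column described for getSubJobStat.

```
// Collect the IDs of all known subscription jobs and cancel them one by one.
stat = RocketMQ::getSubJobStat()
for (id in stat.jobID) {
    RocketMQ::cancelSubJob(id)
}
```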

getSubJobStat

Syntax

getSubJobStat()

Details

The method queries information about the current RocketMQ background subscription jobs. It returns a table with the following columns:

  • jobID: STRING type, indicating the subscription ID.
  • startTime: NANOTIMESTAMP type, indicating the task creation time.
  • endTime: NANOTIMESTAMP type, indicating the end time of the task.
  • firstMsgTime: NANOTIMESTAMP type, indicating the receive time of the first data message.
  • lastMsgTime: NANOTIMESTAMP type, indicating the receive time of the last message.
  • processedMsgCount: LONG type, indicating the number of successfully processed message rows.
  • failedMsgCount: LONG type, indicating the number of failed message rows.
  • lastErrMsg: STRING type, indicating the error message from the last processing failure.
  • lastFailedTimestamp: NANOTIMESTAMP type, indicating the time of the last processing failure.
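
Examples

Because the result is an ordinary table, it can be filtered with SQL. As an illustration (assuming the plugin is loaded), the query below lists only the jobs that have recorded processing failures, using the column names given above.

```
stat = RocketMQ::getSubJobStat()

// Show jobs with at least one failed message, along with the most recent error.
select jobID, failedMsgCount, lastErrMsg, lastFailedTimestamp from stat where failedMsgCount > 0
```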