Kafka

Apache Kafka is a distributed messaging queue with high throughput. The DolphinDB kafka plugin can be used to publish messages to and subscribe to messages from Kafka. The plugin supports sending and receiving data in JSON format, DolphinDB format, and string format. It also supports writing subscribed data streams into DolphinDB through user-defined callback functions. The plugin is built on the open-source libraries librdkafka and cppkafka.

Installation (with installPlugin)

Required server version: DolphinDB 2.00.10 or higher

OS: Linux x86-64 and Linux ARM

Note that on ARM Linux, the kafka plugin does not support SASL authentication and cannot produce or consume messages compressed with zstd.

Installation Steps:

(1) Use listRemotePlugins to check plugin information in the plugin repository.

Note: For plugins not included in the provided list, you can install through precompiled binaries or compile from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.

login("admin", "123456")
listRemotePlugins()

(2) Invoke installPlugin for plugin installation.

installPlugin("kafka")

(3) Use loadPlugin to load the plugin before using the plugin methods.

loadPlugin("kafka")

Introduction to Kafka

Before you use the DolphinDB kafka plugin, it helps to understand the following key concepts of Apache Kafka:

  • Cluster
    • A Kafka cluster consists of multiple brokers, each with a unique identifier in the cluster.
  • Production & Consumption
    • Producers publish data categorized by topics to the cluster.
    • Consumers subscribe to the data in the cluster.
  • Topic & Partition
    • A topic can have multiple partitions, each holding a distinct subset of the topic's data.
    • Partitions help improve the throughput of the Kafka system.
  • Consumer Group
    • Multiple consumers can be grouped into a consumer group.
    • Data from the same partition can only be consumed by one consumer in the group.
    • The number of consumers in a group that subscribe to a topic should not exceed the number of partitions. Otherwise, the extra consumers sit idle and rebalance issues may occur.
  • Rebalance
    • When there is a change in the number of consumers in a group or the number of partitions in the subscribed topics, Kafka reassigns the mapping between consumers and partitions to ensure that all partitions are consumed.
  • Replication & Fault Tolerance
    • A partition can have multiple replicas, including a leader replica and several follower replicas.
    • If a leader fails, one of the followers will be chosen as the new leader for data processing.
    • The number of replicas cannot exceed the number of brokers.
  • Offset & Consumer Offset Committing
    • Kafka records the offsets of the consumed messages in a partition by committing them.
    • Based on the committed offsets, consumption resumes from where it left off after a consumer crash or rebalance.

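A group should not have more consumers than the topic has partitions, so it can help to check the partition count first. The following minimal sketch uses getMetadata, which is described under Method References. It assumes the plugin is already loaded, a broker is reachable at localhost:9092, and a topic named "test" exists. All of these are hypothetical.

```dolphindb
// Query cluster metadata directly by address (hypothetical broker)
meta = kafka::getMetadata("localhost:9092")
// "topics" maps each topic name to a table with one row per partition
partitionInfo = meta["topics"]["test"]
nPartitions = rows(partitionInfo)
print("Partitions in topic test: " + string(nPartitions))
// Keep the number of consumers in one group at or below nPartitions
```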
Method References

producer

Syntax

producer(config, [errCallback])

Details

Create a Kafka producer based on the specified configurations and return the handle.

Note that the plugin currently provides no explicit function to close a producer, and the close function of DolphinDB server does not apply to it either. To close a producer, set its handle to NULL to trigger the destructor.

Parameters

  • config: A dictionary indicating the Kafka producer configuration, whose key is a string and value is a string or a boolean. Generally, the configuration of a producer only requires the cluster address for connection. If the cluster enables security authentication, additional configurations are required. For details, refer to Kafka Configuration.
  • errCallback (optional): A function to be called when an error or warning occurs in the background. This function accepts three STRING parameters: level ("ERROR" or "WARNING"), errorName, and message (specific error information).

Configuration Parameters

Parameter Description Example
metadata.broker.list Address(es) of the broker(s) to connect to, in the format host:port. Separate multiple addresses with commas. Single address: "localhost:9092"; multiple addresses: "192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092"
debug A comma-separated list of debug contexts to enable. Options for producers: broker, topic, msg. Options for consumers: consumer, cgrp, topic, fetch, all. Kafka's debug information is also written to the DolphinDB log at the DEBUG level. "broker,topic": Get debug information about brokers and topics; "all": Get all debug information.
session.timeout.ms The timeout in milliseconds used to detect client failures. The client sends periodic heartbeats to the broker. If no heartbeat is received before this session timeout expires, the broker removes the client from the group and initiates a rebalance. "45000": 45-second timeout.
max.poll.interval.ms The maximum delay in milliseconds between invocations of poll() to consume messages. If this interval is exceeded, the consumer is considered failed and the group rebalances to reassign its partitions to another consumer. "300000": 5-minute timeout.
sasl.mechanisms The SASL mechanism used for authentication: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, or OAUTHBEARER. Only one mechanism can be specified. "PLAIN": Authenticate with a username and password; "GSSAPI": Authenticate with Kerberos.
security.protocol The protocol used to communicate with brokers: plaintext, ssl, sasl_plaintext, or sasl_ssl. "plaintext": Use the PLAINTEXT protocol; "ssl": Use the SSL protocol.
sasl.username The SASL username used for the PLAIN and SCRAM mechanisms. "username"
sasl.password The SASL password used for the PLAIN and SCRAM mechanisms. "password"
sasl.kerberos.service.name The Kerberos principal name that Kafka runs as. "kafkaAdmin"
sasl.kerberos.principal The Kerberos principal name of this client. "kafkaClient@EXAMPLE.COM"
sasl.kerberos.keytab The path to the Kerberos keytab file. "/path_to_keytab/client.keytab"
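The following sketch shows how these parameters combine in a producer configuration for a cluster that uses SCRAM authentication over SSL. The broker address, port, and credentials are placeholders. Depending on how the cluster's certificates are issued, additional librdkafka SSL settings such as ssl.ca.location may be required; they are not covered in the table above. SASL is not available in the ARM Linux build.

```dolphindb
// Placeholder address and credentials; replace them with your cluster's values
producerCfg = dict(STRING, ANY)
producerCfg["metadata.broker.list"] = "192.168.100.11:9093"
producerCfg["security.protocol"] = "sasl_ssl"
producerCfg["sasl.mechanisms"] = "SCRAM-SHA-256"
producerCfg["sasl.username"] = "username"
producerCfg["sasl.password"] = "password"
// Producer-side debug contexts; output goes to the DolphinDB log at DEBUG level
producerCfg["debug"] = "broker,topic"
producer = kafka::producer(producerCfg)
```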

Configuration Parameters for Kafka Producer

Parameter Description Example
compression.type The message compression type. It can be: none, gzip, snappy, lz4, and zstd. "gzip": Apply the gzip compression.
queue.buffering.max.messages The maximum number of messages allowed in the producer queue. Note that a queue can be shared by different topics and partitions. "10000000"
queue.buffering.max.kbytes The maximum total size, in kilobytes, of messages allowed in the producer queue. This parameter takes priority over queue.buffering.max.messages. "2147483647"
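This minimal sketch sets up a producer with compression and bounded queue sizes. The broker address and limit values are illustrative, not recommendations. It uses lz4 because zstd is not supported on ARM Linux.

```dolphindb
producerCfg = dict(STRING, ANY)
producerCfg["metadata.broker.list"] = "localhost:9092"
// Compress message batches with lz4
producerCfg["compression.type"] = "lz4"
// Bound the local send queue by message count and by total size (1048576 KB = 1 GB)
producerCfg["queue.buffering.max.messages"] = "100000"
producerCfg["queue.buffering.max.kbytes"] = "1048576"
producer = kafka::producer(producerCfg)
```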

Examples

  • Create a local Kafka producer:
    producerCfg = dict(STRING, ANY)
    producerCfg["metadata.broker.list"] = "localhost:9092"
    handle = kafka::producer(producerCfg)
  • Create a Kafka producer with a callback function:
    producerCfg = dict(string, any)
    producerCfg["bootstrap.servers"] = "192.168.100.3"
    share table(1:0, `level`err`reason, [STRING,STRING,STRING]) as produceDest
    def producerCallback(mutable dest, level, err, reason) {
        tb = table([level] as level, [err] as err, [reason] as reason)
        dest.append!(tb)
    }
    producer = kafka::producer(producerCfg, producerCallback{produceDest})

produce

Syntax

produce(producer, topic, key, value, marshalType, [partition])

Details

Send messages to the specified Kafka topic using the given format. For detailed usage and consumer integration, refer to Usage Examples.

Note that:

  • When tabular data is sent in JSON format, each column is serialized as a JSON array, so the table is sent in columnar form.
  • Data serialized in DolphinDB format can only be received by the DolphinDB kafka plugin with marshalType specified as "DOLPHINDB". Other consumers cannot interpret the binary serialized data, which may then appear garbled.

Parameters

  • producer: A Kafka producer handle.
  • topic: A STRING scalar indicating the Kafka topic to send messages to.
  • key: A STRING scalar indicating the key of the Kafka message. Messages with the same key will be sent to the same partition.
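  • value: The data of the Kafka message. With marshalType "PLAIN", it must be a STRING scalar. With "JSON" or "DOLPHINDB", it can also be a scalar, vector, dictionary, or table, as in Usage Examples.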
  • marshalType (optional): A STRING scalar specifying the serialization format. It can be:
    • "JSON": Serialization in JSON format.
    • "DOLPHINDB": Serialization in DolphinDB internal format.
    • "PLAIN": Plain data with no serialization. This option only supports sending strings.
  • partition (optional): An integer indicating the partition of the Kafka topic to send messages to. If unspecified, the partition is chosen based on key, so messages with the same key go to the same partition.
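The following sketch contrasts key-based and explicit partition selection. It assumes a producer handle named producer, as created in the producer examples, and a hypothetical topic "orders" with at least two partitions.

```dolphindb
// Same key: both messages are routed to the same partition, derived from the key
kafka::produce(producer, "orders", "customer-42", "order 1001 created", "PLAIN")
kafka::produce(producer, "orders", "customer-42", "order 1001 paid", "PLAIN")
// Explicit partition: this message goes to partition 1 regardless of its key
kafka::produce(producer, "orders", "customer-7", "order 1002 created", "PLAIN", 1)
```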

producerFlush

Syntax

producerFlush(producer)

Parameters

  • producer: A Kafka producer handle.

Details

Flush all records buffered in the producer to Kafka. Call this function when there is a backlog of messages waiting to be sent.
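A minimal sketch, assuming an existing producer handle named producer and a hypothetical topic "flushTopic". It queues a burst of messages and flushes them before releasing the producer.

```dolphindb
// Queue 1000 PLAIN messages; they may sit in the local buffer for a while
for (i in 1..1000) {
    kafka::produce(producer, "flushTopic", string(i), "message " + string(i), "PLAIN")
}
// Push everything still buffered to Kafka
kafka::producerFlush(producer)
// Release the producer by setting its handle to NULL
producer = NULL
```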

consumer

Syntax

consumer(config, [errCallback])

Details

Create a Kafka consumer based on the specified configurations and return the handle.

Note that the plugin currently provides no explicit function to close a consumer, and the close function of DolphinDB server does not apply to it either. To close a consumer, set its handle to NULL to trigger the destructor. If the consumer is used by a job created with createSubJob, cancel that background job as well.

Parameters

  • config: A dictionary indicating the Kafka consumer configuration, whose key is a string and value is a string or a boolean. Parameters metadata.broker.list and group.id are required for consumer configuration. For descriptions of general configuration parameters, refer to producer.
  • errCallback (optional): A function to be called when an error or warning occurs in the background. This function accepts three STRING parameters: level ("ERROR" or "WARNING"), errorName, and message (specific error information).

Configuration Parameters for Kafka Consumer

Parameter Description Example
group.id (Required) The name of the group that the consumer belongs to. "group1"
auto.offset.reset Action to take when there is no initial offset in Kafka or the desired offset is out of range. It can be: smallest, earliest, beginning, largest, latest, end, or error. "beginning"/"smallest"/"earliest": Reset the offset to the earliest offset; "latest"/"largest"/"end": Reset the offset to the latest offset.
enable.auto.commit Whether to automatically commit offsets in the background. The default value is true. false: The consumer's offsets are not committed automatically in the background.
auto.commit.interval.ms The frequency in milliseconds at which consumer offsets are automatically committed. The default value is 60000. "100": The consumer's offsets are committed automatically every 100 ms.
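This sketch shows a consumer that reads from the beginning of a topic when its group has no committed offset and leaves committing to the caller. The broker address and group name are hypothetical.

```dolphindb
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost:9092"
consumerCfg["group.id"] = "manualCommitGroup"
// With no committed offset for this group, start from the earliest message
consumerCfg["auto.offset.reset"] = "earliest"
// Disable background commits; call kafka::commit after processing instead
consumerCfg["enable.auto.commit"] = false
consumer = kafka::consumer(consumerCfg)
```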

Examples

Create a local Kafka consumer and specify group.id as "group1":

consumerCfg = dict(STRING, ANY)
consumerCfg["group.id"] = "group1"
consumerCfg["metadata.broker.list"] = "localhost:9092";
consumer = kafka::consumer(consumerCfg)

subscribe

Syntax

subscribe(consumer, topics)

Details

Subscribe to a Kafka topic and return no value.

Note that:

  • The number of consumers in the same group should not exceed the number of partitions in the topic. Otherwise, unassigned consumers cannot participate in a rebalance, even if assigned consumers exit.

  • A failed subscription or a subscription to a non-existent topic returns no information or log. To access the subscription status, use the getAssignment method.

Parameters

  • consumer: A Kafka consumer handle.
  • topics: A STRING scalar or vector indicating the topic(s) to subscribe to.
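Because a failed subscription produces no error, one way to confirm a subscription is to poll once and then inspect the assignment. A sketch, assuming an existing consumer handle and two hypothetical topics "topicA" and "topicB":

```dolphindb
kafka::subscribe(consumer, ["topicA", "topicB"])
// Poll once so the group can assign partitions to this consumer
kafka::consumerPoll(consumer, 3000)
assignment = kafka::getAssignment(consumer)
if (rows(assignment) == 0) {
    print("No partitions assigned yet; check topic names and group membership")
}
```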

consumerPoll

Syntax

consumerPoll(consumer, [timeout=1000], [marshalType])

Details

Poll one message from the subscribed topics and return it as a tuple.

The first element is a string containing the error message, which is empty if consumerPoll succeeds. The second element is a tuple containing, in order, the topic, key, value, partition, and the timestamp when the consumer received the data. For detailed usage and consumer integration, refer to Usage Examples.

Parameters

  • consumer: A Kafka consumer handle.
  • timeout (optional): The maximum amount of time in milliseconds to wait for a polling. The default value is 1000 milliseconds.
  • marshalType (optional): A STRING scalar specifying the parsing format. If unspecified, consumerPoll detects the subscribed data and parses it in JSON or DOLPHINDB format based on the detection results. It can be:
    • "JSON": Parse in JSON format.
    • "DOLPHINDB": Parse in DolphinDB internal format.
    • "PLAIN": Unparsed plain data.
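A minimal polling loop, assuming a consumer that is already subscribed to a topic carrying PLAIN messages. The field positions (topic, key, value, partition, timestamp) follow the outputs shown in Usage Examples.

```dolphindb
received = array(STRING, 0)
for (i in 0..9) {
    ret = kafka::consumerPoll(consumer, 1000, "PLAIN")
    // ret[0] is the error message; it is empty when the poll succeeded
    if (ret[0] != "") {
        print(ret[0])
        continue
    }
    msg = ret[1]
    // msg[2] is the message value
    received.append!(msg[2])
}
size(received)
```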

consumerPollBatch

Syntax

consumerPollBatch(consumer, batchSize, [timeout=1000], [marshalType])

Details

Poll up to batchSize messages from the subscribed topics and return them as a tuple.

Each element of the output has the same structure as the result of consumerPoll: an error message followed by a tuple containing the topic, key, value, partition, and the timestamp when the consumer received the data. For detailed usage and consumer integration, refer to Usage Examples.

Parameters

  • consumer: A Kafka consumer handle.
  • batchSize: An integer indicating the maximum number of messages to get.
  • timeout (optional): An integer indicating the maximum amount of time in milliseconds to wait for messages. The default value is 1000 milliseconds.
  • marshalType (optional): A STRING scalar specifying the parsing format. If unspecified, consumerPollBatch detects the subscribed data and parses it in JSON or DolphinDB format based on the detection results. It can be:
    • "JSON": Parse in JSON format.
    • "DOLPHINDB": Parse in DolphinDB internal format.
    • "PLAIN": Unparsed plain data.
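This sketch fetches up to 100 messages at once and keeps the values of the successful entries. It assumes a subscribed consumer with PLAIN messages and the (error, message) element layout shown in Usage Examples.

```dolphindb
batch = kafka::consumerPollBatch(consumer, 100, 1000, "PLAIN")
received = array(STRING, 0)
for (item in batch) {
    // item[0]: error message; item[1]: (topic, key, value, partition, timestamp)
    if (item[0] == "") {
        received.append!(item[1][2])
    }
}
size(received)
```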

createSubJob

Syntax

createSubJob(consumer, table, parser, actionName, [throttle=1.0], [autoCommit], [msgAsTable=false], [batchSize=0], [queueDepth=1000000])

Details

Return a subJob connection handle. For detailed usage, refer to Usage Examples.

Parameters

  • consumer: A Kafka consumer handle.
  • table: A table to store the subscribed messages. It can be a shared stream table, shared in-memory table, shared keyed table, shared indexed table, or DFS table.
  • parser: A unary, binary, or ternary function to process the subscribed messages and insert the result into the table parameter.
    • If msgAsTable is false, parser accepts 1-3 STRING parameters as the input: the first is the message value, the second the key, and the third the topic.
    • If msgAsTable is true, parser accepts a table with three STRING type columns: payload, key, and topic.
  • actionName: A string indicating the name of the subscription job, which must not duplicate existing names.
  • throttle (optional): A floating-point number in seconds indicating how long to wait before the parser processes the incoming messages if the batchSize condition has not been reached. The default value is 1.0. It is only effective when msgAsTable is true.
  • autoCommit (optional): A boolean indicating whether to commit offsets automatically after parser processes the messages and inserts the result into table.
    • If enable.auto.commit is false for the specified consumer and autoCommit is unspecified or set to true, offsets are committed only after the messages have been processed. This prevents data loss caused by system crashes, because uncommitted data is received again after recovery.
  • msgAsTable (optional): A boolean indicating whether the subscribed data is ingested into parser as a table. The default value is false.
  • batchSize (optional): An integer indicating the number of unprocessed messages to trigger the parser. The default value is 0. It is only effective when msgAsTable is true.
    • If it is positive, messages will be processed when the number of unprocessed messages reaches batchSize.
    • If it is unspecified or non-positive, messages will be processed immediately as they come in.
  • queueDepth (optional): A positive integer indicating the depth of the receiving queue in Kafka’s background. The default value is 1000000.
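The following sketch writes each PLAIN message into a shared in-memory table and commits offsets only after insertion. It pairs enable.auto.commit = false with autoCommit = true. The broker address, topic "logTopic", group "logGroup", table logTable, and action name "logJob" are all hypothetical.

```dolphindb
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost:9092"
consumerCfg["group.id"] = "logGroup"
consumerCfg["enable.auto.commit"] = false
logConsumer = kafka::consumer(consumerCfg)
kafka::subscribe(logConsumer, "logTopic")
share table(1:0, `topic`key`line, [STRING, STRING, STRING]) as logTable
// With msgAsTable = false, parser receives (payload, key, topic) for each message
def logParser(payload, key, topic) {
    return table([topic] as topic, [key] as key, [payload] as line)
}
// throttle = 1.0, autoCommit = true; msgAsTable keeps its default (false)
handle = kafka::createSubJob(logConsumer, logTable, logParser, "logJob", 1.0, true)
```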

getJobStat

Syntax

getJobStat()

Details

Get the statuses of the subscription jobs created by createSubJob and return a table.

Field Type Description
subscriptionId STRING ID of the subscription job
user STRING User name
actionName STRING Name of the subscription job
createTimestamp TIMESTAMP Creation timestamp of the subscription job
processedMsgCount LONG Number of the messages that have been processed
failedMsgCount LONG Number of the messages that failed to be processed
lastErrMsg STRING Content of the last error message
lastFailedTimestamp TIMESTAMP Timestamp of the last failure
msgAsTable BOOL Whether the subscribed data is ingested into parser as a table
batchSize LONG The number of unprocessed messages to trigger the parser
throttle FLOAT How long to wait before the parser processes the incoming messages if the batchSize condition has not been reached
autoCommit BOOL Whether to perform an automatic commit after the parser processes the messages and inserts the result into table
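Because getJobStat returns an ordinary table, it can be filtered with SQL. For example, the following sketch lists the background jobs that have failed at least once:

```dolphindb
stat = kafka::getJobStat()
select actionName, failedMsgCount, lastErrMsg, lastFailedTimestamp from stat where failedMsgCount > 0
```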

cancelSubJob

Syntax

cancelSubJob(handle|actionName)

Details

Cancel the specified subscription job in the background. The canceled job can no longer be retrieved by getJobStat.

Note: For compatibility, this method also accepts the subscriptionId returned by getJobStat. If a value matches both an actionName and a subscriptionId, the job identified by actionName is canceled.

Parameters

  • handle|actionName: The handle returned by createSubJob, or the actionName specified in it.

getSubJobConsumer

Syntax

getSubJobConsumer(handle|actionName)

Details

Get the consumer handle used by the specified background subscription job created by createSubJob.

Parameters

  • handle|actionName: The handle returned by createSubJob, or the actionName specified in it.
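This sketch inspects the consumer behind a background job and then shuts the job down, assuming a job created with the hypothetical action name "logJob". Per the consumer notes, the job is canceled before the handle is released. The destructor presumably runs only once every variable referring to the consumer has been cleared; this is an assumption.

```dolphindb
jobConsumer = kafka::getSubJobConsumer("logJob")
// Check which partitions the background job is consuming
kafka::getAssignment(jobConsumer)
// Cancel the background job first, then release the consumer handle
kafka::cancelSubJob("logJob")
jobConsumer = NULL
```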

commit

Syntax

commit(consumer, [topics], [partitions], [offsets])

Details

Synchronously commit the offset of the latest consumed messages to the Kafka cluster.

Parameters

  • consumer: A Kafka consumer handle.
  • topics (optional): A STRING scalar or vector indicating the subscribed topic(s).
  • partitions (optional): An INT scalar or vector indicating the subscribed partition(s).
  • offsets (optional): An INT scalar or vector indicating the offset(s) to commit for the corresponding topic(s) and partition(s).

Note:

  • topics, partitions, and offsets must have the same form (scalar or vector) and length, with their elements corresponding to each other.
  • If they are specified, only the specified offsets are committed.
  • If unspecified, the latest consumption offsets for all subscribed partitions will be committed.
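The following sketch shows both forms of commit. It assumes a consumer created with enable.auto.commit = false and subscribed to a hypothetical topic "test" with at least two partitions. The explicit offsets are illustrative.

```dolphindb
ret = kafka::consumerPoll(consumer, 1000, "PLAIN")
if (ret[0] == "") {
    // Process ret[1] here, then commit the latest consumed offsets of all assigned partitions
    kafka::commit(consumer)
}
// Alternatively, commit explicit offsets: partition 0 at 100 and partition 1 at 200
kafka::commit(consumer, ["test", "test"], [0, 1], [100, 200])
```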

assign

Syntax

assign(consumer, topics, partitions, offsets)

Details

Unlike the method subscribe, assign enables you to assign specific topics, partitions, and offsets to the consumer.

Parameters

  • consumer: A Kafka consumer handle.
  • topics: A STRING scalar or vector indicating the subscribed topic(s).
  • partitions: An INT scalar or vector indicating the subscribed partition(s).
  • offsets: An INT scalar or vector indicating the offset(s) from which to start consuming for the corresponding topic(s) and partition(s).

Note that:

  • topics, partitions, and offsets must have the same form (scalar or vector) and length, with their elements corresponding to each other.
  • The data in the same partition of a topic can be consumed by multiple consumers using assign.
  • To avoid undefined behaviors, it is recommended not to apply subscribe and assign to the same consumer or execute these methods multiple times.

Examples

Assign partition 1 of the "test" topic to the consumer, starting from offset 3000.

kafka::assign(consumer, "test", 1, 3000)

unassign

Syntax

unassign(consumer)

Details

Cancel the specified consumer’s subscription to all topics. This method applies to subscriptions created by both subscribe and assign.

Parameters

  • consumer: A Kafka consumer handle.
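A short sketch pairing assign with unassign. Topic "test" and offset 0 are illustrative. An empty assignment afterward is an expectation, not a documented guarantee.

```dolphindb
// Consume partition 0 of "test" starting from offset 0
kafka::assign(consumer, "test", 0, 0)
kafka::getAssignment(consumer)
// Remove all topics and partitions from this consumer
kafka::unassign(consumer)
// Expected to be 0 once the assignment has been cleared
rows(kafka::getAssignment(consumer))
```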

getAssignment

Syntax

getAssignment(consumer)

Details

Get the assignment information of the specified consumer. Return a table with 3 columns: topic, partition, and offset.

Note: If the consumer only calls subscribe and does not call assign, this method can also return the specific assignment information.

Parameters

  • consumer: A Kafka consumer handle.

getOffsetInfo

Syntax

getOffsetInfo(consumer, topic, partition)

Details

Get the offset information about the specified topics and partitions. Return a table with the following fields:

Field Type Description
topic STRING Topic name
partition INT Partition number
minOffset LONG Minimum offset of the partition
maxOffset LONG Maximum offset of the partition
offsetPosition LONG Latest offset (which may not have been committed yet)
offsetCommitted LONG Committed offsets

Parameters

  • consumer: A Kafka consumer handle.
  • topic: A STRING scalar or vector indicating the subscribed topic(s).
  • partition: An INT scalar or vector indicating the subscribed partition(s).

Note that:

  • topic and partition must have the same form (scalar or vector) and length, with their elements corresponding to each other.
  • The specified topics and partitions must be subscribed by the consumer.

Examples

The following example demonstrates the differences between getAssignment and getOffsetInfo.

  • Send 10 messages to the topic "offsetTopic".
    producerCfg = dict(string, any);
    producerCfg["bootstrap.servers"] = "localhost"
    producer = kafka::producer(producerCfg)
    for (i in 0..9) {
        kafka::produce(producer, "offsetTopic", "0", "message", "PLAIN")
    }
  • Create a consumer, which consumes the subscribed data twice.
    consumerCfg = dict(string, any);
    consumerCfg["metadata.broker.list"] = "localhost"
    consumerCfg["enable.auto.commit"] = "false"
    consumerCfg["group.id"] = "test"
    consumer = kafka::consumer(consumerCfg)
    kafka::assign(consumer, "offsetTopic", 0, 3)
    kafka::consumerPoll(consumer,,"PLAIN")
    kafka::consumerPoll(consumer,,"PLAIN")
  • Apply getAssignment to get the assignment information about the consumer.
    kafka::getAssignment(consumer)

Output:

topic partition offset
offsetTopic 0 -1000

In the output, offset is -1000, which is librdkafka's RD_KAFKA_OFFSET_STORED placeholder. In other words, getAssignment does not report a specific offset here.

  • Apply getOffsetInfo to get the offset information about the partition 0 in "offsetTopic".
    kafka::getOffsetInfo(consumer, "offsetTopic", 0)

    Output:

    topic partition minOffset maxOffset offsetPosition offsetCommitted
    offsetTopic 0 0 10 2 -1001

    maxOffset is 10 because 10 messages were sent. offsetPosition is 2, indicating that the consumer has consumed 2 messages. Because enable.auto.commit is set to false and nothing has been committed yet, offsetCommitted is -1001 (librdkafka's RD_KAFKA_OFFSET_INVALID), an invalid value.

  • Apply getOffsetInfo again after committing the offsets using commit.
    kafka::commit(consumer)
    kafka::getOffsetInfo(consumer, "offsetTopic", 0)

    Output:

    topic partition minOffset maxOffset offsetPosition offsetCommitted
    offsetTopic 0 0 10 2 2

    offsetCommitted becomes 2, matching offsetPosition, which indicates that the progress of the 2 consumed messages has been committed.

getMemId

Syntax

getMemId(consumer)

Details

Each consumer has a unique ID. This method can be used to get the ID of the specified consumer and return a string.

Parameters

  • consumer: A Kafka consumer handle.

Examples

consumerCfg = dict(STRING, ANY)
consumerCfg["group.id"] = string(now())
consumerCfg["metadata.broker.list"] = "localhost:9092";
consumer = kafka::consumer(consumerCfg)
kafka::subscribe(consumer, "test");
kafka::consumerPoll(consumer); // Without this step, the consumer may not be assigned an ID in the Kafka cluster
kafka::getMemId(consumer)
// output: 'rdkafka-d9eded44-358f-49fc-be01-cc099b121d59'

getMetadata

Syntax

getMetadata(handle|conf)

Details

Get the metadata of the specified Kafka cluster. Return a dictionary with the following key-value pairs:

Key Value Value Form
brokers Information about each node in the Kafka cluster A table with three columns: id (node ID), host (domain name), and port (port number).
consumerGroups Information about the consumer group A dictionary where each key corresponds to a consumer group. Each value is also a dictionary containing the metadata and information about consumer group members.
topics Information about the topics in the cluster A dictionary where each key corresponds to a topic. Each value is a table containing information about the partitions in the topic.

Parameters

  • handle|conf: It can be the handle of an existing producer or consumer, a dictionary used to establish a connection, or the IP address and port number of a Kafka cluster.

Examples

Get the metadata of the Kafka cluster with the IP address and port number "localhost:9092".

kafka::getMetadata("localhost:9092")

Get the metadata of the Kafka cluster corresponding to the specified consumer handle.

kafka::getMetadata(consumer)

Output:

brokers->
  id host           port
  -- -------------- ----
  2  192.168.100.45 9092
  1  192.168.100.44 9092
  0  192.168.100.43 9092
consumerGroup->
  test->
    state->Stable
    protocolType->consumer
    protocol->range
    error->Success
    members->
      rdkafka-0b501728-5344-4f2f-a450-bc056f6e3400->
        clientID->rdkafka
        clientHost->/192.168.0.38
        memberAssignmentVersion->0
        partitions->
          topic partition offset
          ----- --------- ------
          test  0         -1001 
          test  1         -1001 
          test  2         -1001 
topics->
  test->
    id error   leader replicas
    -- ------- ------ --------
    0  Success 0      [0]     
    1  Success 1      [1]     
    2  Success 2      [2] 
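Because handle|conf also accepts a configuration dictionary, metadata can be queried without creating a producer or consumer first. In the following sketch, the broker address is hypothetical. For a secured cluster, the same SASL keys used for producers and consumers would presumably go into this dictionary.

```dolphindb
metaCfg = dict(STRING, ANY)
metaCfg["metadata.broker.list"] = "localhost:9092"
meta = kafka::getMetadata(metaCfg)
// Topic names in the cluster
meta["topics"].keys()
// Broker table with id, host, and port
meta["brokers"]
```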

Usage Examples

  1. Create a producer to send data in various formats and create a consumer to consume the data. Ensure that you have created a Kafka topic named "msgTopic".
    // Create a producer
    producerCfg = dict(string, any);
    producerCfg["bootstrap.servers"] = "localhost"
    producer = kafka::producer(producerCfg)
    // Create a consumer
    consumerCfg = dict(STRING, ANY)
    consumerCfg["group.id"] = string(now())
    consumerCfg["metadata.broker.list"] = "localhost:9092";
    consumer = kafka::consumer(consumerCfg)
    kafka::subscribe(consumer, "msgTopic");
    • Send data in PLAIN format
      // Convert a table to JSON string and send it in PLAIN format
      str = toStdJson(table([1,2,3] as c1, `a`b`c as c2))
      kafka::produce(producer, "msgTopic", "key", str, "PLAIN", 0)
      // Output in Kafka: [{"c1": 1,"c2": "a"},{"c1": 2,"c2": "b"},{"c1": 3,"c2": "c"}]
      // Parse the subscribed data in PLAIN format for consumption
      kafka::consumerPoll(consumer,,"PLAIN")
      // output: (,("msgTopic","key","[{\"c1\": 1,\"c2\": \"a\"},{\"c1\": 2,\"c2\": \"b\"},{\"c1\": 3,\"c2\": \"c\"}]",0,2024.09.02T03:18:02.446))
    • Send data in JSON format
      // Send a scalar
      kafka::produce(producer, "msgTopic", "key", 1, "JSON", 0)
      // Output in Kafka:[1]
      // Note that even a scalar is serialized as a JSON array.
      // Send a vector
      kafka::produce(producer, "msgTopic", "key", [1,2,3], "JSON", 0)
      // Output in Kafka:[1,2,3]
      // Send a table
      kafka::produce(producer, "msgTopic", "key", table([1,2,3] as c1, `a`b`c as c2), "JSON", 0)
      // Output in Kafka:{"c1":[1,2,3],"c2":["a","b","c"]}
      // Note that the plugin sends data in columnar format
      // Parse the subscribed data in JSON format for batch consumption
      kafka::consumerPollBatch(consumer, 3,100,"JSON")
      /* 
      Output an ANY vector where each element is also an ANY vector
      ((,("msgTopic","key",(1),0,2024.09.02T03:21:41.925)),
       (,("msgTopic","key",(1,2,3),0,2024.09.02T03:21:42.376)),
       (,("msgTopic","key",c1->(1,2,3) c2->("a","b","c"),0,2024.09.02T03:21:42.836)))
      */
    • Send data in DOLPHINDB format
      // Send a table
      kafka::produce(producer, "msgTopic", "key", table([1,2,3] as c1, `a`b`c as c2), "DOLPHINDB", 0)
      // Output in Kafka: c1c2abc
      // Note that the data in DolphinDB serialization format may contain binary characters
      // Parse the subscribed data in DOLPHINDB format for consumption
      kafka::consumerPoll(consumer,100,"DOLPHINDB")
      /* 
      output:
      (,("msgTopic","key",
        c1 c2
        -- --
        1  a 
        2  b 
        3  c 
      ,0,2024.09.02T03:31:09.886))
      */
  2. Use createSubJob, cancelSubJob, and getJobStat to write the subscribed data in JSON format into a stream table in the background.
    • Use parseJsonTable to parse non-recursive JSON data. Ensure that you have created a Kafka topic named "msgTopic1".
      consumerCfg = dict(STRING, ANY)
      consumerCfg["group.id"] = "subjob1"
      consumerCfg["metadata.broker.list"] = "localhost";
      consumer1 = kafka::consumer(consumerCfg)
      kafka::subscribe(consumer1, "msgTopic1");
      // Create a shared stream table for the output
      share streamTable(1:0, `date`id`msg, [DATE,INT,STRING]) as st1
      def parser1(msg) {
          // Set msgAsTable to true so that the subscribed data is ingested into parser as a table
          // Pass the payload column to parseJsonTable with the specified schema to get the parsed JSON data
          return parseJsonTable(msg.payload, table(`date`id`msg as name, `DATE`INT`STRING as type))
      }
      // Set throttle to 0.1, autoCommit to false, msgAsTable to true, and batchSize to 1000 to create subJob1 in the background
      kafka::createSubJob(consumer1, st1, parser1, "subJob1", 0.1, false, true, 1000)
      // Send the JSON data
      producerCfg = dict(string, any);
      producerCfg["bootstrap.servers"] = "localhost"
      producer = kafka::producer(producerCfg)
      /* Expected JSON formatted message:
         {"msg":"kafka",
          "id":1,
          "date":"2024.08.30"}
      */
      kafka::produce(producer, "msgTopic1", "key", {date:2024.08.30, id:1, msg:"kafka"}, "JSON")
      kafka::produce(producer, "msgTopic1", "key", {date:2024.08.30, id:2, msg:"zookeeper"}, "JSON")
      kafka::produce(producer, "msgTopic1", "key", {date:2024.08.30, id:3, msg:"raft"}, "JSON")
      // View the content of the stream table
      select * from st1

      Output:

      date id msg
      2024.08.30 1 kafka
      2024.08.30 2 zookeeper
      2024.08.30 3 raft
    • Use eval and parseExpr to parse nested JSON data. Ensure that you have created a Kafka topic named "msgTopic2".
      consumerCfg["group.id"] = "subjob2"
      consumer2 = kafka::consumer(consumerCfg)
      kafka::subscribe(consumer2, "msgTopic2");
      share streamTable(1:0, `topic`date`id`msg, [STRING,DATE,INT,STRING]) as st2
      def parser2(payload, key, topic) {
          // Set msgAsTable to false so that the parser is triggered for each message
          ret = payload.parseExpr().eval()
          idVec = []$INT
          msgVec = []$STRING
          for (record in ret.record) {
              idVec.append!(record.id)
              msgVec.append!(record.msg)
          }
          count = ret.record.size()
          return table(take(topic, count) as topic, take(date(ret.date), count) as date, idVec as id, msgVec as msg)
      }
      kafka::createSubJob(consumer2, st2, parser2, "subjob2")
      /* 
      Send the nested JSON data:
       {"record":[{"msg":"buy","id":1},
                  {"msg":"sell","id":2},
                  {"msg":"sell","id":3},
                  {"msg":"withdraw","id":4}],
        "date":"2024.09.01"}
      */
      kafka::produce(producer, "msgTopic2", "key", {date:2024.08.30, record:[{id:1, msg:"buy"}, {id:2, msg:"withdraw"}]}, "JSON")
      kafka::produce(producer, "msgTopic2", "key", {date:2024.08.31, record:[{id:1, msg:"sell"}]}, "JSON")
      kafka::produce(producer, "msgTopic2", "key", {date:2024.09.01,  record:[{id:1, msg:"buy"}, {id:2, msg:"sell"}, {id:3, msg:"sell"}, {id:4, msg:"withdraw"}]}, "JSON")
      // View the content of the stream table
      select * from st2

      Output:

      topic date id msg
      msgTopic2 2024.08.30 1 buy
      msgTopic2 2024.08.30 2 withdraw
      msgTopic2 2024.08.31 1 sell
      msgTopic2 2024.09.01 1 buy
      msgTopic2 2024.09.01 2 sell
      msgTopic2 2024.09.01 3 sell
      msgTopic2 2024.09.01 4 withdraw
    • Get the status of the background subscription jobs.
      kafka::getJobStat()

      Output:

      subscriptionId  user   actionName  createTimestamp          processedMsgCount  failedMsgCount  lastErrMsg  lastFailedTimestamp  msgAsTable  batchSize  throttle             autoCommit
      83095808        admin  subJob1     2024.09.02 14:52:01.028  3                  0                                                true        1,000      0.10000000149011612  false
      86054464        admin  subjob2     2024.09.02 14:52:13.382  3                  0                                                false       0          1                    false
    • Cancel all background subscription jobs by their action names.
      stat = kafka::getJobStat()
      for (action in stat.actionName) {
          kafka::cancelSubJob(action)
      }
  3. Connect to Kafka with SASL authentication

    Producers and consumers use the same additional configuration parameters to connect. For details on these parameters, refer to the producer section.

    • Password authentication
      consumerCfg = dict(string, any);
      consumerCfg["metadata.broker.list"] = "localhost";
      consumerCfg["group.id"] = "test";
      consumerCfg["sasl.mechanisms"] = "PLAIN";
      consumerCfg["security.protocol"] = "sasl_plaintext";
      consumerCfg["sasl.username"] = "username";
      consumerCfg["sasl.password"] = "password";
      consumer = kafka::consumer(consumerCfg);
      topics=["test"];
      kafka::subscribe(consumer, topics);
      kafka::consumerPoll(consumer);
    • Kerberos authentication

      Kerberos authentication requires a Kerberos client installed on the DolphinDB server and a valid keytab file. Because configuring the Kerberos client can be challenging, a Kafka cluster without Kerberos authentication is recommended where possible.

      producerCfg=dict(STRING, ANY)
      producerCfg["metadata.broker.list"] = "aftersale2:9094"
      producerCfg["sasl.mechanisms"] = "GSSAPI"
      producerCfg["security.protocol"] = "sasl_plaintext";
      producerCfg["sasl.kerberos.service.name"] = "kafkaAdmin";
      producerCfg["sasl.kerberos.principal"] = "kafkaclient@EXAMPLE.COM";
      producerCfg["sasl.kerberos.keytab"] = "/path_to_kerberos/kerberos/kafkaclient.keytab";
      producer = kafka::producer(producerCfg)
  4. Connect to Kafka with SSL authentication

    Connect to a Kafka broker with SSL authentication. To generate the SSL key and certificate for Kafka brokers, see Using SSL with librdkafka.

    ca_path = "/home/slshen/opt/kafka_ssl_rdkafka/ca"
    producerCfg = dict(string, any);
    producerCfg["bootstrap.servers"] = "host:port"
    producerCfg["security.protocol"] = "ssl";
    producerCfg["ssl.ca.location"] = ca_path + "/rdkafka/ca-cert"
    producerCfg["ssl.certificate.location"] = ca_path + "/rdkafka/client_c1_client.pem"
    producerCfg["ssl.key.location"] = ca_path + "/rdkafka/client_c1_client.key"
    producerCfg["ssl.key.password"] = "key.password"
    producer = kafka::producer(producerCfg)
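Because producers and consumers take the same security parameters, a consumer can connect to the same SSL-enabled broker with the settings above plus a group.id. The following is a minimal sketch, assuming the SSL files sit under a placeholder directory /path_to_ssl/ca with the same layout as above, a broker at host:port, and a hypothetical topic named "test".

```dolphindb
// Placeholder location of the SSL files (assumption; adjust to your environment)
ca_path = "/path_to_ssl/ca"
consumerCfg = dict(STRING, ANY)
consumerCfg["bootstrap.servers"] = "host:port"
consumerCfg["group.id"] = "test"
// Same SSL settings as in the producer example
consumerCfg["security.protocol"] = "ssl"
consumerCfg["ssl.ca.location"] = ca_path + "/rdkafka/ca-cert"
consumerCfg["ssl.certificate.location"] = ca_path + "/rdkafka/client_c1_client.pem"
consumerCfg["ssl.key.location"] = ca_path + "/rdkafka/client_c1_client.key"
consumerCfg["ssl.key.password"] = "key.password"
consumer = kafka::consumer(consumerCfg)
// "test" is a hypothetical topic name
kafka::subscribe(consumer, ["test"])
kafka::consumerPoll(consumer)
```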

FAQ

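Q1: How can I troubleshoot parsing failures in background subscription threads?

Call getJobStat to check for parsing failures. The processedMsgCount and failedMsgCount columns show how many messages each job has processed and failed to process, and lastErrMsg and lastFailedTimestamp record the most recent failure.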
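A minimal sketch that lists only the jobs with failures, using the columns returned by getJobStat in the earlier example:

```dolphindb
// Get the status table of all background subscription jobs
stat = kafka::getJobStat()
// Keep only the jobs that have failed to process at least one message
select actionName, processedMsgCount, failedMsgCount, lastErrMsg, lastFailedTimestamp from stat where failedMsgCount > 0
```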

Q2: Why does the data sent by the plugin begin with illegal characters?

If the marshalType parameter of produce is set to "DOLPHINDB", messages are sent in binary format. If you view these messages directly as strings, they may appear as garbled text.

Q3: Where are the error logs of the kafka plugin?

Check the DolphinDB server log for entries starting with [PLUGIN::KAFKA] to find error messages recorded in the background.

Q4: What is the difference between assign and subscribe?

With assign, consumers can consume the messages starting from a given offset in the specified topics and partitions.

With subscribe, consumers subscribe to Kafka topics, and partitions are assigned to them by the consumer group. Because data from the same partition can only be consumed by one consumer in the group, consumers in the same group do not receive duplicate messages.
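A minimal sketch of assign, assuming its signature is kafka::assign(consumer, topics, partitions, offsets) with the last three arguments given as vectors of equal length (check the assign reference for the exact form). The broker address, the topic "test", partition 0, and starting offset 100 are illustrative assumptions.

```dolphindb
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost"
consumerCfg["group.id"] = "test"
consumer = kafka::consumer(consumerCfg)
// Assumed signature: read partition 0 of the hypothetical topic "test" starting at offset 100
kafka::assign(consumer, ["test"], [0], [100])
kafka::consumerPoll(consumer)
```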

Q5: How to manually control message commits?

Set the consumer configuration parameter enable.auto.commit to false to avoid automatic commits. Then, call commit to manually commit messages.
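A minimal sketch of manual commits, assuming a broker on localhost, a hypothetical topic named "test", and that commit takes the consumer handle as its only argument. The value of enable.auto.commit is passed as a string, matching librdkafka's string-valued configuration.

```dolphindb
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost"
consumerCfg["group.id"] = "test"
// Disable automatic offset commits
consumerCfg["enable.auto.commit"] = "false"
consumer = kafka::consumer(consumerCfg)
kafka::subscribe(consumer, ["test"])
msg = kafka::consumerPoll(consumer)
// ... process msg here ...
// Commit only after the message has been handled (assumed signature)
kafka::commit(consumer)
```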

Q6: What causes the error Local: Queue full when calling produce?

This error occurs when messages are produced faster than they can be delivered, so the producer's local queue fills up. You can increase the parameters queue.buffering.max.messages and queue.buffering.max.kbytes based on your workload. For details, refer to the producer section.
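Both limits are ordinary librdkafka producer properties set in the configuration dictionary. A minimal sketch with illustrative values (not tuned recommendations), assuming a broker on localhost; note that queue.buffering.max.kbytes is measured in kilobytes.

```dolphindb
producerCfg = dict(STRING, ANY)
producerCfg["metadata.broker.list"] = "localhost"
// Maximum number of messages allowed in the local producer queue (illustrative value)
producerCfg["queue.buffering.max.messages"] = "1000000"
// Maximum total size of the local producer queue, in KB (illustrative value)
producerCfg["queue.buffering.max.kbytes"] = "2097152"
producer = kafka::producer(producerCfg)
```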

Q7: Why is the first message received in the background "Application maximum poll interval (3000000 ms) exceeded by ..."?

The interval between two consecutive polls exceeded the maximum poll interval. This can happen when the first poll is performed long after the consumer is created.

Q8: How to troubleshoot connection failures to Kafka with Kerberos authentication?

Troubleshooting relies mainly on the server log: authentication is performed in the background, and the front end displays only generic errors such as broker transport failure. Configuring the Kerberos client can be challenging, so this setup is recommended only for experienced users.

Common causes of connection failures and the log examples:

  • If the hostname of the Kafka broker (aftersale2 in the log below) is not added to /etc/hosts on the client host, the client cannot resolve and reach the advertised hostname.

    2024-12-09 06:54:02.445669000,2fea <ERROR> :[PLUGIN::KAFKA] facility: FAIL, message: [thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: Failed to resolve 'aftersale2:9094': Name or service not known (after 12ms in state CONNECT)
  • The necessary Kerberos libraries are not installed in the environment. Front-end error: sh: kinit: command not found.

    :[PLUGIN::KAFKA] facility: SASLREFRESH, message: [thrd:main]: Kerberos ticket refresh failed: kinit -R -t "/path_to_kerberos/kerberos/kafkaclient.keytab" -k kafkaclient@EXAMPLE.COM || kinit -t "/path_to_kerberos/kerberos/kafkaclient.keytab" -k kafkaclient@EXAMPLE.COM: exited with code 127
    2024-12-09 07:02:08.319563000,cea <ERROR> :[PLUGIN::KAFKA] facility: LIBSASL, message: [thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: Cyrus/libsasl2 is missing a GSSAPI module: make sure the libsasl2-modules-gssapi-mit or cyrus-sasl-gssapi packages are installed
    2024-12-09 07:02:08.319742000,cea <ERROR> :[PLUGIN::KAFKA] facility: FAIL, message: [thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH_REQ)
  • The krb5.conf file is configured incorrectly. Front-end error: kinit: Invalid UID in persistent keyring name while getting default ccache.

    2024-12-09 07:24:26.230664000,79b6 <ERROR> :[PLUGIN::KAFKA] facility: LIBSASL, message: [thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: GSSAPI Error: Unspecified GSS failure.  Minor code may provide more information (No Kerberos credentials available: Invalid UID in persistent keyring name)
    2024-12-09 07:24:26.230852000,79b6 <ERROR> :[PLUGIN::KAFKA] facility: FAIL, message: [thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-1)): SASL(-1): generic failure: GSSAPI Error: Unspecified GSS failure.  Minor code may provide more information (No Kerberos credentials available: Invalid UID in persistent keyring name) (after 1ms in state AUTH_REQ)
  • The Kerberos principal is not configured on the Kafka server.

    2024-12-09 08:07:05.294237000,24e8 <ERROR> :[PLUGIN::KAFKA] facility: LIBSASL, message: [thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: GSSAPI Error: Unspecified GSS failure.  Minor code may provide more information (Server kafkaClient/aftersale2@EXAMPLE.COM not found in Kerberos database)

    Run the kadmin command list_principals to list all principals, and check whether kafkaClient/aftersale2@EXAMPLE.COM exists.