Kafka
Apache Kafka is a distributed message queue with high throughput. The DolphinDB kafka plugin can be used to publish to or subscribe to Kafka streaming services. The plugin supports sending and receiving data in JSON format, DolphinDB format, and string format. It also supports writing the subscribed data streams into DolphinDB through user-defined callback functions. This plugin is developed based on the open-source libraries librdkafka and cppkafka.
Installation (with installPlugin)
Required server version: DolphinDB 2.00.10 or higher
OS: Linux x86-64 and Linux ARM
Note that the kafka plugin on ARM Linux does not support SASL authentication and is not compatible with the zstd compression format when producing or consuming messages.
Installation Steps:
(1) Use listRemotePlugins to check plugin information in the plugin repository.
Note: For plugins not included in the provided list, you can install through precompiled binaries or compile from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.
login("admin", "123456")
listRemotePlugins()
(2) Invoke installPlugin for plugin installation.
installPlugin("kafka")
(3) Use loadPlugin to load the plugin before using the plugin methods.
loadPlugin("kafka")
Introduction to Kafka
Before using the DolphinDB kafka plugin, familiarize yourself with the following key concepts of Apache Kafka:
- Cluster
- A Kafka cluster consists of multiple brokers, each with a unique identifier in the cluster.
- Production & Consumption
- Producers publish data categorized by topics to the cluster.
- Consumers subscribe to the data in the cluster.
- Topic & Partition
- A topic can have multiple partitions, each containing a distinct subset of the topic's data.
- Partitions help improve the throughput of the Kafka system.
- Consumer Group
- Multiple consumers can be grouped into a consumer group.
- Data from the same partition can only be consumed by one consumer in the group.
- It is recommended that the number of consumers in a group subscribing to the same topic not exceed the number of partitions, to prevent idle consumers and rebalance issues.
- Rebalance
- When there is a change in the number of consumers in a group or the number of partitions in the subscribed topics, Kafka reassigns the mapping between consumers and partitions to ensure that all partitions are consumed.
- Replication & Fault Tolerance
- A partition can have multiple replicas, including a leader replica and several follower replicas.
- If a leader fails, one of the followers will be chosen as the new leader for data processing.
- The number of replicas cannot exceed the number of brokers.
- Offset & Consumer Offset Committing
- Consumers commit the offsets of the messages they have consumed, so that Kafka records the consumption position in each partition.
- Based on the committed offsets, consumption can resume where it left off after a consumer crash or a rebalance.
Method References
producer
Syntax
producer(config, [errCallback])
Details
Create a Kafka producer based on the specified configurations and return the handle.
Note that there is currently no explicit function to close the producer, and the close function of DolphinDB server does not apply to it either. To close the producer, set its handle to NULL to trigger the destructor.
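The snippet below sketches the producer lifecycle described above: create, send, flush, and release. It assumes the plugin has been loaded with loadPlugin("kafka"), a broker is reachable at localhost:9092, and a topic named "test" exists; the topic name is only an illustration.

```dolphindb
// Assumptions: broker at localhost:9092 and an existing topic named "test"
producerCfg = dict(STRING, ANY)
producerCfg["metadata.broker.list"] = "localhost:9092"
producer = kafka::producer(producerCfg)
kafka::produce(producer, "test", "key", "hello", "PLAIN")
// Send any messages still buffered on the producer before releasing it
kafka::producerFlush(producer)
// There is no close function: releasing the handle triggers the destructor
producer = NULL
```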
Parameters
- config: A dictionary indicating the Kafka producer configuration, whose key is a string and value is a string or a boolean. Generally, the configuration of a producer only requires the cluster address for connection. If the cluster enables security authentication, additional configurations are required. For details, refer to Kafka Configuration.
- errCallback (optional): A function to be called when an error or warning occurs in the background. This function accepts three STRING parameters: level ("ERROR" or "WARNING"), errorName, and message (specific error information).
Configuration Parameters
Parameter | Description | Example |
---|---|---|
metadata.broker.list | Address(es) of the broker(s) to connect to, in the format host:port. | Single address: "localhost:9092". Multiple addresses: "192.168.100.11:9092,192.168.100.11:9092,192.168.100.11:9092" |
debug | The debug information to collect. | "broker,topic": Get the debug information about brokers and topics. "all": Get all the debug information. Note that Kafka's debug information is also written to the DolphinDB log at the DEBUG level. |
session.timeout.ms | The timeout in milliseconds used to detect client failures. The client sends periodic heartbeats to the broker. If no heartbeats are received before the expiration of this session timeout, the broker removes this client from the group and initiates a rebalance. | "45000": 45-second timeout. |
max.poll.interval.ms | The maximum delay in milliseconds between invocations of poll() to consume messages. If this interval is exceeded, the consumer is considered failed and the group rebalances to reassign the partitions to another consumer. | "300000": 5-minute timeout. |
sasl.mechanisms | The SASL mechanism used for authentication. It can be: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, and OAUTHBEARER. Note that only one mechanism can be specified. | "PLAIN": Apply the PLAIN (username and password) mechanism. "GSSAPI": Apply the GSSAPI (Kerberos) mechanism. |
security.protocol | The protocol used to communicate with brokers. It can be: plaintext, ssl, sasl_plaintext, and sasl_ssl. | "plaintext": Apply the PLAINTEXT protocol. "ssl": Apply the SSL protocol. |
sasl.username | The SASL username used for PLAIN and SASL-SCRAM mechanisms. | "username" |
sasl.password | The SASL password used for PLAIN and SASL-SCRAM mechanisms. | "password" |
sasl.kerberos.service.name | The Kerberos principal name that Kafka runs as. | "kafkaAdmin" |
sasl.kerberos.principal | The Kerberos principal name of the broker client. | "kafkaClient@EXAMPLE.COM" |
sasl.kerberos.keytab | The path to Kerberos keytab file. | "/path_to_keytab/client.keytab" |
Configuration Parameters for Kafka Producer
Parameter | Description | Example |
---|---|---|
compression.type | The message compression type. It can be: none, gzip, snappy, lz4, and zstd. | "gzip": Apply the gzip compression. |
queue.buffering.max.messages | The maximum number of messages allowed in the producer queue. Note that the queue is shared by all topics and partitions. | "10000000" |
queue.buffering.max.kbytes | The maximum total size, in kilobytes, of the messages allowed in the producer queue. This parameter has higher priority than queue.buffering.max.messages. | "2147483647" |
Examples
- Create a local Kafka producer:
producerCfg = dict(STRING, ANY)
producerCfg["metadata.broker.list"] = "localhost:9092"
handle = kafka::producer(producerCfg)
- Create a Kafka producer with a callback function:
producerCfg = dict(STRING, ANY)
producerCfg["bootstrap.servers"] = "192.168.100.3"
// Shared table that collects the errors and warnings reported in the background
share table(1:0, `level`err`reason, [STRING,STRING,STRING]) as produceDest
def producerCallback(mutable dest, level, err, reason) {
    tb = table([level] as level, [err] as err, [reason] as reason)
    dest.append!(tb)
}
// Bind the shared table as the first argument through partial application
producer = kafka::producer(producerCfg, producerCallback{produceDest})
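The producer-specific parameters listed above are passed through the same configuration dictionary. The following sketch assumes a local broker; the values are illustrative, not tuning recommendations. Because the kafka plugin on ARM Linux is not compatible with zstd, gzip is used here.

```dolphindb
// Assumption: a broker at localhost:9092; the values below are illustrative only
producerCfg = dict(STRING, ANY)
producerCfg["metadata.broker.list"] = "localhost:9092"
// Compress messages with gzip (zstd is not supported on ARM Linux)
producerCfg["compression.type"] = "gzip"
// Cap the local send queue by message count and by total size in kilobytes
producerCfg["queue.buffering.max.messages"] = "100000"
producerCfg["queue.buffering.max.kbytes"] = "1048576"
tunedProducer = kafka::producer(producerCfg)
```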
produce
Syntax
produce(producer, topic, key, value, marshalType, [partition])
Details
Send messages to the specified Kafka topic using the given format. For detailed usage and consumer integration, refer to Usage Examples.
Note that:
- When sending tabular data in JSON format, the data in a column will be written into an array.
- Data serialized in DolphinDB format can only be received using the DolphinDB kafka plugin with marshalType specified as "DOLPHINDB"; otherwise, the binary serialized data cannot be processed, which may lead to garbled data.
Parameters
- producer: A Kafka producer handle.
- topic: A STRING scalar indicating the Kafka topic to send messages to.
- key: A STRING scalar indicating the key of the Kafka message. Messages with the same key will be sent to the same partition.
- value: The data of the Kafka message. When marshalType is "PLAIN", it must be a STRING scalar; with "JSON" or "DOLPHINDB", other DolphinDB objects such as scalars, vectors, dictionaries, and tables can be sent (see Usage Examples).
- marshalType (optional): A STRING scalar specifying the serialization format. It can be:
- "JSON": Serialization in JSON format.
- "DOLPHINDB": Serialization in DolphinDB internal format.
- "PLAIN": Plain data with no serialization. This option only supports sending strings.
- partition (optional): An integer indicating the partition number of the Kafka topic to send messages to. If unspecified, messages are distributed across partitions based on key.
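The sketch below shows how key and partition interact. It assumes that producer is a handle created with kafka::producer and that a topic named "test" (hypothetical) has at least two partitions.

```dolphindb
// Assumptions: `producer` exists; topic "test" has at least 2 partitions
// No partition given: both messages share the key "sensor1", so they go to the same partition
kafka::produce(producer, "test", "sensor1", "reading-1", "PLAIN")
kafka::produce(producer, "test", "sensor1", "reading-2", "PLAIN")
// Explicit partition: this message is written to partition 1 regardless of its key
kafka::produce(producer, "test", "sensor2", "reading-3", "PLAIN", 1)
// JSON serialization of a table: the data in each column is written as an array
kafka::produce(producer, "test", "batch", table([1,2,3] as id, [1.5,2.5,3.5] as price), "JSON")
```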
producerFlush
Syntax
producerFlush(producer)
Parameters
- producer: A Kafka producer handle.
Details
Flush all the records cached on the producer to Kafka. This function can be called when there are backlogs in the data to be sent.
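For example, after producing a burst of messages, producerFlush can be called to push the backlog to the cluster. This sketch assumes an existing producer handle and a hypothetical topic "test".

```dolphindb
// Assumptions: `producer` exists and topic "test" is available
for (i in 0..999) {
    // Spread the messages over 10 keys: "0" to "9"
    kafka::produce(producer, "test", string(i % 10), "message " + string(i), "PLAIN")
}
// Send everything still cached on the producer
kafka::producerFlush(producer)
```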
consumer
Syntax
consumer(config, [errCallback])
Details
Create a Kafka consumer based on the specified configurations and return the handle.
Note that there is currently no explicit function to close the consumer, and the close function of DolphinDB server does not apply to it either. To close the consumer, set its handle to NULL to trigger the destructor. If the consumer is used by a subscription job created with createSubJob, cancel that background job as well.
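The shutdown order described above might look like the following sketch. The job name "closeDemoJob", the table closeDemo, and the parser demoParser are hypothetical, and the broker address and topic "test" are assumptions. The background job is canceled first because it still uses the consumer.

```dolphindb
// Assumptions: broker at localhost:9092 and an existing topic "test"
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost:9092"
consumerCfg["group.id"] = "group1"
consumer = kafka::consumer(consumerCfg)
kafka::subscribe(consumer, ["test"])
// Hypothetical output table and unary parser (one row per message value)
share streamTable(1:0, [`msg], [STRING]) as closeDemo
def demoParser(msg) {
    return table([msg] as msg)
}
kafka::createSubJob(consumer, closeDemo, demoParser, "closeDemoJob")
// Shutdown: cancel the background job first, then release the handle
kafka::cancelSubJob("closeDemoJob")
consumer = NULL
```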
Parameters
- config: A dictionary indicating the Kafka consumer configuration, whose key is a string and value is a string or a boolean. Parameters metadata.broker.list and group.id are required for consumer configuration. For descriptions of general configuration parameters, refer to producer.
- errCallback (optional): A function to be called when an error or warning occurs in the background. This function accepts three STRING parameters: level ("ERROR" or "WARNING"), errorName, and message (specific error information).
Configuration Parameters for Kafka Consumer
Parameter | Description | Example |
---|---|---|
group.id | (Required) The name of the group that the consumer belongs to. | "group1" |
auto.offset.reset | Action to take when there is no initial offset in Kafka or the desired offset is out of range. It can be: smallest, earliest, beginning, largest, latest, end, and error. | "beginning"/"smallest"/"earliest": Reset the offset to the earliest offset. "latest"/"largest": Reset the offset to the latest offset. |
enable.auto.commit | Whether to automatically commit offsets in the background. The default value is true. | false: The consumer's offset will not be automatically committed in the background. |
auto.commit.interval.ms | The frequency in milliseconds that the consumer offsets are automatically committed. The default value is 60000. | "100": The consumer's offset will be automatically committed every 100 ms. |
Examples
Create a local Kafka consumer and specify group.id as "group1":
consumerCfg = dict(STRING, ANY)
consumerCfg["group.id"] = "group1"
consumerCfg["metadata.broker.list"] = "localhost:9092";
consumer = kafka::consumer(consumerCfg)
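The consumer-specific parameters can be combined in the same dictionary. The sketch below assumes a local broker and a hypothetical group name "group2". It starts from the earliest offset when the group has no committed offset and disables background commits, so that offsets are committed explicitly with commit.

```dolphindb
// Assumptions: broker at localhost:9092; "group2" is a hypothetical group name
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost:9092"
consumerCfg["group.id"] = "group2"
// Start from the earliest offset when the group has no committed offset
consumerCfg["auto.offset.reset"] = "earliest"
// Disable background commits; call kafka::commit explicitly instead
consumerCfg["enable.auto.commit"] = "false"
manualConsumer = kafka::consumer(consumerCfg)
```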
subscribe
Syntax
subscribe(consumer, topics)
Details
Subscribe the consumer to the specified Kafka topics. This method returns no value.
Note that:
- The number of consumers in the same group should not exceed the number of partitions in the topic. Otherwise, unassigned consumers will be unable to participate in the rebalance, even if assigned consumers exit.
- A failed subscription or a subscription to a non-existent topic returns no information and writes no log. To check the subscription status, use the getAssignment method.
Parameters
- consumer: A Kafka consumer handle.
- topics: A STRING vector indicating the topics to subscribe to.
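Because a failed subscription is silent, one way to verify it is to poll once and then inspect getAssignment. This is a sketch assuming an existing consumer handle and a hypothetical topic "test". Partition assignment happens asynchronously, so an empty result right after subscribing does not necessarily mean that the subscription failed.

```dolphindb
// Assumptions: `consumer` exists; "test" is a hypothetical topic name
kafka::subscribe(consumer, ["test"])
// Poll once so that the consumer joins the group and receives an assignment
kafka::consumerPoll(consumer)
assignment = kafka::getAssignment(consumer)
if (rows(assignment) == 0) {
    print("No partitions assigned yet: check the topic name and broker connection")
} else {
    print(assignment)
}
```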
consumerPoll
Syntax
consumerPoll(consumer, [timeout=1000], [marshalType])
Details
Poll the subscribed data into DolphinDB and return a tuple. The first element is a string indicating the error message, which is empty if consumerPoll succeeds. The second element is a tuple including the following elements: topic, partition, key, value, and the timestamp when the consumer received the data. For detailed usage and consumer integration, refer to Usage Examples.
Parameters
- consumer: A Kafka consumer handle.
- timeout (optional): The maximum amount of time in milliseconds to wait for a poll. The default value is 1000 milliseconds.
- marshalType (optional): A STRING scalar specifying the parsing format. If unspecified, consumerPoll detects the format of the subscribed data and parses it in JSON or DOLPHINDB format accordingly. It can be:
- "JSON": Parse in JSON format.
- "DOLPHINDB": Parse in DolphinDB internal format.
- "PLAIN": Return the plain data without parsing.
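A sketch of checking the two-element result of consumerPoll, assuming consumer is an existing handle that has already subscribed to a topic. It relies only on the first element being empty on success; the exact layout of the message tuple is shown in Usage Examples.

```dolphindb
// Assumption: `consumer` has subscribed to at least one topic
ret = kafka::consumerPoll(consumer, 1000, "PLAIN")
if (strlen(ret[0]) > 0) {
    // First element: the error message, e.g. when no message arrived in time
    print("poll failed: " + ret[0])
} else {
    // Second element: the tuple describing the received message
    msg = ret[1]
    print(msg)
}
```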
consumerPollBatch
Syntax
consumerPollBatch(consumer, batchSize, [timeout=1000], [marshalType])
Details
Poll the subscribed data into DolphinDB in batches and return a tuple.
Each element of the output is a tuple including the following elements: topic, partition, key, value, and the timestamp when the consumer received the data. For detailed usage and consumer integration, refer to Usage Examples.
Parameters
- consumer: A Kafka consumer handle.
- batchSize: An integer indicating the number of messages to retrieve.
- timeout (optional): An integer indicating the maximum amount of time in milliseconds to wait for messages. The default value is 1000.
- marshalType (optional): A STRING scalar specifying the parsing format. If unspecified, consumerPollBatch detects the format of the subscribed data and parses it in JSON or DOLPHINDB format accordingly. It can be:
- "JSON": Parse in JSON format.
- "DOLPHINDB": Parse in DolphinDB internal format.
- "PLAIN": Return the plain data without parsing.
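The following sketch retrieves a batch and walks through it. It assumes consumer is an existing subscribed handle, and it assumes that fewer than batchSize messages may be returned if the timeout expires first, which is why the size is printed rather than presumed.

```dolphindb
// Assumption: `consumer` has subscribed to at least one topic
// Request 100 messages, waiting at most 500 ms
msgs = kafka::consumerPollBatch(consumer, 100, 500, "PLAIN")
print("received " + string(size(msgs)) + " message(s)")
for (m in msgs) {
    // Each element is a tuple describing one message
    print(m)
}
```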
createSubJob
Syntax
createSubJob(consumer, table, parser, actionName, [throttle=1.0], [autoCommit], [msgAsTable=false], [batchSize=0], [queueDepth=1000000])
Details
Return a subJob connection handle. For detailed usage, refer to Usage Examples.
Parameters
- consumer: A Kafka consumer handle.
- table: A table to store the subscribed messages. It can be a shared stream table, shared in-memory table, shared keyed table, shared indexed table, or DFS table.
- parser: A unary, binary, or ternary function to process the subscribed messages and insert the result into table.
- If msgAsTable is false, parser accepts 1-3 STRING parameters as the input: the first is the message value, the second the key, and the third the topic.
- If msgAsTable is true, parser accepts a table with three STRING type columns: payload, key, and topic.
- actionName: A string indicating the name of the subscription job, which must not duplicate existing names.
- throttle (optional): A floating-point number in seconds indicating how long to wait before the parser processes the incoming messages if the batchSize condition has not been reached. The default value is 1.0. It is only effective when msgAsTable is true.
- autoCommit (optional): A boolean indicating whether to commit offsets automatically after parser processes the messages and the result is inserted into table.
- If enable.auto.commit is false for the specified consumer, setting autoCommit to true can prevent data loss caused by system crashes, as the uncommitted data will be re-received upon system recovery.
- If enable.auto.commit is false while autoCommit is unspecified or set to true, automatic commits will be performed after the messages have been processed.
- msgAsTable (optional): A boolean indicating whether the subscribed data is ingested into parser as a table. The default value is false.
- batchSize (optional): An integer indicating the number of unprocessed messages that triggers parser. The default value is 0. It is only effective when msgAsTable is true.
- If it is positive, messages will be processed when the number of unprocessed messages reaches batchSize.
- If it is unspecified or non-positive, messages will be processed immediately as they come in.
- queueDepth (optional): A positive integer indicating the depth of the background queue that receives Kafka messages. The default value is 1000000.
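A minimal sketch that stores every raw message in a stream table and commits offsets only after the rows are written. The broker address, the topic "test", the table rawMsgs, and the job name "rawMsgJob" are assumptions. With msgAsTable left at its default (false), the ternary parser is called once per message with the value, key, and topic.

```dolphindb
// Assumptions: broker at localhost:9092, topic "test"; other names are hypothetical
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost:9092"
consumerCfg["group.id"] = "subjobDemo"
consumerCfg["enable.auto.commit"] = "false"
consumer = kafka::consumer(consumerCfg)
kafka::subscribe(consumer, ["test"])
share streamTable(1:0, `topic`key`payload, [STRING,STRING,STRING]) as rawMsgs
// Ternary parser: receives the value, key, and topic of each message
def rawParser(payload, key, topic) {
    return table([topic] as topic, [key] as key, [payload] as payload)
}
// throttle = 1.0; autoCommit = true commits after the rows are inserted into rawMsgs
kafka::createSubJob(consumer, rawMsgs, rawParser, "rawMsgJob", 1.0, true)
```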
getJobStat
Syntax
getJobStat()
Details
Get the statuses of the subscription jobs created by createSubJob and return a table with the following fields:
Field | Type | Description |
---|---|---|
subscriptionId | STRING | ID of the subscription job |
user | STRING | User name |
actionName | STRING | Name of the subscription job |
createTimestamp | TIMESTAMP | Creation timestamp of the subscription job |
processedMsgCount | LONG | Number of the messages that have been processed |
failedMsgCount | LONG | Number of the messages that failed to be processed |
lastErrMsg | STRING | Content of the last error message |
lastFailedTimestamp | TIMESTAMP | Timestamp when the last error message is sent |
msgAsTable | BOOL | Whether the subscribed data is ingested into parser as a table |
batchSize | LONG | The number of unprocessed messages to trigger the parser |
throttle | FLOAT | How long to wait before the parser processes the incoming messages if the batchSize condition has not been reached |
autoCommit | BOOL | Whether to perform an automatic commit after the parser processes the messages and inserts the result into table |
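Because the result is an ordinary table, it can be filtered with SQL. This sketch lists only the jobs that have failed to process at least one message; it assumes some jobs were created with createSubJob.

```dolphindb
// List the subscription jobs with processing failures and their last error
stat = kafka::getJobStat()
select actionName, processedMsgCount, failedMsgCount, lastErrMsg, lastFailedTimestamp from stat where failedMsgCount > 0
```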
cancelSubJob
Syntax
cancelSubJob(handle|actionName)
Details
Cancel the specified subscription job in the background. The canceled job can no longer be retrieved by getJobStat.
Note: To maintain compatibility, this method also accepts the subscriptionId returned by getJobStat. If an actionName and a subscriptionId are identical, the subscription job with that actionName takes precedence and is canceled.
Parameters
- handle|actionName: The handle returned by createSubJob or the actionName specified in it.
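Both forms of the argument are shown below. The sketch assumes an existing consumer handle, a shared table outTable, and a parser function myParser; these names and the job name "myJob" are hypothetical.

```dolphindb
// Assumptions: `consumer`, shared table `outTable`, and function `myParser` already exist
handle = kafka::createSubJob(consumer, outTable, myParser, "myJob")
// Cancel through the returned handle ...
kafka::cancelSubJob(handle)
// ... or, equivalently, through the action name:
// kafka::cancelSubJob("myJob")
```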
getSubJobConsumer
Syntax
getSubJobConsumer(handle|actionName)
Details
Get the consumer of the specified background subscription job created by createSubJob.
Parameters
- handle|actionName: The handle returned by createSubJob or the actionName specified in it.
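The returned handle can be passed to the other consumer methods, for example to inspect a background job's progress. This sketch assumes a job named "myJob" (hypothetical) whose consumer reads partition 0 of a topic "test" (also an assumption).

```dolphindb
// Assumption: job "myJob" exists and its consumer reads topic "test"
subConsumer = kafka::getSubJobConsumer("myJob")
// Partitions currently assigned to the job's consumer
kafka::getAssignment(subConsumer)
// Offsets of partition 0, including the committed offset
kafka::getOffsetInfo(subConsumer, "test", 0)
```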
commit
Syntax
commit(consumer, [topics], [partitions], [offsets])
Details
Synchronously commit the offset of the latest consumed messages to the Kafka cluster.
Parameters
- consumer: A Kafka consumer handle.
- topics (optional): A STRING scalar or vector indicating the subscribed topic(s).
- partitions (optional): An INT scalar or vector indicating the subscribed partition(s).
- offsets (optional): An INT scalar or vector indicating the offset(s) to commit for each specified topic and partition.
Note:
- topics, partitions, and offsets must have the same format and length, with their elements corresponding to each other.
- If they are specified, only the specified offsets will be committed.
- If unspecified, the latest consumption offsets for all subscribed partitions will be committed.
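The two calling forms might look like this. The sketch assumes a consumer with enable.auto.commit set to "false" that reads partitions 0 and 1 of a hypothetical topic "test"; the offsets 100 and 200 are illustrative.

```dolphindb
// Commit the latest consumed offsets of all subscribed partitions
kafka::commit(consumer)
// Commit explicit offsets: "test" partition 0 -> 100, "test" partition 1 -> 200
kafka::commit(consumer, ["test", "test"], [0, 1], [100, 200])
```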
assign
Syntax
assign(consumer, topics, partitions, offsets)
Details
Unlike subscribe, assign enables you to assign specific topics, partitions, and offsets to the consumer.
Parameters
- consumer: A Kafka consumer handle.
- topics: A STRING scalar or vector indicating the subscribed topic(s).
- partitions: An INT scalar or vector indicating the subscribed partition(s).
- offsets: An INT scalar or vector indicating the starting offset(s) for each specified topic and partition.
Note that:
- topics, partitions, and offsets must have the same format and length, with their elements corresponding to each other.
- The data in the same partition of a topic can be consumed by multiple consumers using assign.
- To avoid undefined behavior, it is recommended not to apply both subscribe and assign to the same consumer, and not to call these methods multiple times.
Examples
Assign partition 1 of the "test" topic to the consumer, starting from offset 3000.
kafka::assign(consumer, "test", 1, 3000)
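Because topics, partitions, and offsets correspond element by element, several partitions can be assigned in one call. In this sketch, the topic "test", its partitions 0 and 1, and the starting offsets are assumptions.

```dolphindb
// Partition 0 from offset 0, partition 1 from offset 500
kafka::assign(consumer, ["test", "test"], [0, 1], [0, 500])
// Confirm the assignment
kafka::getAssignment(consumer)
```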
unassign
Syntax
unassign(consumer)
Details
Cancel the specified consumer's subscription to all topics. This method applies to subscriptions created by both subscribe and assign.
Parameters
- consumer: A Kafka consumer handle.
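A short sketch, assuming consumer was previously set up with subscribe or assign. Checking getAssignment afterwards is one way to confirm the effect; an empty table is the expected result, not a documented guarantee.

```dolphindb
// Assumption: `consumer` was set up with subscribe or assign
kafka::unassign(consumer)
// The returned table is expected to have no rows afterwards
kafka::getAssignment(consumer)
```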
getAssignment
Syntax
getAssignment(consumer)
Details
Get the assignment information of the specified consumer. Return a table with 3 columns: topic, partition, and offset.
Note: If the consumer only calls subscribe and does not call assign, this method can also return the specific assignment information.
Parameters
- consumer: A Kafka consumer handle.
getOffsetInfo
Syntax
getOffsetInfo(consumer, topic, partition)
Details
Get the offset information about the specified topics and partitions. Return a table with the following fields:
Field | Type | Description |
---|---|---|
topic | STRING | Topic name |
partition | INT | Partition number |
minOffset | LONG | Minimum offset of the partition |
maxOffset | LONG | Maximum offset of the partition |
offsetPosition | LONG | Latest offset (which may not have been committed yet) |
offsetCommitted | LONG | Committed offsets |
Parameters
- consumer: A Kafka consumer handle.
- topic: A STRING scalar or vector indicating the subscribed topic(s).
- partition: An INT scalar or vector indicating the subscribed partition(s).
Note that:
- topic and partition must have the same format and length, with their elements corresponding to each other.
- The specified topics and partitions must be subscribed by the consumer.
Examples
The following example demonstrates the differences between getAssignment and getOffsetInfo.
- Send 10 messages to the topic "offsetTopic".
producerCfg = dict(STRING, ANY)
producerCfg["bootstrap.servers"] = "localhost"
producer = kafka::producer(producerCfg)
for (i in 0..9) {
    kafka::produce(producer, "offsetTopic", "0", "message", "PLAIN")
}
- Create a consumer that polls the subscribed data twice.
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost"
consumerCfg["enable.auto.commit"] = "false"
consumerCfg["group.id"] = "test"
consumer = kafka::consumer(consumerCfg)
kafka::assign(consumer, "offsetTopic", 0, 3)
kafka::consumerPoll(consumer,,"PLAIN")
kafka::consumerPoll(consumer,,"PLAIN")
- Apply getAssignment to get the assignment information about the consumer.
kafka::getAssignment(consumer)
Output:
topic | partition | offset |
---|---|---|
offsetTopic | 0 | -1,000 |
In the output, offset is -1,000, indicating that no specific offset is specified.
- Apply getOffsetInfo to get the offset information about partition 0 in "offsetTopic".
kafka::getOffsetInfo(consumer, "offsetTopic", 0)
Output:
topic | partition | minOffset | maxOffset | offsetPosition | offsetCommitted |
---|---|---|---|---|---|
offsetTopic | 0 | 0 | 10 | 2 | -1,001 |
maxOffset is 10, indicating that 10 messages have been sent. offsetPosition is 2, indicating that the consumer has consumed 2 messages. Since enable.auto.commit is set to false, offsetCommitted is an invalid value, indicating that no offsets have been committed.
- Apply getOffsetInfo again after committing the offsets using commit.
kafka::commit(consumer)
kafka::getOffsetInfo(consumer, "offsetTopic", 0)
Output:
topic | partition | minOffset | maxOffset | offsetPosition | offsetCommitted |
---|---|---|---|---|---|
offsetTopic | 0 | 0 | 10 | 2 | 2 |
offsetCommitted becomes 2, indicating that the offsets of the 2 consumed messages have been committed.
getMemId
Syntax
getMemId(consumer)
Details
Each consumer has a unique ID. This method can be used to get the ID of the specified consumer and return a string.
Parameters
- consumer: A Kafka consumer handle.
Examples
consumerCfg = dict(STRING, ANY)
consumerCfg["group.id"] = string(now())
consumerCfg["metadata.broker.list"] = "localhost:9092";
consumer = kafka::consumer(consumerCfg)
kafka::subscribe(consumer, "test");
kafka::consumerPoll(consumer); // Without this step, the consumer may not be assigned an ID in the Kafka cluster
kafka::getMemId(consumer)
// output: 'rdkafka-d9eded44-358f-49fc-be01-cc099b121d59'
getMetadata
Syntax
getMetadata(handle|conf)
Details
Get the metadata of the specified Kafka cluster. Return a dictionary with the following key-value pairs:
Key | Value | Value Form |
---|---|---|
brokers | Information about each node in the Kafka cluster | A table with three columns: id, host, and port |
consumerGroups | Information about the consumer group | A dictionary where each key corresponds to a consumer group. Each value is also a dictionary containing the metadata and information about consumer group members. |
topics | Information about the topics in the cluster | A dictionary where each key corresponds to a topic. Each value is a table containing information about the partitions in the topic. |
Parameters
- handle|conf: It can be the handle of an existing producer or consumer, a dictionary used to establish a connection, or the IP address and port number of a Kafka cluster.
Examples
Get the metadata of the Kafka cluster with the IP address and port number "localhost:9092".
kafka::getMetadata("localhost:9092")
Get the metadata of the Kafka cluster corresponding to the specified consumer handle.
kafka::getMetadata(consumer)
Output:
brokers->
    id host           port
    -- -------------- ----
    2  192.168.100.45 9092
    1  192.168.100.44 9092
    0  192.168.100.43 9092
consumerGroup->
    test->
        state->Stable
        protocolType->consumer
        protocol->range
        error->Success
        members->
            rdkafka-0b501728-5344-4f2f-a450-bc056f6e3400->
                clientID->rdkafka
                clientHost->/192.168.0.38
                memberAssignmentVersion->0
                partitions->
                    topic partition offset
                    ----- --------- ------
                    test  0         -1001
                    test  1         -1001
                    test  2         -1001
topics->
    test->
        id error   leader replicas
        -- ------- ------ --------
        0  Success 0      [0]
        1  Success 1      [1]
        2  Success 2      [2]
Usage Examples
- Create a producer to send data in various formats and create a consumer to consume the data. Ensure that you have created a Kafka topic named "msgTopic".
// Create a producer
producerCfg = dict(STRING, ANY)
producerCfg["bootstrap.servers"] = "localhost"
producer = kafka::producer(producerCfg)
// Create a consumer
consumerCfg = dict(STRING, ANY)
consumerCfg["group.id"] = string(now())
consumerCfg["metadata.broker.list"] = "localhost:9092"
consumer = kafka::consumer(consumerCfg)
kafka::subscribe(consumer, "msgTopic")
- Send data in PLAIN format
// Convert a table to a JSON string and send it in PLAIN format
str = toStdJson(table([1,2,3] as c1, `a`b`c as c2))
kafka::produce(producer, "msgTopic", "key", str, "PLAIN", 0)
// Output in Kafka: [{"c1": 1,"c2": "a"},{"c1": 2,"c2": "b"},{"c1": 3,"c2": "c"}]
// Parse the subscribed data in PLAIN format for consumption
kafka::consumerPoll(consumer,,"PLAIN")
// output: (,("msgTopic","key","[{\"c1\": 1,\"c2\": \"a\"},{\"c1\": 2,\"c2\": \"b\"},{\"c1\": 3,\"c2\": \"c\"}]",0,2024.09.02T03:18:02.446))
- Send data in JSON format
// Send a scalar
kafka::produce(producer, "msgTopic", "key", 1, "JSON", 0)
// Output in Kafka: [1]
// Note that the result is output as a list, even for a scalar.
// Send a vector
kafka::produce(producer, "msgTopic", "key", [1,2,3], "JSON", 0)
// Output in Kafka: [1,2,3]
// Send a table
kafka::produce(producer, "msgTopic", "key", table([1,2,3] as c1, `a`b`c as c2), "JSON", 0)
// Output in Kafka: {"c1":[1,2,3],"c2":["a","b","c"]}
// Note that the plugin sends tables in columnar format
// Parse the subscribed data in JSON format for batch consumption
kafka::consumerPollBatch(consumer, 3, 100, "JSON")
/* Output an ANY vector where each element is also an ANY vector
((,("msgTopic","key",(1),0,2024.09.02T03:21:41.925)),
 (,("msgTopic","key",(1,2,3),0,2024.09.02T03:21:42.376)),
 (,("msgTopic","key",c1->(1,2,3) c2->("a","b","c"),0,2024.09.02T03:21:42.836)))
*/
- Send data in DOLPHINDB format
// Send a table
kafka::produce(producer, "msgTopic", "key", table([1,2,3] as c1, `a`b`c as c2), "DOLPHINDB", 0)
// Output in Kafka: c1c2abc
// Note that the data in DolphinDB serialization format may contain binary characters
// Parse the subscribed data in DOLPHINDB format
kafka::consumerPoll(consumer, 100, "DOLPHINDB")
/* output:
(,("msgTopic","key",
c1 c2
-- --
1  a
2  b
3  c
,0,2024.09.02T03:31:09.886))
*/
- Use createSubJob, cancelSubJob, and getJobStat to write the subscribed data in JSON format into a stream table in the background.
- Use parseJsonTable to parse non-recursive JSON data. Ensure that you have created a Kafka topic named "msgTopic1".
consumerCfg = dict(STRING, ANY)
consumerCfg["group.id"] = "subjob1"
consumerCfg["metadata.broker.list"] = "localhost"
consumer1 = kafka::consumer(consumerCfg)
kafka::subscribe(consumer1, "msgTopic1")
// Create a shared stream table for the output
share streamTable(1:0, `date`id`msg, [DATE,INT,STRING]) as st1
def parser1(msg) {
    // msgAsTable is set to true, so the subscribed data is ingested into parser as a table
    // Pass the payload column to parseJsonTable with the specified schema to get the parsed JSON data
    return parseJsonTable(msg.payload, table(`date`id`msg as name, `DATE`INT`STRING as type))
}
// Set msgAsTable to true, throttle to 0.1, and batchSize to 1000 to create subJob1 in the background
kafka::createSubJob(consumer1, st1, parser1, "subJob1", 0.1, false, true, 1000)
// Send the JSON data
producerCfg = dict(STRING, ANY)
producerCfg["bootstrap.servers"] = "localhost"
producer = kafka::producer(producerCfg)
/* Expected JSON formatted message: {"msg":"kafka", "id":1, "date":"2024.08.30"} */
kafka::produce(producer, "msgTopic1", "key", {date:2024.08.30, id:1, msg:"kafka"}, "JSON")
kafka::produce(producer, "msgTopic1", "key", {date:2024.08.30, id:2, msg:"zookeeper"}, "JSON")
kafka::produce(producer, "msgTopic1", "key", {date:2024.08.30, id:3, msg:"raft"}, "JSON")
// View the content of the stream table
select * from st1
Output:
date | id | msg |
---|---|---|
2024.08.30 | 1 | kafka |
2024.08.30 | 2 | zookeeper |
2024.08.30 | 3 | raft |
- Use eval and parseExpr to parse the recursive JSON data. Ensure that you have created a Kafka topic named "msgTopic2".
consumerCfg["group.id"] = "subjob2"
consumer2 = kafka::consumer(consumerCfg)
kafka::subscribe(consumer2, "msgTopic2")
share streamTable(1:0, `topic`date`id`msg, [STRING,DATE,INT,STRING]) as st2
def parser2(payload, key, topic) {
    // msgAsTable is false (the default), so parser is triggered for each message
    ret = payload.parseExpr().eval()
    idVec = []$INT
    msgVec = []$STRING
    for (record in ret.record) {
        idVec.append!(record.id)
        msgVec.append!(record.msg)
    }
    n = ret.record.size()
    return table(take(topic, n) as topic, take(date(ret.date), n) as date, idVec as id, msgVec as msg)
}
kafka::createSubJob(consumer2, st2, parser2, "subjob2")
/* Send the nested JSON data, for example:
{"record":[{"msg":"buy","id":1}, {"msg":"sell","id":2}, {"msg":"sell","id":3}, {"msg":"withdraw","id":4}], "date":"2024.09.01"} */
kafka::produce(producer, "msgTopic2", "key", {date:2024.08.30, record:[{id:1, msg:"buy"}, {id:2, msg:"withdraw"}]}, "JSON")
kafka::produce(producer, "msgTopic2", "key", {date:2024.08.31, record:[{id:1, msg:"sell"}]}, "JSON")
kafka::produce(producer, "msgTopic2", "key", {date:2024.09.01, record:[{id:1, msg:"buy"}, {id:2, msg:"sell"}, {id:3, msg:"sell"}, {id:4, msg:"withdraw"}]}, "JSON")
// View the content of the stream table
select * from st2
Output:
topic | date | id | msg |
---|---|---|---|
msgTopic2 | 2024.08.30 | 1 | buy |
msgTopic2 | 2024.08.30 | 2 | withdraw |
msgTopic2 | 2024.08.31 | 1 | sell |
msgTopic2 | 2024.09.01 | 1 | buy |
msgTopic2 | 2024.09.01 | 2 | sell |
msgTopic2 | 2024.09.01 | 3 | sell |
msgTopic2 | 2024.09.01 | 4 | withdraw |
- Get the statuses of the subscription jobs in the background.
kafka::getJobStat()
Output:
subscriptionId | user | actionName | createTimestamp | processedMsgCount | failedMsgCount | lastErrMsg | lastFailedTimestamp | msgAsTable | batchSize | throttle | autoCommit |
---|---|---|---|---|---|---|---|---|---|---|---|
83095808 | admin | subJob1 | 2024.09.02 14:52:01.028 | 3 | 0 | | | true | 1,000 | 0.10000000149011612 | false |
86054464 | admin | subjob2 | 2024.09.02 14:52:13.382 | 3 | 0 | | | false | 0 | 1 | false |
- Cancel all subscription jobs in the background through action names.
stat = kafka::getJobStat()
for (action in stat.actionName) {
    kafka::cancelSubJob(action)
}
- Connect to Kafka with SASL authentication
Producers and consumers require the same additional configuration parameters for connection. For details on configuration parameters, refer to the section producer.
- Password authentication
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost"
consumerCfg["group.id"] = "test"
consumerCfg["sasl.mechanisms"] = "PLAIN"
consumerCfg["security.protocol"] = "sasl_plaintext"
consumerCfg["sasl.username"] = "username"
consumerCfg["sasl.password"] = "password"
consumer = kafka::consumer(consumerCfg)
topics = ["test"]
kafka::subscribe(consumer, topics)
kafka::consumerPoll(consumer)
- Kerberos authentication
Kerberos authentication requires a Kerberos client installed on DolphinDB server and a validated keytab file. Configuring the Kerberos client can be challenging, so it is recommended to use a Kafka cluster without Kerberos authentication.
producerCfg=dict(STRING, ANY)
producerCfg["metadata.broker.list"] = "aftersale2:9094"
producerCfg["sasl.mechanisms"] = "GSSAPI"
producerCfg["security.protocol"] = "sasl_plaintext";
producerCfg["sasl.kerberos.service.name"] = "kafkaAdmin";
producerCfg["sasl.kerberos.principal"] = "kafkaclient@EXAMPLE.COM";
producerCfg["sasl.kerberos.keytab"] = "/path_to_kerberos/kerberos/kafkaclient.keytab";
producer = kafka::producer(producerCfg)
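Authentication errors are reported in the background, so it can help to check a configuration for missing keys before creating the producer or consumer. The Python sketch below is a hypothetical pre-flight helper, not a plugin function. Its per-mechanism key list is an assumption taken only from the two examples in this section, and librdkafka may supply defaults for some of these keys.

```python
# Keys used by the SASL examples in this section, per mechanism.
# Assumption: treating them as required is this sketch's choice, not a librdkafka rule.
SASL_KEYS = {
    "PLAIN": ["sasl.username", "sasl.password"],
    "GSSAPI": ["sasl.kerberos.service.name", "sasl.kerberos.principal", "sasl.kerberos.keytab"],
}


def missing_sasl_keys(cfg: dict) -> list:
    """Return the configuration keys that cfg lacks for its SASL mechanism."""
    missing = []
    # metadata.broker.list and bootstrap.servers are aliases in librdkafka
    if "metadata.broker.list" not in cfg and "bootstrap.servers" not in cfg:
        missing.append("metadata.broker.list")
    for key in ("security.protocol", "sasl.mechanisms"):
        if key not in cfg:
            missing.append(key)
    mechanism = cfg.get("sasl.mechanisms", "")
    missing += [k for k in SASL_KEYS.get(mechanism, []) if k not in cfg]
    return missing


cfg = {
    "metadata.broker.list": "localhost",
    "security.protocol": "sasl_plaintext",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "username",
}
print(missing_sasl_keys(cfg))  # ['sasl.password']
```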
- Connect to Kafka with SSL authentication
Connect to a Kafka broker with SSL authentication. To generate the SSL key and certificate for Kafka brokers, see Using SSL with librdkafka.
ca_path = "/home/slshen/opt/kafka_ssl_rdkafka/ca"
producerCfg = dict(string, any);
producerCfg["bootstrap.servers"] = "host:port"
producerCfg["security.protocol"] = "ssl";
producerCfg["ssl.ca.location"] = ca_path + "/rdkafka/ca-cert"
producerCfg["ssl.certificate.location"] = ca_path + "/rdkafka/client_c1_client.pem"
producerCfg["ssl.key.location"] = ca_path + "/rdkafka/client_c1_client.key"
producerCfg["ssl.key.password"] = "key.password"
producer = kafka::producer(producerCfg)
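Path typos in the SSL settings are easy to overlook. The Python sketch below is a hypothetical helper, not part of the plugin. It reports which of the three location settings in the example point to paths that do not exist on the DolphinDB server host.

```python
import os

SSL_PATH_KEYS = ("ssl.ca.location", "ssl.certificate.location", "ssl.key.location")


def missing_ssl_paths(cfg: dict) -> list:
    """Return the SSL location keys whose configured path does not exist.

    os.path.exists is used rather than os.path.isfile because librdkafka
    also accepts a directory for ssl.ca.location."""
    return [key for key in SSL_PATH_KEYS if key in cfg and not os.path.exists(cfg[key])]


ca_path = "/home/slshen/opt/kafka_ssl_rdkafka/ca"  # same example directory as above
cfg = {
    "ssl.ca.location": ca_path + "/rdkafka/ca-cert",
    "ssl.certificate.location": ca_path + "/rdkafka/client_c1_client.pem",
    "ssl.key.location": ca_path + "/rdkafka/client_c1_client.key",
}
print(missing_ssl_paths(cfg))
```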
FAQ
Q1: How to troubleshoot parsing issues in background threads?
Use the getJobStat method to check for parsing failures and to monitor the number of messages that have been processed or have failed to be processed.
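getJobStat returns one row per subscription job, so checking for parsing failures comes down to filtering on failedMsgCount. The Python sketch below assumes the table has already been fetched into Python as a list of row dictionaries, for example through the DolphinDB Python API. The helper name and the row layout are illustrative assumptions. The sample rows reuse values from the getJobStat output in the examples.

```python
def failed_jobs(job_stats: list) -> list:
    """Return (actionName, failedMsgCount, lastErrMsg) for jobs with parsing failures."""
    return [
        (row["actionName"], row["failedMsgCount"], row["lastErrMsg"])
        for row in job_stats
        if row["failedMsgCount"] > 0
    ]


# Values taken from the getJobStat example output (no failures there)
job_stats = [
    {"actionName": "subJob1", "processedMsgCount": 3, "failedMsgCount": 0, "lastErrMsg": ""},
    {"actionName": "subjob2", "processedMsgCount": 3, "failedMsgCount": 0, "lastErrMsg": ""},
]
print(failed_jobs(job_stats))  # []
```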
Q2: Why are there illegal characters at the beginning of the data sent by the plugin?
If the marshalType parameter of produce is set to "DOLPHINDB", messages are sent in binary format. If you view these messages directly as strings, they may appear as garbled text.
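When you inspect raw payloads outside DolphinDB, one option is to print them as text only when they decode cleanly, and as a hex dump otherwise. The Python sketch below is minimal. The printability test is a heuristic chosen for this illustration. The sample bytes are arbitrary and do not represent a real DOLPHINDB-format message.

```python
def show_payload(payload: bytes) -> str:
    """Return the payload as text if it is printable UTF-8, else as a hex dump."""
    try:
        text = payload.decode("utf-8")
    except UnicodeDecodeError:
        text = None
    # Binary data may still be valid UTF-8 but full of control characters
    if text is not None and text.isprintable():
        return text
    return payload.hex(" ")


print(show_payload(b'{"id":1}'))         # {"id":1}
print(show_payload(b"\x12\x00\x04abc"))  # 12 00 04 61 62 63
```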
Q3: Where are the error logs of the kafka plugin?
Check the DolphinDB server log for entries containing [PLUGIN::KAFKA] to find error messages recorded in the background.
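To extract these entries from a large log, you can filter on the [PLUGIN::KAFKA] tag. The Python sketch below is minimal. The log path in the commented usage is a placeholder for your server's actual log file. The sample line is the hostname-resolution error from the Kerberos troubleshooting examples in this FAQ.

```python
def kafka_plugin_errors(lines) -> list:
    """Return the ERROR-level lines written by the kafka plugin."""
    return [line.rstrip("\n") for line in lines
            if "[PLUGIN::KAFKA]" in line and "<ERROR>" in line]


# Usage against a real server log (the path is a placeholder):
# with open("/path/to/dolphindb.log", encoding="utf-8", errors="replace") as f:
#     for entry in kafka_plugin_errors(f):
#         print(entry)

sample = [
    "2024-12-09 06:54:02.445669000,2fea <ERROR> :[PLUGIN::KAFKA] facility: FAIL, message: "
    "[thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: "
    "Failed to resolve 'aftersale2:9094': Name or service not known (after 12ms in state CONNECT)\n",
]
print(len(kafka_plugin_errors(sample)))  # 1
```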
Q4: What is the difference between assign and subscribe?
With assign, consumers consume messages starting from a given offset in the specified topics and partitions.
With subscribe, consumers subscribe to a Kafka topic as members of a consumer group. Messages are not duplicated across consumers in the same group, since data from the same partition can only be consumed by one consumer in the group.
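A toy assignment can illustrate the consumer-group rule behind subscribe. The Python sketch below uses a simple round-robin split chosen for illustration. It is not the assignor that Kafka or librdkafka actually uses, but it shows the same invariant: each partition goes to exactly one consumer in the group, and consumers beyond the number of partitions receive nothing.

```python
def assign_partitions(partitions: list, consumers: list) -> dict:
    """Toy round-robin assignment of partitions to the consumers of one group."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment


# Three partitions, four consumers in the same group: one consumer stays idle
print(assign_partitions([0, 1, 2], ["c1", "c2", "c3", "c4"]))
# {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}
```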
Q5: How to manually control message commits?
Set the consumer configuration parameter enable.auto.commit to false to disable automatic commits. Then call commit to commit messages manually.
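The reason to disable automatic commits is to commit an offset only after the corresponding message has been handled. The Python sketch below models this idea without a broker. The function and its arguments are hypothetical. Following the Kafka convention, the committed offset is the next offset to read.

```python
def consume_with_manual_commit(messages: list, process, committed: int = 0) -> int:
    """Process messages from `committed` onward, committing after each success.

    Returns the new committed offset (the next offset to read)."""
    for offset in range(committed, len(messages)):
        if not process(messages[offset]):
            break  # stop without committing; this message will be read again
        committed = offset + 1  # commit only after successful processing
    return committed


msgs = ["a", "b", "bad", "c"]
committed = consume_with_manual_commit(msgs, lambda m: m != "bad")
print(committed)  # 2: "bad" was not committed, so it will be read again
```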
Q6: What causes the error "Local: Queue full" when calling produce?
This error occurs when messages are produced faster than the producer can send them, so its local queue fills up. You can adjust the parameters queue.buffering.max.messages and queue.buffering.max.kbytes based on your workload. For detailed usage, refer to the producer section.
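The following simplified model shows why produce raises this error. The producer buffers messages locally until they are sent, and the buffer is capped both by message count and by total size. The Python class below is an illustrative assumption that mirrors these two limits. It is not how librdkafka implements its queue.

```python
class QueueFullError(Exception):
    """Raised when the local queue has no room (models "Local: Queue full")."""


class BoundedProducerQueue:
    """Toy model of a producer's local buffer with two independent limits."""

    def __init__(self, max_messages: int, max_kbytes: int):
        self.max_messages = max_messages    # plays the role of queue.buffering.max.messages
        self.max_bytes = max_kbytes * 1024  # plays the role of queue.buffering.max.kbytes
        self.pending = []
        self.pending_bytes = 0

    def produce(self, payload: bytes) -> None:
        """Buffer a message, or raise if either limit would be exceeded."""
        if (len(self.pending) >= self.max_messages
                or self.pending_bytes + len(payload) > self.max_bytes):
            raise QueueFullError("Local: Queue full")
        self.pending.append(payload)
        self.pending_bytes += len(payload)

    def deliver(self, n: int) -> None:
        """Simulate the oldest n messages being sent to the broker."""
        sent, self.pending = self.pending[:n], self.pending[n:]
        self.pending_bytes -= sum(len(p) for p in sent)


q = BoundedProducerQueue(max_messages=2, max_kbytes=1)
q.produce(b"a")
q.produce(b"b")
try:
    q.produce(b"c")  # a third message exceeds max_messages
except QueueFullError as e:
    print(e)  # Local: Queue full
q.deliver(1)  # once a message has been sent, there is room again
q.produce(b"c")
```

In this model, raising either limit or slowing the producer avoids the error. This is the same trade-off as tuning the two parameters above.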
Q7: Why is the first message received in the background "Application maximum poll interval (3000000 ms) exceeded by ..."?
The interval between two consecutive polls exceeded the limit set by max.poll.interval.ms, possibly because the first poll was performed too long after the consumer was created.
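The following sketch shows the condition behind this message: the gap between consecutive polls is compared with a maximum interval. The Python watchdog below is hypothetical and only models that comparison. Its limit plays the role of max.poll.interval.ms. The clock is injectable, so you can check the behavior without waiting.

```python
import time


class PollWatchdog:
    """Hypothetical helper: measures the gap between consecutive polls."""

    def __init__(self, max_interval_ms: int, clock=time.monotonic):
        self.max_interval_ms = max_interval_ms
        self.clock = clock        # injectable so tests need not sleep
        self.last_poll = clock()  # the timer starts when the consumer is created

    def on_poll(self) -> int:
        """Record a poll and return how many ms the limit was exceeded by (0 if not)."""
        now = self.clock()
        gap_ms = (now - self.last_poll) * 1000
        self.last_poll = now
        return max(0, int(gap_ms - self.max_interval_ms))


watchdog = PollWatchdog(max_interval_ms=300000)
print(watchdog.on_poll())  # 0, since the poll follows creation immediately
```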
Q8: How to troubleshoot connection failures to Kafka with Kerberos authentication?
In this case, troubleshooting relies primarily on log information. Authentication is performed in the background, and the front end displays only generic errors such as broker transport failure. Configuring the Kerberos client can be challenging, so Kerberos authentication is recommended only for experienced users.
Common causes of connection failures, with example log entries:
- The Kafka broker's hostname (aftersale2 in this example) is not added to /etc/hosts on the client host, so the client cannot resolve and reach the advertised hostnames. Log:
2024-12-09 06:54:02.445669000,2fea <ERROR> :[PLUGIN::KAFKA] facility: FAIL, message: [thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: Failed to resolve 'aftersale2:9094': Name or service not known (after 12ms in state CONNECT)
- The necessary Kerberos libraries are not installed in the environment. Front-end error: sh: kinit: command not found. Log:
:[PLUGIN::KAFKA] facility: SASLREFRESH, message: [thrd:main]: Kerberos ticket refresh failed: kinit -R -t "/path_to_kerberos/kerberos/kafkaclient.keytab" -k kafkaclient@EXAMPLE.COM || kinit -t "/path_to_kerberos/kerberos/kafkaclient.keytab" -k kafkaclient@EXAMPLE.COM: exited with code 127
2024-12-09 07:02:08.319563000,cea <ERROR> :[PLUGIN::KAFKA] facility: LIBSASL, message: [thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: Cyrus/libsasl2 is missing a GSSAPI module: make sure the libsasl2-modules-gssapi-mit or cyrus-sasl-gssapi packages are installed
2024-12-09 07:02:08.319742000,cea <ERROR> :[PLUGIN::KAFKA] facility: FAIL, message: [thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH_REQ)
- The krb5.conf file is configured incorrectly. Front-end error: kinit: Invalid UID in persistent keyring name while getting default ccache. Log:
2024-12-09 07:24:26.230664000,79b6 <ERROR> :[PLUGIN::KAFKA] facility: LIBSASL, message: [thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: GSSAPI Error: Unspecified GSS failure. Minor code may provide more information (No Kerberos credentials available: Invalid UID in persistent keyring name)
2024-12-09 07:24:26.230852000,79b6 <ERROR> :[PLUGIN::KAFKA] facility: FAIL, message: [thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-1)): SASL(-1): generic failure: GSSAPI Error: Unspecified GSS failure. Minor code may provide more information (No Kerberos credentials available: Invalid UID in persistent keyring name) (after 1ms in state AUTH_REQ)
- The Kerberos principal is not configured on the Kafka server. Log:
2024-12-09 08:07:05.294237000,24e8 <ERROR> :[PLUGIN::KAFKA] facility: LIBSASL, message: [thrd:sasl_plaintext://aftersale2:9094/bootstrap]: sasl_plaintext://aftersale2:9094/bootstrap: GSSAPI Error: Unspecified GSS failure. Minor code may provide more information (Server kafkaClient/aftersale2@EXAMPLE.COM not found in Kerberos database)
Use the kadmin list_principals command to list the principals, and check whether kafkaClient/aftersale2@EXAMPLE.COM exists.
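You can check two of the causes above, an unresolvable broker hostname and a missing kinit, from the DolphinDB server host before creating the producer. The Python sketch below is minimal and uses only the standard library. The helper names are hypothetical, and the sketch does not validate krb5.conf or the Kerberos principal.

```python
import shutil
import socket


def host_resolves(host: str) -> bool:
    """True if the host name resolves, e.g. through /etc/hosts or DNS."""
    try:
        socket.getaddrinfo(host, None)
        return True
    except socket.gaierror:
        return False


def kerberos_preflight(broker_host: str) -> list:
    """Return human-readable problems found on this host (empty list if none)."""
    problems = []
    if not host_resolves(broker_host):
        problems.append(f"cannot resolve broker host {broker_host!r}; check /etc/hosts")
    if shutil.which("kinit") is None:
        problems.append("kinit not found on PATH; install the Kerberos client tools")
    return problems


for problem in kerberos_preflight("aftersale2"):  # broker host from the examples above
    print(problem)
```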