redis
Through DolphinDB's redis plugin, users can establish a connection to a redis server at a specified IP and port, and perform data operations. The redis plugin uses the Hiredis library, a lightweight Redis C client library.
Installation (with installPlugin)
Required server version: DolphinDB 2.00.10 or higher.
Supported OS: Windows x64 and Linux x64.
Installation Steps:
(1) Use listRemotePlugins to check plugin information in the plugin repository.
Note: For plugins not included in the provided list, you can install through precompiled binaries or compile from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.
login("admin", "123456")
listRemotePlugins(, "http://plugins.dolphindb.com/plugins/")
(2) Invoke installPlugin for plugin installation.
installPlugin("redis")
(3) Use loadPlugin to load the plugin before using the plugin methods.
loadPlugin("redis")
Method References
connect
Syntax
connect(host, port)
Details
The method establishes a connection to the redis server and returns a handle for the redis connection.
Parameters
- host: A STRING scalar indicating the IP address of the redis server.
- port: An INT scalar indicating the port number of the redis server.
Examples
Assuming the redis server is listening on 127.0.0.1:6379.
conn = redis::connect("127.0.0.1",6379)
run
Syntax
run(conn, arg1, arg2, arg3, ...)
Details
The method executes redis commands. If the redis server is password-protected, use run(conn, "AUTH", "password") to gain access. The results of the redis commands are automatically converted to DolphinDB types.
Note: If an error occurs during execution, the connection cannot be used anymore. Release this connection and reconnect.
Parameters
- conn: The redis connection handle obtained through connect.
- arg1: A STRING scalar indicating redis commands such as SET, GET, etc.
- arg2, arg3, ...: Parameters required by the redis command.
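The authentication and error-handling notes above can be sketched as follows. This is a minimal sketch that assumes a password-protected server at 127.0.0.1:6379. The password "password" is a placeholder, and the release-and-reconnect pattern in the catch block is one possible way to follow the note, not a prescribed recipe.

```dolphindb
conn = redis::connect("127.0.0.1", 6379)
// Authenticate first if the server requires a password ("password" is a placeholder)
redis::run(conn, "AUTH", "password")
try {
    redis::run(conn, "SET", "abc", "vabc")
} catch(ex) {
    // After a failed command the handle can no longer be used:
    // release it, reconnect, and authenticate again
    print(ex)
    redis::release(conn)
    conn = redis::connect("127.0.0.1", 6379)
    redis::run(conn, "AUTH", "password")
}
```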
Examples
In the following example, the run method is first used to execute the SET command, setting "abc" to hold the string "vabc". Then, the GET command is executed to get the value of "abc".
conn = redis::connect("127.0.0.1",6379)
redis::run(conn, "SET", "abc", "vabc")
val = redis::run(conn, "GET", "abc")
val == "vabc"
batchSet
Syntax
batchSet(conn, keys, values)
Details
The method sets the given keys to their respective values. It can be used with the subscribeTable function to write data to a redis database in bulk.
Note: If an error occurs during execution, the connection cannot be used anymore. Release this connection and reconnect.
Parameters
- conn: The Redis connection handle obtained through connect.
- keys: A STRING scalar or vector indicating the keys to set.
- values: A STRING scalar or vector indicating the values to set.
Examples
conn = redis::connect("127.0.0.1",6379)
redis::batchSet(conn, "k1", "v1")
keys = ["k2", "k3", "k4"]
values = ["v2", "v3", "v4"]
redis::batchSet(conn, keys, values)
batchHashSet
Syntax
batchHashSet(conn, ids, fieldData)
Details
The method sets the specified fields of redis hashes to their respective values. It is used to write data to the redis database in batches.
Note: If an error occurs during execution, the connection cannot be used anymore. Release this connection and reconnect.
Parameters
- conn: The Redis connection handle obtained through connect.
- ids: A STRING array, where each element serves as a key of the Redis hash. These keys correspond to specific rows of data in the fieldData table.
- fieldData: A table where each column is of STRING type. Each column name corresponds to a field of the hash, and the value serves as the value of the Redis hash.
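The mapping between ids, column names, and hash fields can be seen in a small case. This is a minimal sketch that assumes a server at 127.0.0.1:6379. The key names demo:row1 and demo:row2 and the sample values are made up for illustration.

```dolphindb
conn = redis::connect("127.0.0.1", 6379)
// Two hash keys, one per row of fieldData (hypothetical key names)
ids = ["demo:row1", "demo:row2"]
// Column names become hash fields; every column is of STRING type
fieldData = table(["CFFEX", "CFFEX"] as exchange, ["9.87", "9.91"] as last_price)
redis::batchHashSet(conn, ids, fieldData)
// Read one field back with a plain HGET issued through run
price = redis::run(conn, "HGET", "demo:row1", "last_price")
```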
Examples
loadPlugin("path/PluginRedis.txt");
go
// Generate data
n=43200
instrument_id = take("HO2305-D-",n)+string(1..n)
time_stamp = timestamp(2024.02.02T09:30:01..2024.02.02T21:30:00)
exchange = take("CFFEX",n)
last_price = rand(10.0,n)
volume = rand(100000,n)
bid_price1 = rand(10.0,n)
bid_volume1 = rand(1000,n)
bid_price2 = rand(10.0,n)
bid_volume2 = rand(1000,n)
bid_price3 = rand(10.0,n)
bid_volume3 = rand(1000,n)
bid_price4 = rand(10.0,n)
bid_volume4 = rand(1000,n)
bid_price5 = rand(10.0,n)
bid_volume5 = rand(1000,n)
t = table(instrument_id, time_stamp,exchange, last_price,volume, bid_price1,bid_volume1, bid_price2,bid_volume2,bid_price3,bid_volume3,bid_price4,bid_volume4,bid_price5,bid_volume5)
conn=redis::connect("127.0.0.1",6379)
// Batch set operation
ids = exec instrument_id from t
fieldData = select string(time_stamp) as time_stamp,string(exchange) as exchange,string(last_price) as last_price,string(volume) as volume,string(bid_price1) as bid_price1,string(bid_volume1) as bid_volume1,string(bid_price2) as bid_price2,string(bid_volume2) as bid_volume2,string(bid_price3) as bid_price3,string(bid_volume3) as bid_volume3,string(bid_price4) as bid_price4,string(bid_volume4) as bid_volume4,string(bid_price5) as bid_price5,string(bid_volume5) as bid_volume5 from t
redis::batchHashSet(conn, ids, fieldData)
batchPush
Syntax
batchPush(conn, keys, values, [pushRight=true])
Details
The method pushes multiple values to the lists stored at the specified keys in a single batch.
Note: While the function does not guarantee successful execution for all keys in the batch, it ensures atomic operations for individual keys—values for a specific key are either entirely added or completely skipped.
Parameters
- conn: The Redis connection handle obtained through the connect method.
- keys: A STRING vector representing the target keys.
- values: A tuple where each element is a vector of strings representing the values to be added.
- pushRight (optional): A boolean scalar. When set to true (default), the method performs RPUSH; when set to false, it performs LPUSH.
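The effect of pushRight can be checked with a short experiment. This is a minimal sketch that assumes a server at 127.0.0.1:6379. The key names demo:list1 and demo:list2 and the variable names listKeys and listValues are illustrative.

```dolphindb
conn = redis::connect("127.0.0.1", 6379)
// Start from empty lists so the result is easy to inspect
redis::run(conn, "DEL", "demo:list1")
redis::run(conn, "DEL", "demo:list2")
listKeys = ["demo:list1", "demo:list2"]
// One STRING vector of values per key, wrapped in a tuple
listValues = (["v1", "v2"], ["v3", "v4", "v5"])
// pushRight=false: use LPUSH instead of the default RPUSH
redis::batchPush(conn, listKeys, listValues, false)
// Inspect the contents of the first list with LRANGE
redis::run(conn, "LRANGE", "demo:list1", "0", "-1")
```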
Examples
conn = redis::connect("127.0.0.1",6379)
keys = ["k1", "k2", "k3"]
values = [["v1", "v2"], ["v3", "v4", "v5"], ["v6"]]
redis::batchPush(conn, keys, values)
batchGet
Syntax
redis::batchGet(conn, key)
Details
The method queries the specified keys in batches and returns a STRING vector.
Parameters
- conn: The Redis connection handle obtained through the redis::connect method.
- key: A STRING vector indicating the target keys to query.
Examples
conn = redis::connect("127.0.0.1",6379)
key = ["k2", "k3", "k4"]
redis::batchGet(conn, key)
release
Syntax
release(conn)
Details
The method closes the specified connection to the redis server.
Parameters
- conn: The Redis connection handle obtained through connect.
Examples
conn = redis::connect("127.0.0.1",6379)
redis::release(conn)
releaseAll
Syntax
releaseAll()
Details
The method closes all connections to the redis server.
Examples
conn = redis::releaseAll()
getHandle
Syntax
getHandle(token)
Details
The method returns a specific redis handle.
Parameters
- token: A unique identifier for a Redis connection, obtained from the first column of the table returned by getHandleStatus.
Examples
handle = redis::getHandle(token)
getHandleStatus
Syntax
getHandleStatus()
Details
The method is used to get information about all established connections. It returns a table with the following columns:
- token: The unique identifier for the connection, used to obtain the handle through getHandle(token).
- address: The "ip:port" network address of the connected Redis server.
- createdTime: The time when the connection is created.
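The token column and getHandle can be combined to recover a handle whose variable has been lost. This is a minimal sketch that assumes a server at 127.0.0.1:6379 and at least one open connection. Taking the first row of the status table is an arbitrary choice for illustration.

```dolphindb
conn = redis::connect("127.0.0.1", 6379)
status = redis::getHandleStatus()
// Take the token from the first row of the status table
token = status.token[0]
// Recover a usable handle from the token
handle = redis::getHandle(token)
redis::run(handle, "SET", "abc", "vabc")
```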
Examples
status = redis::getHandleStatus()
Stream-related methods
createStreamGroup
Syntax
redis::createStreamGroup(conn, key, group, [msgId="$"], [mkStream=false])
Details
Creates a Redis consumer group with the specified configuration for consuming streaming data.
Parameters
conn: A Redis connection handle obtained via redis::connect.
key: A STRING scalar specifying the name of the stream to consume.
group: A STRING scalar specifying the consumer group name.
msgId (optional): A STRING scalar specifying the ID of the message from which the group starts consuming. The default value is "$" (consume from the latest message). "0" means consuming from the beginning of the stream.
mkStream (optional): A boolean value specifying whether to create the stream if it does not exist. If mkStream=true and the stream does not exist, a new stream is created automatically using the key. The default value is false.
Returns
None.
readStreamGroup
Syntax
redis::readStreamGroup(conn, key, group, consumer, [count=1], [readPending=">"], [maxWaitTime=0], [writeToPending=false])
Details
Synchronously consumes stream data through a Redis consumer group.
Parameters
conn: A Redis connection handle obtained via redis::connect.
key: A STRING scalar specifying the name of the stream to consume.
group: A STRING scalar specifying the consumer group name.
consumer: A STRING scalar specifying the consumer name. If it does not exist, it is created automatically.
count (optional): An INT scalar specifying the maximum number of messages to read. The default value is 1.
readPending (optional): A STRING scalar specifying whether to read pending messages. Supported values are ">" and "0". The default value is ">".
- ">": Read only new messages that have never been delivered to any consumer.
- "0": Read only pending messages that were previously read by the current consumer but not acknowledged. In this mode, maxWaitTime and writeToPending cannot be set, since it is only used to re-fetch unprocessed messages of the current consumer.
maxWaitTime (optional): An INT scalar specifying the maximum time (in milliseconds) to wait for new messages. The default value is 0, meaning that the method returns immediately if no new messages are available. If greater than 0, the system waits up to the specified time and returns as soon as new messages arrive, or returns an empty result on timeout.
writeToPending (optional): A boolean value specifying whether to put messages into the pending list. The default value is false, meaning messages are auto-acknowledged on read. If set to true, messages must be acknowledged manually.
Returns
A dictionary where:
- the key is a STRING representing the message ID;
- the value is a dictionary containing message fields and values (both STRING).
If no messages are available, an empty dictionary is returned.
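A read-process-acknowledge cycle over the returned dictionary might look like the sketch below. It assumes a server at 127.0.0.1:6379 and that the stream "test_stream" and the group "test_group" already exist. Those names, the 1000 ms wait, and the print-only processing step are illustrative assumptions.

```dolphindb
conn = redis::connect("127.0.0.1", 6379)
// Wait up to 1000 ms for at most 10 new messages and keep them pending
msgs = redis::readStreamGroup(conn, "test_stream", "test_group", "consumer1", 10, ">", 1000, true)
msgIds = msgs.keys()
if (msgIds.size() > 0) {
    for (msgId in msgIds) {
        // Each value is a dictionary of field names and values
        fields = msgs[msgId]
        print(msgId)
        print(fields)
    }
    // writeToPending=true, so the messages must be acknowledged manually
    redis::ackMessages(conn, "test_stream", "test_group", msgIds)
}
```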
Example:
{
"1702200000000-0": {"field1": "value1", "field2": "value2"},
"1702200000001-0": {"field1": "value3", "field2": "value4"}
}
subscribeStream
Syntax
redis::subscribeStream(host, port, key, group, consumer, outputTable, parser, actionName, [readPending=">"], [batchSize=100], [autoAck=true])
Details
Creates a background subscription task that continuously consumes messages from a Redis stream and processes them using the parser callback function.
Parameters
host: A STRING scalar specifying the host address of the Redis server.
port: An INT scalar specifying the port number of the Redis server.
key: A STRING scalar specifying the name of the stream to consume.
group: A STRING scalar specifying the consumer group name. The group must be created in advance using createStreamGroup.
consumer: A STRING scalar specifying the consumer name.
outputTable: A table used to store the results returned by the parser function.
parser: A callback function that takes one parameter—the message dictionary received from the stream. The function parses the message and must return a table, which is then appended to outputTable.
actionName: A STRING scalar specifying the name of the subscription task, used for identification and management.
readPending (optional): A STRING scalar specifying whether to prioritize reading pending messages after the subscription starts. Supported values are ">" and "0". The default value is ">".
- ">": Read only new messages that have never been delivered to any consumer.
- "0": Read all pending messages of the current consumer. Note: all pending messages are read at once and are not limited by batchSize.
batchSize (optional): An INT scalar specifying the number of messages to read from Redis per batch. The default value is 100.
autoAck (optional): A boolean value specifying whether to automatically acknowledge messages after parser finishes processing. The default value is true.
Returns
None.
getSubscriptionStats
Syntax
redis::getSubscriptionStats()
Details
Retrieves information about background subscription tasks.
Parameters
None.
Returns
A table with the following columns:
| Column name | Type | Description |
|---|---|---|
| actionName | STRING | Subscription task name |
| conn | STRING | Redis connection address |
| stream | STRING | Stream name |
| group | STRING | Consumer group name |
| consumer | STRING | Consumer name |
| createTime | TIMESTAMP | Task creation time |
| processedMsgCount | LONG | Number of successfully processed messages |
| status | STRING | Task status (“OK” for normal, “FATAL” for stopped due to error) |
| lastErrMsg | STRING | Last error message |
| lastFailedTime | TIMESTAMP | Time of the last error |
| autoAck | BOOL | Whether messages are auto-acknowledged |
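The status columns can be used to find tasks that stopped because of an error. This is a minimal sketch that assumes the column names listed in the table above. Cancelling a FATAL task with unsubscribeStream before re-creating it is an assumption made for illustration, not documented behavior.

```dolphindb
stats = redis::getSubscriptionStats()
// Tasks that stopped because of an error
failed = select actionName, lastErrMsg, lastFailedTime from stats where status = "FATAL"
print(failed)
// Assumption: cancel each failed task so that it can be re-created with subscribeStream
for (name in failed.actionName) {
    redis::unsubscribeStream(name)
}
```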
unsubscribeStream
Syntax
redis::unsubscribeStream(actionName)
Details
Cancels the specified subscription task. After cancellation, the background thread stops consuming messages.
Parameters
actionName: A STRING scalar specifying the subscription task name.
Returns
None.
ackMessages
Syntax
redis::ackMessages(conn, key, group, msgIds)
Details
Acknowledges messages as processed and removes them from the pending list of the consumer group.
Parameters
conn: A Redis connection handle obtained via redis::connect.
key: A STRING scalar specifying the name of the stream.
group: A STRING scalar specifying the consumer group name.
msgIds: A STRING vector specifying the list of message IDs to acknowledge.
Returns
An INT scalar indicating the number of messages successfully acknowledged.
Usage Examples
Example 1: write data subscribed by subscribeTable to the Redis database
loadPlugin("redis");
go
dropStreamTable(`table1)
colName=["key", "value"]
colType=["string", "string"]
enableTableShareAndPersistence(table=streamTable(100:0, colName, colType), tableName=`table1, cacheSize=10000, asynWrite=false)
def myHandle(conn, msg) {
    redis::batchSet(conn, msg[0], msg[1])
}
conn = redis::connect("replace_with_redis_server_ip",6379)
subscribeTable(tableName="table1", handler=myHandle{conn})
n = 1000000
for(x in 0:n){
    insert into table1 values("key" + x, "value" + x)
}
t = table(n:0, [`id, `val], [`string, `string])
for(x in 0:n){
    insert into t values("key" + x, redis::run(conn, "GET", "key" + x))
}
ret = exec count(*) from t
assert "test", n==ret
Example 2: consume messages in sync mode
Load the redis plugin and configure basic information for consumption.
loadPlugin("redis")
host = "127.0.0.1"
port = 6379
streamKey = "test_stream"
groupName = "test_group"
consumerName = "consumer1"
Create a Redis connection and consumer group.
conn = redis::connect(host, port)
// Delete the stream if it exists
redis::run(conn, "DEL", streamKey)
/*
msgId="0": Consume from the beginning.
mkStream=true: Create the stream if it does not exist.
*/
redis::createStreamGroup(conn, streamKey, groupName, "0", true)
Insert three messages into the Redis stream.
msgId1 = redis::run(conn, "XADD", streamKey, "*", "time", string(now()), "msg", string("XADD_MSG" + rand(10000000000)[0]))
msgId2 = redis::run(conn, "XADD", streamKey, "*", "time", string(now()), "msg", string("XADD_MSG" + rand(10000000000)[0]))
msgId3 = redis::run(conn, "XADD", streamKey, "*", "time", string(now()), "msg", string("XADD_MSG" + rand(10000000000)[0]))
// View message count in the stream
redis::run(conn, "XLEN", streamKey)
Read and acknowledge messages.
// Read new messages (readPending=">")
/*
count = 10: Read up to 10 messages each time.
readPending = ">": Read only new messages, not pending ones.
maxWaitTime = 0: Do not wait for new messages.
writeToPending = true: Put messages into the pending list; they must be acknowledged manually.
*/
redis::readStreamGroup(conn, streamKey, groupName, consumerName, 10, ">", 0, true)
// Read pending messages (readPending="0")
msgs = redis::readStreamGroup(conn, streamKey, groupName, consumerName, 10, "0")
// Acknowledge messages
redis::ackMessages(conn, streamKey, groupName, keys(msgs))
// Read pending messages again (the result should be empty)
redis::readStreamGroup(conn, streamKey, groupName, consumerName, 10, "0")
Example 3: consume messages in async mode
Load the redis plugin and insert data into the Redis stream.
loadPlugin("redis")
// Insert data into the stream in a separate session
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
STREAM_KEY = "test_stream_sub"
GROUP_NAME = "test_group"
CONSUMER_NAME = "consumer1"
ACTION_NAME = "test_action"
conn = redis::connect(REDIS_HOST, REDIS_PORT)
for (i in 1..100000) {
    redis::run(conn, "XADD", STREAM_KEY, "*", "field1", "value_" + string(i), "field2", string(i * 100))
    sleep(10)
}
Configure the subscription information.
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
STREAM_KEY = "test_stream_sub"
GROUP_NAME = "test_group"
CONSUMER_NAME = "consumer1"
ACTION_NAME = "test_action"
conn = redis::connect(REDIS_HOST, REDIS_PORT)
// Clear test data (optional)
// redis::run(conn, "DEL", STREAM_KEY)
// redis::run(conn, "XGROUP", "DESTROY", STREAM_KEY, GROUP_NAME)
Create the consumer group, and define the output table and parser function.
// Create the consumer group
redis::createStreamGroup(conn, STREAM_KEY, GROUP_NAME, "0", true)
// Define the output table
share table(100:0, `msgId`field1`field2`timestamp, [STRING, STRING, STRING, TIMESTAMP]) as outputTable
// Define the parser function for converting the dictionary into a table
def msgParser(msgs) {
    msgIds = msgs.keys()
    n = msgIds.size()
    if (n == 0) {
        return table(0:0, `msgId`field1`field2`timestamp, [STRING, STRING, STRING, TIMESTAMP])
    }
    result = table(n:0, `msgId`field1`field2`timestamp, [STRING, STRING, STRING, TIMESTAMP])
    for (msgId in msgIds) {
        fields = msgs[msgId]
        field1 = fields["field1"]
        field2 = fields["field2"]
        insert into result values(msgId, field1, field2, now())
    }
    return result
}
Subscribe to the stream, view subscription results, unsubscribe from the stream, and clear the environment.
// Subscribe to the stream
redis::subscribeStream(REDIS_HOST, REDIS_PORT, STREAM_KEY, GROUP_NAME, CONSUMER_NAME,
outputTable, msgParser, ACTION_NAME, ">", 100, true)
// View subscription results
redis::getSubscriptionStats()
select * from outputTable
// Unsubscribe from the stream
redis::unsubscribeStream(ACTION_NAME)
redis::getSubscriptionStats()
// Clear the environment
redis::release(conn)
undef(`outputTable, SHARED);