class Redis::Cluster

Overview

Use in place of a Redis::Client when talking to Redis clusters. This class will discover all nodes in a Redis cluster when given a URI for any of them, route commands to appropriate shards based on the keys they operate on, and route commands which do not change state to shard replicas to spread the load across the cluster.

When using commands that operate on multiple keys (for example, MGET, DEL, or RPOPLPUSH), all specified keys must reside on the same shard in the cluster. Usually, this means designing your key names with curly braces around part of each name so that they hash to the same key slot: when a key contains a non-empty {...} section, only the text inside the braces is hashed. For example:

redis.del "{comment}:1", "{comment}:2"
value = redis.rpoplpush "{queue}:default", "{queue}:default:pending"
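
As a rough illustration of why the braces matter, the sketch below follows the key-slot rule from the Redis Cluster specification: CRC16 (XMODEM variant) of the key modulo 16384, hashing only the text inside the first non-empty {...} pair when one is present. The crc16 and hash_slot helpers are hypothetical names written for this example and are not part of this shard; Redis::Cluster#slot_for remains the supported way to look up a slot.

```crystal
# CRC16, XMODEM variant: polynomial 0x1021, initial value 0.
# Hypothetical helper for illustration only.
def crc16(bytes : Bytes) : UInt16
  crc = 0_u16
  bytes.each do |byte|
    crc ^= byte.to_u16 << 8
    8.times do
      if (crc & 0x8000) != 0
        crc = (crc << 1) ^ 0x1021_u16
      else
        crc <<= 1
      end
    end
  end
  crc
end

# Map a key to one of the 16384 cluster hash slots.
# Hypothetical helper for illustration only.
def hash_slot(key : String) : Int32
  hashed = key
  # Use the text inside the first "{...}" pair, but only if it is non-empty.
  if (start = key.index('{')) && (stop = key.index('}', start + 1)) && stop > start + 1
    hashed = key[(start + 1)...stop]
  end
  (crc16(hashed.to_slice) % 16384).to_i32
end

# Both keys share the "comment" tag, so they map to the same slot.
puts hash_slot("{comment}:1") == hash_slot("{comment}:2")
# Without braces the whole key is hashed, so these slots are computed independently.
puts hash_slot("comment:1")
puts hash_slot("comment:2")
```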

If you want to use a Redis module that provides custom commands, you can register them as read-only with Redis::Cluster.register_read_only_commands and they will automatically be routed to replicas. See redis/cluster/json.cr for example usage.

EXPERIMENTAL Cluster support is still under development. Some APIs may change as details are worked out, and highly customized clusters (for example, servers handling individual hash slots or multiple hash-slot ranges) are not yet supported.

Defined in:

cluster.cr
errors.cr

Instance methods inherited from module Redis::Commands

decr(key : String)
decrby(key : String, amount : Int | String)
del(*keys : String)
exists(*keys : String)
expire(key : String, ttl : Int)
expireat(key : String, at : Time)
flushdb
ft
get(key : String)
incr(key : String)
incrby(key : String, amount : Int | String)
info
json
keys(pattern = "*")
mget(keys : Enumerable(String))
mset(data : Hash(String, String))
pexpire(key : String, ttl : Int)
pexpireat(key : String, at : Time)
pttl(key : String)
publish(channel : String, message : String)
run(command)
scan(cursor : String = "0", match : String | Nil = nil, count : String | Int | Nil = nil, type : String | Nil = nil)
set(key : String, value : String, ex : String | Int | Nil = nil, px : String | Int | Nil = nil, nx = false, xx = false, keepttl = false)
set(key, value, ex : Time, nx = false, xx = false, keepttl = false)
set(key, value, ex : Time::Span, nx = false, xx = false, keepttl = false)
ts
ttl(key : String)
type(key : String)
unlink(keys : Enumerable(String))
unlink(*keys : String)

Instance methods inherited from module Redis::Commands::Stream

xack(key : String, group : String, id : String)
xack(key : String, group : String, ids : Enumerable(String))
xadd(key : String, id : String, maxlen, data : Hash(String, String))
xadd(key : String, id : String, data : Hash(String, String))
xadd(key : String, id : String, maxlen = nil, **data)
xautoclaim(key : String, group : String, consumer : String, min_idle_time : Time::Span, start : String, count : Int | String | Nil = nil)
xdel(key : String, ids : Enumerable(String))
xdel(key : String, *ids : String)
xgroup(command : String, key : String, groupname : String)
xgroup(command : XGroup, key : String, groupname : String, *, id : String | Nil = nil, mkstream = false, consumer_name : String | Nil = nil)
xgroup(command : String, key : String, groupname : String, *args : String)
xgroup_create(key : String, groupname : String, *, id : String = "$", mkstream = false)
xgroup_create_consumer(key : String, groupname : String, consumer_name : String)
xlen(key : String)
xpending(key : String, group : String, start : String, end finish : String, count : String | Int, idle : String | Time::Span | Nil = nil)
xpending(key : String, group : String)
xrange(key : String, start min, end max, count = nil)
xreadgroup(group : String, consumer : String, count : String | Int | Nil = nil, block : Time::Span | String | Int | Nil = nil, no_ack = false, streams : ::Hash(String, String) = {} of String => String)
xreadgroup(group : String, consumer : String, count : String | Int | Nil = nil, block : Time::Span | String | Int | Nil = nil, no_ack = false, streams : NamedTuple = NamedTuple.new)

Instance methods inherited from module Redis::Commands::SortedSet

zadd(key : String, score : String | Float, value : String)
zadd(key : String, values : Enumerable)
zcard(key : String)
zrange(key : String, starting : String | Int, ending : String | Int, with_scores : Bool = false)
zrangebyscore(key : String, low : String | Float, high : String | Float, limit : Enumerable(String) | Nil = nil)
zrem(key : String, value : String)
zremrangebyrank(key : String, low : Int, high : Int)
zremrangebyscore(key : String, low : String | Float, high : String | Float)
zrevrange(key : String, starting : String | Int, ending : String | Int, with_scores : Bool = false)

Instance methods inherited from module Redis::Commands::Set

sadd(key : String, *values : String)
scard(key : String)
sdiff(first : String, second : String)
sinter(first : String, *others : String)
sismember(key : String, value : String)
smembers(key : String)
srem(key : String, members : Enumerable(String))
srem(key : String, *values : String)

Instance methods inherited from module Redis::Commands::List

blpop(*keys : String, timeout : Time::Span)
blpop(*keys : String, timeout : Int | Float)
blpop(*keys : String, timeout : String)
brpop(keys : Enumerable(String), timeout : Int)
brpop(*keys : String, timeout : Time::Span)
brpop(*keys : String, timeout : Number)
brpop(*keys : String, timeout : String)
llen(key : String)
lpop(key : String, count : String | Nil = nil)
lpush(key : String, values : Enumerable(String))
lpush(key, *values : String)
lrange(key : String, start : String, finish : String)
lrem(key : String, count : Int, value : String)
rpop(key : String)
rpoplpush(source : String, destination : String)
rpush(key, *values : String)

Instance methods inherited from module Redis::Commands::Hash

hget(key : String, field : String)
hgetall(key : String)
hmget(key : String, *fields : String)
hmset(key : String, data : ::Hash(String, String))
hset(key : String, data : ::Hash(String, String))
hset(key : String, *fields : String)
hset(key : String, **fields : String)

Constructor Detail

def self.new(uri : URI = URI.parse(ENV["REDIS_CLUSTER_URL"]? || "redis:///"))

Pass a URI to connect to the specified Redis cluster. It defaults to the value of the REDIS_CLUSTER_URL environment variable, or redis:/// if that variable is unset. The URI can point to any server in the cluster and Redis::Cluster will discover the rest.



Class Method Detail

def self.register_read_only_command(command : String)

Tell the cluster driver that the specified Redis command can be routed to read-only replicas.

Redis::Cluster.register_read_only_command "mymodule.get"

def self.register_read_only_commands(commands : Enumerable(String))

Tell the cluster driver that all the specified Redis commands can be routed to read-only replicas.

Redis::Cluster.register_read_only_commands %w[
  mymodule.get
  mymodule.mget
]
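
To make the effect of registration concrete, here is a simplified model of the routing decision: command names found in a read-only set go to a replica, and everything else goes to the primary. ReadOnlyRouter and its methods are hypothetical and exist only for this illustration. The default command list and the case-insensitive matching are assumptions, and the shard's actual implementation may differ.

```crystal
# Hypothetical model of read-only routing; not the shard's own code.
class ReadOnlyRouter
  # Assumed starting set of commands that never change state.
  def initialize(commands : Enumerable(String) = %w[get mget exists])
    @read_only = Set(String).new
    commands.each { |command| register(command) }
  end

  # Mark one more command (for example, a module command) as read-only.
  def register(command : String) : Nil
    @read_only << command.downcase
  end

  # Decide where a full command such as ["get", "foo"] would be sent,
  # based only on its first element (the command name).
  def target_for(command : Enumerable(String)) : Symbol
    @read_only.includes?(command.first.downcase) ? :replica : :primary
  end
end

router = ReadOnlyRouter.new
puts router.target_for(["mymodule.get", "key"]) # primary: not registered yet
router.register "mymodule.get"
puts router.target_for(["mymodule.get", "key"]) # replica
puts router.target_for(["set", "key", "value"]) # primary
```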


Instance Method Detail

def close

Close all connections to this Redis cluster.


def flushdb

Executes #flushdb on each shard in the cluster.


def keys : Array(String)

Get all keys across all shards. This executes a #keys command on every shard in the cluster. It is probably not a good idea in production, since it blocks every Redis shard or replica for the duration of the query, but it is supported because you may have a reasonable use case for it and it is not easy to do otherwise.


def pipeline(key : String, &)

Run a pipeline on the shard that holds the specified key.

cluster.pipeline "{widgets}" do |pipe|
  widget_ids.each do |id|
    pipe.get "{widgets}:#{id}"
  end
end

WARNING All keys that this pipeline operates on MUST reside on the same shard. It's best to pass a pre-hashed key (one containing {}) to this method. See the example above.
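
One way to guard against mixing shards is to check, before opening a pipeline, that every key carries the same hash tag. The sketch below is a minimal version of that check. hash_tag and same_tag? are hypothetical helpers written for this example, not methods provided by Redis::Cluster. Keys with different tags can still land on the same shard, so the check is stricter than necessary, but keys that pass it always share a slot.

```crystal
# Return the part of the key that Redis Cluster hashes: the text inside the
# first non-empty "{...}" pair, or the whole key when there is none.
# Hypothetical helper for illustration only.
def hash_tag(key : String) : String
  if (start = key.index('{')) && (stop = key.index('}', start + 1)) && stop > start + 1
    key[(start + 1)...stop]
  else
    key
  end
end

# True when every key hashes the same text and therefore the same slot.
# Hypothetical helper for illustration only.
def same_tag?(keys : Enumerable(String)) : Bool
  keys.map { |key| hash_tag(key) }.uniq.size <= 1
end

widget_keys = [1, 2, 3].map { |id| "{widgets}:#{id}" }
puts same_tag?(widget_keys)                    # true
puts same_tag?(["{widgets}:1", "{gadgets}:1"]) # false
```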


def run(command full_command)
Description copied from module Redis::Commands

Execute the given command and return the result from the server. The command must be an Enumerable, and its size method must be re-entrant.

run({"set", "foo", "bar"})

def scan_each(*args, **kwargs, &) : Nil

Execute Commands#scan_each on each shard, yielding any matching keys.


def slot_for(key : String)

Return the Redis hash slot for the given key. This is useful for seeing which shard your command will be routed to.

