class Redis::Cluster

Overview

Use in place of a Redis::Client when talking to Redis clusters. This class will discover all nodes in a Redis cluster when given a URI for any of them, route commands to appropriate shards based on the keys they operate on, and route commands which do not change state to shard replicas to spread the load across the cluster.

When using commands that operate on multiple keys (for example: MGET, DEL, RPOPLPUSH, etc.), all specified keys must reside on the same shard in the cluster. Usually, this means designing your key names with curly braces around part of each name so that they hash to the same key slot: when a key contains a non-empty {...} section, only the text inside the braces is hashed. For example:

redis.del "{comment}:1", "{comment}:2"
value = redis.rpoplpush "{queue}:default", "{queue}:default:pending"
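
To see why the braces matter, here is a minimal standalone sketch of the key-slot rule as described in the Redis Cluster specification (CRC16/XMODEM of the hash tag, modulo 16384). The helper names crc16_xmodem, hash_tag and key_slot are hypothetical and are not part of Redis::Cluster; this is not necessarily how the library itself computes slots.

```crystal
# CRC16/XMODEM: polynomial 0x1021, initial value 0, no bit reflection.
def crc16_xmodem(bytes : Bytes) : UInt16
  crc = 0_u16
  bytes.each do |byte|
    crc = crc ^ (byte.to_u16 << 8)
    8.times do
      if (crc & 0x8000_u16) != 0
        crc = (crc << 1) ^ 0x1021_u16
      else
        crc = crc << 1
      end
    end
  end
  crc
end

# Return the part of the key that gets hashed: the text between the first
# "{" and the next "}" if that text is non-empty, otherwise the whole key.
def hash_tag(key : String) : String
  start = key.index('{')
  return key unless start
  stop = key.index('}', start + 1)
  return key unless stop
  return key if stop == start + 1 # "{}" is an empty tag, so hash the whole key
  key[(start + 1)...stop]
end

# Map a key to one of the 16384 cluster hash slots.
def key_slot(key : String) : Int32
  crc16_xmodem(hash_tag(key).to_slice).to_i % 16384
end

# Both keys hash only "comment", so they land in the same slot.
puts key_slot("{comment}:1") == key_slot("{comment}:2") # => true
```

Because only the tag is hashed, any number of keys can be pinned to one slot by sharing the text inside the braces.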

If you want to use a Redis module that provides custom commands, you can register them as read-only with Redis::Cluster.register_read_only_commands and they will automatically be routed to replicas. See redis/cluster/json.cr for example usage.

EXPERIMENTAL Cluster support is still under development. Some APIs may change as details are worked out, and highly customized clusters (for example, servers handling individual hash slots or multiple hash-slot ranges) are not yet supported.

Defined in:

cluster.cr
errors.cr

Instance methods inherited from module Redis::Commands

decr(key : String)
decrby(key : String, amount : Int | String)
del(keys : Enumerable(String))
del(*keys : String)
eval(script : String, keys : Enumerable(String) = EmptyEnumerable.new, args : Enumerable(String) = EmptyEnumerable.new)
eval_ro(script : String, keys : Enumerable(String) = EmptyEnumerable.new, args : Enumerable(String) = EmptyEnumerable.new)
evalsha(sha : String, keys : Enumerable(String) = EmptyEnumerable.new, args : Enumerable(String) = EmptyEnumerable.new)
evalsha_ro(script : String, keys : Enumerable(String) = EmptyEnumerable.new, args : Enumerable(String) = EmptyEnumerable.new)
exists(*keys : String)
expire(key : String, ttl : Time::Span)
expire(key : String, ttl : Int)
expireat(key : String, at : Time)
flushall
flushdb
ft
get(key : String)
graph(key : String)
incr(key : String)
incrby(key : String, amount : Int | String)
info
json
keys(pattern = "*")
mget(keys : Enumerable(String))
mset(data : Hash(String, String))
pexpire(key : String, ttl : Time::Span)
pexpire(key : String, ttl : Int)
pexpireat(key : String, at : Time)
pttl(key : String)
publish(channel : String, message : String)
run(command)
scan(cursor : String = "0", match : String | Nil = nil, count : String | Int | Nil = nil, type : String | Nil = nil)
script_exists(shas : Enumerable(String))
script_exists(*shas : String)
script_flush(mode : ScriptFlushMode)
script_kill
script_load(script : String)
set(key : String, value : String, ex : String | Int | Nil = nil, px : String | Int | Nil = nil, nx = false, xx = false, keepttl = false)
set(key, value, ex : Time, nx = false, xx = false, keepttl = false)
set(key, value, ex : Time::Span, nx = false, xx = false, keepttl = false)
ts
ttl(key : String)
type(key : String)
unlink(keys : Enumerable(String))
unlink(*keys : String)

Instance methods inherited from module Redis::Commands::Stream

xack(key : String, group : String, id : String)
xack(key : String, group : String, ids : Enumerable(String))
xadd(key : String, id : String, maxlen, data : Hash(String, String))
xadd(key : String, id : String, data : Hash(String, String))
xadd(key : String, id : String, maxlen = nil, **data)
xautoclaim(key : String, group : String, consumer : String, min_idle_time : Time::Span, start : String, count : Int | String | Nil = nil)
xdel(key : String, ids : Enumerable(String))
xdel(key : String, *ids : String)
xgroup(command : String, key : String, groupname : String)
xgroup(command : XGroup, key : String, groupname : String, *, id : String | Nil = nil, mkstream = false, consumer_name : String | Nil = nil)
xgroup(command : String, key : String, groupname : String, *args : String)
xgroup_create(key : String, groupname : String, *, id : String = "$", mkstream = false)
xgroup_create_consumer(key : String, groupname : String, consumer_name : String)
xlen(key : String)
xpending(key : String, group : String, start : String, end finish : String, count : String | Int, idle : String | Time::Span | Nil = nil)
xpending(key : String, group : String)
xrange(key : String, start min, end max, count = nil)
xreadgroup(group : String, consumer : String, count : String | Int | Nil = nil, block : Time::Span | String | Int | Nil = nil, no_ack = false, streams : ::Hash(String, String) = {} of String => String)
xreadgroup(group : String, consumer : String, count : String | Int | Nil = nil, block : Time::Span | String | Int | Nil = nil, no_ack = false, streams : NamedTuple = NamedTuple.new)

Instance methods inherited from module Redis::Commands::SortedSet

zadd(key : String, score : String | Int64, value : String)
zadd(key : String, values : Enumerable(String))
zcard(key : String)
zcount(key : String, min : String, max : String)
zrange(key : String, starting : String | Int64, ending : String | Int64, with_scores : Bool = false)
zrangebyscore(key : String, low : String | Int64, high : String | Int64, limit : Enumerable(String) | Nil = nil)
zrem(key : String, value : String)
zrem(key : String, values : Enumerable(String))
zrem(key : String, *values : String)
zremrangebyrank(key : String, low : Int64, high : Int64)
zremrangebyscore(key : String, low : String | Int64, high : String | Int64)
zrevrange(key : String, starting : String | Int64, ending : String | Int64, with_scores : Bool = false)
zscore(key : String, value : String)

Instance methods inherited from module Redis::Commands::Set

sadd(key : String, *values : String)
scard(key : String)
sdiff(first : String, second : String)
sinter(first : String, *others : String)
sismember(key : String, value : String)
smembers(key : String)
srem(key : String, members : Enumerable(String))
srem(key : String, *values : String)

Instance methods inherited from module Redis::Commands::List

blpop(keys : Enumerable(String), timeout : Time::Span)
blpop(*keys : String, timeout : Time::Span)
blpop(*keys : String, timeout : Int | Float)
blpop(*keys : String, timeout : String)
brpop(keys : Enumerable(String), timeout : Int)
brpop(*keys : String, timeout : Time::Span)
brpop(*keys : String, timeout : Number)
brpop(*keys : String, timeout : String)
llen(key : String)
lmove(from source : String, to destination : String, from_side source_side : Side, to_side destination_side : Side)
lpop(key : String, count : String | Nil = nil)
lpush(key : String, values : Enumerable(String))
lpush(key, *values : String)
lrange(key : String, start : String | Int, finish : String | Int)
lrem(key : String, count : Int, value : String)
ltrim(key : String, start : String | Int, stop : String | Int)
ltrim(key : String, range : Range(String, String))
ltrim(key : String, range : Range(Int32, Int32))
rpop(key : String)
rpoplpush(source : String, destination : String)
rpush(key : String, values : Enumerable(String))
rpush(key, *values : String)

Instance methods inherited from module Redis::Commands::Hash

hdel(key : String, fields : Enumerable(String))
hdel(key : String, *fields : String)
hget(key : String, field : String)
hgetall(key : String)
hincrby(key : String, field : String, increment : Int | String)
hmget(key : String, fields : Enumerable(String))
hmget(key : String, *fields : String)
hmset(key : String, data : ::Hash(String, String))
hscan(key : String, cursor : String, *, match pattern : String | Nil = nil, count : String | Int | Nil = nil)
hset(key : String, fields : Enumerable(String))
hset(key : String, fields : ::Hash(String, String))
hset(key : String, *fields : String)
hset(key : String, **fields : String)
hsetnx(key : String, field : String, value : String)

Constructor Detail

def self.new(uri : URI = URI.parse(ENV["REDIS_CLUSTER_URL"]? || "redis:///"))

Pass a URI to connect to the specified Redis cluster. If no URI is given, it defaults to the REDIS_CLUSTER_URL environment variable, falling back to "redis:///" when that variable is unset. The URI can point to any server in the cluster, and Redis::Cluster will discover the rest.
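
A minimal connection sketch follows. The require path "redis/cluster" and the address redis://localhost:7000 are assumptions made for illustration; adjust both to match your installation and cluster.

```crystal
require "uri"
require "redis/cluster" # assumed require path for this class

# Hypothetical address. Any single node of the cluster can be the entry point.
cluster = Redis::Cluster.new(URI.parse("redis://localhost:7000"))

# Single-key commands are routed to whichever shard owns the key.
cluster.set "{user}:1:name", "Ada"
puts cluster.get("{user}:1:name")

# Release every connection the cluster client opened.
cluster.close
```

Calling Redis::Cluster.new with no arguments uses the default shown in the signature above.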



Class Method Detail

def self.register_read_only_command(command : String)

Tell the cluster driver that the specified Redis command can be routed to read-only replicas.

Redis::Cluster.register_read_only_command "mymodule.get"

def self.register_read_only_commands(commands : Enumerable(String))

Tell the cluster driver that all the specified Redis commands can be routed to read-only replicas.

Redis::Cluster.register_read_only_commands %w[
  mymodule.get
  mymodule.mget
]


Instance Method Detail

def close

Close all connections to this Redis cluster.


def flushdb

Executes #flushdb on each shard in the cluster.


def pipeline(key : String, &)

Run a pipeline on the shard that owns the specified key.

cluster.pipeline "{widgets}" do |pipe|
  widget_ids.each do |id|
    pipe.get "{widgets}:#{id}"
  end
end

WARNING All keys that this pipeline operates on MUST reside on the same shard. It's best to pass a pre-hashed key (one containing {}) to this method. See the example above.


def run(command full_command)
Description copied from module Redis::Commands

Execute the given command and return the result from the server. The command must be an Enumerable, and its size method must be re-entrant.

run({"set", "foo", "bar"})

def scan_each(*args, **kwargs, &) : Nil

Execute Commands#scan_each on each shard, yielding any matching keys.


def slot_for(key : String)

Return the Redis hash slot for the given key. This is useful for seeing which shard your command will be routed to.
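
One possible use is a sanity check before issuing a multi-key command. The sketch below assumes a reachable cluster and that "redis/cluster" is the correct require path; the key names are illustrative only.

```crystal
require "uri"
require "redis/cluster" # assumed require path for this class

# Uses REDIS_CLUSTER_URL, or the default URI, per the constructor signature.
cluster = Redis::Cluster.new

first = cluster.slot_for("{comment}:1")
second = cluster.slot_for("{comment}:2")

if first == second
  # Same slot means same shard, so a multi-key command is safe to send.
  cluster.del "{comment}:1", "{comment}:2"
else
  puts "Keys map to different slots (#{first} and #{second})"
end

cluster.close
```

Keys that share the same text inside the braces should always report the same slot, so the else branch here is only reached when key names are built without a common tag.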

