class Redis::ReplicationClient
- Redis::ReplicationClient
- Reference
- Object
Overview
If you're using Redis replication, you can use ReplicationClient to send
read commands to replicas and reduce load on the primary. This can be important
when your Redis primary is CPU-bound.
The commands that will be routed to replicas are listed in Redis::READ_ONLY_COMMANDS.
NOTE Redis replication does not provide consistency guarantees. Every
mechanism in Redis to improve consistency, such as WAIT, is best-effort
and not guaranteed. If you require strong consistency from Redis, stick to
using Redis::Client. If you require strong consistency but your Redis primary
is CPU-bound, you may need to either choose between consistency and performance
or move that workload out of Redis.
This client is useful for operations where strong consistency isn't typically
needed, such as caching, full-text search with Redis::FullText#search,
querying time-series data with Redis::TimeSeries#mrange, checking the current
state of larger data structures without blocking the primary, etc.
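A minimal sketch of the default routing described above. The URL is a placeholder, and it is an assumption that GET is listed in Redis::READ_ONLY_COMMANDS and that requiring "redis/replication_client" loads everything the client needs.

```crystal
require "uri"
require "redis/replication_client"

# Placeholder address (assumption): point this at any server in your
# replication setup.
redis = Redis::ReplicationClient.new(
  entrypoint: URI.parse("redis://localhost:6379")
)

# SET is a write, so it is sent to the primary.
redis.set "greeting", "hello"

# GET is assumed to be a read-only command, so it may be served by a
# replica. Because replication is asynchronous, the value can lag
# behind the write above.
cached = redis.get "greeting"

redis.close
```

If the read must reflect the write that precedes it, route it through #on_primary instead.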
Explicitly routing commands to a primary or replica
This class provides #on_primary and #on_replica methods to ensure your
command is routed to the server type you want. This is useful in several
scenarios:
- you want to ensure you retrieve a value that is consistent with the state of the primary server, for example a value that changes frequently and you need the canonical state for observability purposes
- a read-only command is routed to a primary because this client does not yet know about it
  - You can add commands to Redis::READ_ONLY_COMMANDS in one-off cases
  - Feel free to open an issue or pull request to add it, as well
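The two scenarios above might look like the following sketch. The URL is a placeholder, STRLEN is used only as an example of a command the client may not know about, and the lowercase spelling of the command name is an assumption about how Redis::READ_ONLY_COMMANDS stores its entries.

```crystal
require "uri"
require "redis/replication_client"

# Placeholder address (assumption).
redis = Redis::ReplicationClient.new(
  entrypoint: URI.parse("redis://localhost:6379")
)

# Scenario 1: read the canonical value from the primary, bypassing
# replicas that may lag behind.
current = redis.on_primary &.get("counter")

# Scenario 2: register a read-only command in a one-off case so that
# later calls can be routed to replicas. The name format ("strlen",
# lowercase) is an assumption.
Redis::READ_ONLY_COMMANDS << "strlen"
length = redis.run({"strlen", "greeting"})
```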
Topology changes
If the replication topology changes (for example, new replicas are added,
existing ones removed, or the primary failed over), ReplicationClient will
automatically pick up the changes. You can set how often it checks for these
changes with the #topology_ttl argument to the constructor or leave it at its
default of 10 seconds.
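As a sketch, the refresh interval can be passed as the topology_ttl named argument from the constructor signature. The URL and the 30-second value are illustrative assumptions.

```crystal
require "uri"
require "redis/replication_client"

# Check for topology changes at most every 30 seconds instead of the
# default 10 seconds. The address is a placeholder (assumption).
redis = Redis::ReplicationClient.new(
  entrypoint: URI.parse("redis://localhost:6379"),
  topology_ttl: 30.seconds
)

# The configured value is exposed through the #topology_ttl getter.
ttl = redis.topology_ttl
```

A longer interval means fewer topology checks, at the cost of reacting more slowly to failovers and newly added replicas.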
EXPERIMENTAL ReplicationClient is currently in alpha testing. There may be rough edges.
Included Modules
Defined in:
replication_client.cr
Constant Summary
- Log = ::Log.for(self)
Constructors
- .new(entrypoint : URI, topology_ttl : Time::Span = 10.seconds)
  Have the ReplicationClient discover the master and replicas on its own when given the URI of a single entrypoint.
- .new(master_uri : URI, replica_uris : Array(URI), topology_ttl : Time::Span = 10.seconds)
  Initialize the client with known master and replica URIs, keeping the topology up to date with at most #topology_ttl staleness.
- .new
Instance Method Summary
- #close
  Close all connections to both the primary and all replicas.
- #closed? : Bool
  Returns true if this ReplicationClient has been explicitly closed, false otherwise.
- #on_master(&)
  Alias of #on_primary.
- #on_primary(&)
  Route one or more commands to the primary to avoid consistency issues arising from replication latency.
- #on_replica(&)
  Route one or more commands to replicas.
- #run(command full_command)
  Execute the given command and return the result from the server.
- #topology_ttl : Time::Span
Instance methods inherited from module Redis::Commands
decr(key : String)
decr,
decrby(key : String, amount : Int | String)
decrby,
del(keys : Enumerable(String))del(*keys : String) del, eval(script : String, keys : Enumerable(String) = EmptyEnumerable.new, args : Enumerable(String) = EmptyEnumerable.new) eval, eval_ro(script : String, keys : Enumerable(String) = EmptyEnumerable.new, args : Enumerable(String) = EmptyEnumerable.new) eval_ro, evalsha(sha : String, keys : Enumerable(String) = EmptyEnumerable.new, args : Enumerable(String) = EmptyEnumerable.new) evalsha, evalsha_ro(script : String, keys : Enumerable(String) = EmptyEnumerable.new, args : Enumerable(String) = EmptyEnumerable.new) evalsha_ro, exists(*keys : String) exists, expire(key : String, ttl : Time::Span)
expire(key : String, ttl : Int) expire, expireat(key : String, at : Time) expireat, flushall flushall, flushdb flushdb, ft ft, get(key : String) get, graph(key : String) graph, incr(key : String) incr, incrby(key : String, amount : Int | String) incrby, info info, json json, keys(pattern = "*") keys, mget(keys : Enumerable(String)) mget, mset(data : Hash(String, String)) mset, pexpire(key : String, ttl : Time::Span)
pexpire(key : String, ttl : Int) pexpire, pexpireat(key : String, at : Time) pexpireat, pttl(key : String) pttl, publish(channel : String, message : String) publish, run(command) run, scan(cursor : String = "0", match : String | Nil = nil, count : String | Int | Nil = nil, type : String | Nil = nil) scan, script_exists(shas : Enumerable(String))
script_exists(*shas : String) script_exists, script_flush(mode : ScriptFlushMode) script_flush, script_kill script_kill, script_load(script : String) script_load, set(key : String, value : String, ex : String | Int | Nil = nil, px : String | Int | Nil = nil, nx = false, xx = false, keepttl = false)
set(key, value, ex : Time, nx = false, xx = false, keepttl = false)
set(key, value, ex : Time::Span, nx = false, xx = false, keepttl = false) set, ts ts, ttl(key : String) ttl, type(key : String) type, unlink(keys : Enumerable(String))
unlink(*keys : String) unlink, wait(numreplicas replica_count : Int | String, timeout : Time::Span)
wait(numreplicas replica_count : Int | String, timeout : Int | String) wait
Instance methods inherited from module Redis::Commands::Stream
xack(key : String, group : String, id : String)xack(key : String, group : String, ids : Enumerable(String)) xack, xadd(key : String, id : String, maxlen, data : ::Hash(String, String))
xadd(key : String, id : String, data : ::Hash(String, String))
xadd(key : String, id : String, maxlen = nil, **data) xadd, xautoclaim(key : String, group : String, consumer : String, min_idle_time : Time::Span, start : String, count : Int | String | Nil = nil) xautoclaim, xdel(key : String, ids : Enumerable(String))
xdel(key : String, *ids : String) xdel, xgroup(command : String, key : String, groupname : String)
xgroup(command : XGroup, key : String, groupname : String, *, id : String | Nil = nil, mkstream = false, consumer_name : String | Nil = nil)
xgroup(command : String, key : String, groupname : String, *args : String) xgroup, xgroup_create(key : String, groupname : String, *, id : String = "$", mkstream = false) xgroup_create, xgroup_create_consumer(key : String, groupname : String, consumer_name : String) xgroup_create_consumer, xlen(key : String) xlen, xpending(key : String, group : String, start : String, end finish : String, count : String | Int, idle : String | Time::Span | Nil = nil)
xpending(key : String, group : String) xpending, xrange(key : String, start min : String, end max : String, count : String | Int | Nil = nil) xrange, xreadgroup(group : String, consumer : String, count : String | Int | Nil = nil, block : Time::Span | String | Int | Nil = nil, no_ack = false, streams : ::Hash(String, String) = {} of String => String)
xreadgroup(group : String, consumer : String, count : String | Int | Nil = nil, block : Time::Span | String | Int | Nil = nil, no_ack = false, streams : NamedTuple = NamedTuple.new) xreadgroup
Instance methods inherited from module Redis::Commands::SortedSet
zadd(key : String, score : String | Int64, value : String)zadd(key : String, values : Enumerable(String)) zadd, zcard(key : String) zcard, zcount(key : String, min : String, max : String) zcount, zrange(key : String, starting : String | Int64, ending : String | Int64, with_scores : Bool = false) zrange, zrangebyscore(key : String, low : String | Int64, high : String | Int64, limit : Enumerable(String) | Nil = nil) zrangebyscore, zrem(key : String, value : String)
zrem(key : String, values : Enumerable(String))
zrem(key : String, *values : String) zrem, zremrangebyrank(key : String, low : Int64, high : Int64) zremrangebyrank, zremrangebyscore(key : String, low : String | Int64, high : String | Int64) zremrangebyscore, zrevrange(key : String, starting : String | Int64, ending : String | Int64, with_scores : Bool = false) zrevrange, zscore(key : String, value : String) zscore
Instance methods inherited from module Redis::Commands::Set
sadd(key : String, *values : String)
sadd,
scard(key : String)
scard,
sdiff(first : String, second : String)
sdiff,
sinter(first : String, *others : String)
sinter,
sismember(key : String, value : String)
sismember,
smembers(key : String)
smembers,
srem(key : String, members : Enumerable(String))srem(key : String, *values : String) srem
Instance methods inherited from module Redis::Commands::List
blpop(keys : Enumerable(String), timeout : Time::Span)blpop(*keys : String, timeout : Time::Span)
blpop(*keys : String, timeout : Int | Float)
blpop(*keys : String, timeout : String) blpop, brpop(keys : Enumerable(String), timeout : Int)
brpop(*keys : String, timeout : Time::Span)
brpop(*keys : String, timeout : Number)
brpop(*keys : String, timeout : String) brpop, brpoplpush(source : String, destination : String, timeout : Time::Span)
brpoplpush(source : String, destination : String, timeout : Int | String) brpoplpush, llen(key : String) llen, lmove(from source : String, to destination : String, from_side source_side : Side, to_side destination_side : Side) lmove, lpop(key : String, count : String | Nil = nil) lpop, lpush(key : String, values : Enumerable(String))
lpush(key, *values : String) lpush, lrange(key : String, start : String | Int, finish : String | Int) lrange, lrem(key : String, count : Int, value : String) lrem, ltrim(key : String, start : String | Int, stop : String | Int)
ltrim(key : String, range : Range(String, String))
ltrim(key : String, range : Range(Int32, Int32)) ltrim, rpop(key : String) rpop, rpoplpush(source : String, destination : String) rpoplpush, rpush(key : String, values : Enumerable(String))
rpush(key, *values : String) rpush
Instance methods inherited from module Redis::Commands::Hash
hdel(key : String, fields : Enumerable(String))hdel(key : String, *fields : String) hdel, hget(key : String, field : String) hget, hgetall(key : String) hgetall, hincrby(key : String, field : String, increment : Int | String) hincrby, hmget(key : String, fields : Enumerable(String))
hmget(key : String, *fields : String) hmget, hmset(key : String, data : ::Hash(String, String)) hmset, hscan(key : String, cursor : String, *, match pattern : String | Nil = nil, count : String | Int | Nil = nil) hscan, hset(key : String, fields : Enumerable(String))
hset(key : String, fields : ::Hash(String, String))
hset(key : String, *fields : String)
hset(key : String, **fields : String) hset, hsetnx(key : String, field : String, value : String) hsetnx
Constructor Detail
Have the ReplicationClient discover the master and replicas on its own
when given the URI of a single entrypoint. The cluster topology will be
refreshed with a max staleness of #topology_ttl.
redis = Redis::ReplicationClient.new(
Initialize the client with known master and replica URIs, keeping the
topology up to date with at most #topology_ttl staleness. If you don't wish
to keep the replication topology up to date, you can simply set
#topology_ttl to 0.seconds.
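A sketch of this constructor with a fixed topology, using the named arguments from its signature. The host names are hypothetical placeholders.

```crystal
require "uri"
require "redis/replication_client"

# Hypothetical hosts (assumption): one primary and two replicas.
redis = Redis::ReplicationClient.new(
  master_uri: URI.parse("redis://primary.example.internal:6379"),
  replica_uris: [
    URI.parse("redis://replica-1.example.internal:6379"),
    URI.parse("redis://replica-2.example.internal:6379"),
  ],
  # 0.seconds opts out of keeping the topology up to date, as
  # described above.
  topology_ttl: 0.seconds
)
```

With refreshing turned off, a failover or a change in the replica set will not be picked up until a new client is created.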
Instance Method Detail
Returns true if this ReplicationClient has been explicitly closed,
false otherwise.
Route one or more commands to the primary to avoid consistency issues arising from replication latency.
require "redis/replication_client"
redis = Redis::ReplicationClient.new
redis.incr "counter"
value = redis.on_primary &.get("counter")
This is useful for pipelining commands or executing transactions:
redis.on_primary &.transaction do |txn|
  txn.incr "counter:#{queue}"
  txn.sadd "queues", queue
  txn.lpush "queue:#{queue}", job_data
end
… which is shorthand for this and removes the need for nesting blocks:
redis.on_primary do |primary|
  primary.transaction do |txn|
    txn.incr "counter:#{queue}"
    txn.sadd "queues", queue
    txn.lpush "queue:#{queue}", job_data
  end
end
If you need to route many commands to the primary without necessarily
pipelining or opening transactions, you can omit the &.transaction and
call methods directly on the primary's Redis::Client in the block:
redis.on_primary do |primary|
  counter = primary.incr "counter:#{queue}"
  primary.sadd "queues", queue
end
NOTE The object yielded to the block is a Redis::Client, but if you try
to use it outside the block you may run into errors because the replication
topology could change, in which case this Redis::Client might not be the
primary anymore.
Route one or more commands to replicas. This should rarely be necessary since
read-only commands (the only commands that replicas can execute) are automatically
routed to replicas, but if it's a command this shard does not know about (see
Redis::READ_ONLY_COMMANDS) this may be necessary. Alternatively, you can
shovel additional commands into Redis::READ_ONLY_COMMANDS to avoid having to
perform this explicit routing.
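A sketch of explicit replica routing. The URL is a placeholder, STRLEN stands in for any read-only command the shard might not recognize, and it is an assumption that the object yielded to the block responds to #run in the same way as the one yielded by #on_primary.

```crystal
require "uri"
require "redis/replication_client"

# Placeholder address (assumption).
redis = Redis::ReplicationClient.new(
  entrypoint: URI.parse("redis://localhost:6379")
)

# Force a read-only command onto a replica, even if it is not listed
# in Redis::READ_ONLY_COMMANDS.
redis.on_replica do |replica|
  replica.run({"strlen", "greeting"})
end
```

As with #on_primary, the yielded connection is best used only inside the block, since the topology may change afterwards.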
Execute the given command and return the result from the server. Commands
must be an Enumerable and its size method must be re-entrant.
run({"set", "foo", "bar"})