module Redis::Commands::Stream

Defined in:

commands/stream.cr

Instance Method Detail

def xack(key : String, group : String, id : String) #

def xack(key : String, group : String, ids : Enumerable(String)) #

def xadd(key : String, id : String, maxlen, data : Hash(String, String)) #

Appends an entry with the specified data to the stream at the given key and assigns it the specified id. If the id is "*", Redis generates an id of the form "#{Time.utc.to_unix_ms}-#{autoincrementing_index}". If maxlen is provided, Redis trims the stream to that length. If maxlen is of the form ~ 1000, Redis trims the stream to approximately that length, removing entries only when it can do so efficiently. Returns the id that Redis stores.

redis.xadd "my-stream", "*", {"name" => "foo", "id" => UUID.random.to_s}

def xadd(key : String, id : String, data : Hash(String, String)) #

Appends an entry with the specified data to the stream at the given key and assigns it the specified id. If the id is "*", Redis generates an id of the form "#{Time.utc.to_unix_ms}-#{autoincrementing_index}". This overload does not take maxlen; use one of the overloads that accepts it to trim the stream. Returns the id that Redis stores.

redis.xadd "my-stream", "*", {"name" => "foo", "id" => UUID.random.to_s}

def xadd(key : String, id : String, maxlen = nil, **data) #

Appends an entry with the specified data to the stream at the given key and assigns it the specified id. If the id is "*", Redis generates an id of the form "#{Time.utc.to_unix_ms}-#{autoincrementing_index}". If maxlen is provided, Redis trims the stream to that length. If maxlen is of the form ~ 1000, Redis trims the stream to approximately that length, removing entries only when it can do so efficiently. Returns the id that Redis stores.

redis.xadd "my-stream", "*", name: "foo", id: UUID.random.to_s
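
The id form described above (a millisecond Unix timestamp, a dash, then a sequence number) can be taken apart again on the client. The following is a minimal sketch rather than part of this module: parse_stream_id is a hypothetical helper name, and the id literal stands in for a value returned by xadd.

```crystal
# Hypothetical helper (an assumption, not provided by the shard): split an
# entry id of the form "<milliseconds>-<sequence>" into its two parts.
def parse_stream_id(id : String) : Tuple(Time, Int64)
  # String#partition returns the text before the dash, the dash itself,
  # and the text after it.
  ms, _, seq = id.partition('-')
  {Time.unix_ms(ms.to_i64), seq.to_i64}
end

# The literal below stands in for an id returned by xadd.
time, seq = parse_stream_id("1700000000000-3")
puts time.to_unix_ms # 1700000000000
puts seq             # 3
```

Keeping the timestamp as a Time makes it straightforward to compare an entry's creation time against Time.utc, for example to report how long an entry has been in the stream.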

def xautoclaim(key : String, group : String, consumer : String, min_idle_time : Time::Span, start : String, count : Int | String | Nil = nil) #

def xdel(key : String, ids : Enumerable(String)) #

def xdel(key : String, *ids : String) #

def xgroup(command : String, key : String, groupname : String) #

Run a Redis XGROUP subcommand for a given stream. See the XGROUP command in the Redis documentation for more information.

redis.xgroup "DESTROY", "my-stream", "my-group"

def xgroup(command : XGroup, key : String, groupname : String, *, id : String | Nil = nil, mkstream = false, consumer_name : String | Nil = nil) #

Run a Redis XGROUP subcommand for a given stream. See the XGROUP command in the Redis documentation for more information.

redis.xgroup :create, "my-stream", "my-group", mkstream: true

def xgroup(command : String, key : String, groupname : String, *args : String) #

Run a Redis XGROUP subcommand for a given stream. See the XGROUP command in the Redis documentation for more information.

redis.xgroup "CREATE", "my-stream", "my-group", "0"

def xgroup_create(key : String, groupname : String, *, id : String = "$", mkstream = false) #

def xgroup_create_consumer(key : String, groupname : String, consumer_name : String) #

Runs XGROUP CREATECONSUMER key groupname consumername, which creates the named consumer in the given consumer group.


def xlen(key : String) #

Returns the number of entries in the given stream.


def xpending(key : String, group : String, start : String, end finish : String, count : String | Int, idle : String | Time::Span | Nil = nil) #

def xpending(key : String, group : String) #

Runs XPENDING key group [[IDLE min-idle-time] start end count [consumer]]. When called with only key and group, Redis returns a summary of the group's pending entries.


def xrange(key : String, start min, end max, count = nil) #

Return the entries in the given stream between the start and end ids. If count is provided, Redis will return only that number of entries.


def xreadgroup(group : String, consumer : String, count : String | Int | Nil = nil, block : Time::Span | String | Int | Nil = nil, no_ack = false, streams : ::Hash(String, String) = {} of String => String) #

Execute an XREADGROUP command on the Redis server.

The response is returned in its raw form from Redis, but you can pass it to Redis::Streaming::XReadGroupResponse.new to make it easier to work with.


def xreadgroup(group : String, consumer : String, count : String | Int | Nil = nil, block : Time::Span | String | Int | Nil = nil, no_ack = false, streams : NamedTuple = NamedTuple.new) #

Execute an XREADGROUP command on the Redis server. If block is not nil, the server will block for up to that much time (if you pass a number, it will be interpreted as milliseconds) until any new messages enter the stream.

The response is returned in its raw form from Redis, but you can pass it to Redis::Streaming::XReadGroupResponse.new to make it easier to work with.

# Long-poll for up to 10 messages from the stream with key `my_stream`,
# blocking for up to 2 seconds if there are no messages waiting.
response = redis.xreadgroup "group", "consumer",
  streams: {my_stream: ">"},
  count: 10,
  block: 2.seconds
response = Redis::Streaming::XReadGroupResponse.new(response)
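
The block argument accepts a Time::Span, a String, or an Int, and the Redis BLOCK option itself is expressed in milliseconds. The sketch below illustrates one way such values could be normalized before being sent. It is an assumption for illustration only: block_milliseconds is a hypothetical name, and this is not necessarily how the shard performs the conversion internally.

```crystal
# Hypothetical helper (an assumption, not the shard's implementation):
# normalize a block value to the millisecond string Redis expects.
def block_milliseconds(block : Time::Span | String | Int) : String
  case block
  when Time::Span
    # Convert the span to whole milliseconds.
    block.total_milliseconds.to_i64.to_s
  when Int
    # Plain numbers are already interpreted as milliseconds.
    block.to_s
  else
    # Strings are passed through unchanged.
    block
  end
end

puts block_milliseconds(2.seconds) # 2000
puts block_milliseconds(1500)      # 1500
puts block_milliseconds("250")     # 250
```

Passing a Time::Span at the call site, as in the example above, avoids any ambiguity about units.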
