struct Servo::Broker::RedisBroker
- Servo::Broker::RedisBroker
- Struct
- Value
- Object
Overview
Implementation of Servo::Broker::BrokerInterface
for Redis.
Uses Redis 5 streams for communicating with the gateway.
Included Modules
Defined in:
broker/redis_broker.crConstructors
Instance Method Summary
-
#auto_claim(event_name : String) : Nil
Tells the broker to auto claim messages for the event_name stream.
-
#consumer_name : String
The unique identifier for this broker consumer.
-
#dispatch_event(event_name : String, message : Redis::Streaming::Message)
Handles dispatching converting a redis message to its respective event instance and dispatching it.
-
#dispatcher : AED::EventDispatcherInterface
The
AED::EventDispatcher
instnance this broker should dispatch events to. -
#group : String
The name of the group to consumes streams on.
-
#listen : Nil
Starts the autoclaim listener and incoming message listener with sensible defaults.
-
#listen_for_incoming(count : UInt32 = 10, block_for : Time::Span = 5.seconds) : Nil
Listens for incoming messages.
-
#listen_for_pending(count : UInt32 = 10, start : String = "0-0", timeout : Time::Span = 10.seconds)
Uses redis XAUTOCLAIM command to claim messages that have not been acknowledged after timeout and dispatch it to its respective listener if the broker was told to autoclaim it with
#auto_claim
. -
#publish(event_name : String, data : Hash(String, String)) : Nil
Publishes data to the broker.
-
#redis : Redis::Client
The base
Redis::Client
instance. -
#subscribe(event_name : String) : Nil
Subscribes the broker to an event.
Instance methods inherited from module Servo::Broker::BrokerInterface
publish(event_name : String, data : Hash(String, String)) : Nil
publish,
subscribe(event_name : String) : Nilsubscribe(event_names : Array(String)) : Nil subscribe
Constructor Detail
Instance Method Detail
Tells the broker to auto claim messages for the event_name stream.
Handles dispatching converting a redis message to its respective event instance and dispatching it.
The AED::EventDispatcher
instnance this broker should
dispatch events to.
The name of the group to consumes streams on. The name should match the broker group specified on the gateway client.
Starts the autoclaim listener and incoming message listener with sensible defaults.
Before calling this method you should subscribe and autoclaim events
with #subscribe
and #auto_claim
.
If you wish to change the options for the incoming listener or the
autoclaim listener you should call #listen_for_incoming
and
#listen_for_pending
seperately and pass the options into each method.
These methods are blocking so you should run them in seperate fibers.
Example:
broker = Servo::Broker::RedisBroker.new
# Starts the auto claim listener on a new fiber that will claim messages
# that haven't been claimed for 30 seconds.
spawn do
broker.listen_for_pending(timeout: 30.seconds)
end
# Starts the incoming message listener on the main fiber
# recieving 30 elements and blocking for 8 seconds.
broker.listen_for_incoming(30, 8.seconds)
Listens for incoming messages.
Uses redis XREADGROUP command to listen for incoming data and dispatch it to its respective listener.
The method takes two parameters, count is the amount of elements to return per stream and block_for which is the amount of time to block the client for if there is no data.
Uses redis XAUTOCLAIM command to claim messages that have not been acknowledged
after timeout and dispatch it to its respective listener if the broker was told
to autoclaim it with #auto_claim
.
If count is specified it will change the upper limit of messages to claim. It is set to 10 by by default.
Publishes data to the broker.
If the stream at the key event_name does not exist the key will be created with a stream.
Subscribes the broker to an event.
This method only tells the broker to listen for incoming messages.
If you want to autoclaim unacknowledged messages for a stream you
should use #auto_claim
.