struct Servo::Broker::RedisBroker

Overview

Implementation of Servo::Broker::BrokerInterface for Redis.

Uses Redis streams (introduced in Redis 5.0) to communicate with the gateway.

Included Modules

Servo::Broker::BrokerInterface

Defined in:

broker/redis_broker.cr

Instance methods inherited from module Servo::Broker::BrokerInterface

publish(event_name : String, data : Hash(String, String)) : Nil
subscribe(event_name : String) : Nil
subscribe(event_names : Array(String)) : Nil

Constructor Detail

def self.new(dispatcher : AED::EventDispatcherInterface, uri : String, consumer_name : String, group : String = "gateway")

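
The constructor signature above can be exercised roughly as follows. This is a minimal sketch, not taken from the library's own documentation: it assumes the Servo shard is already required, that `dispatcher` is an existing `AED::EventDispatcherInterface` instance, and that the URI and consumer name shown are placeholders for your own values.

```crystal
# Sketch only. Assumptions: the Servo shard is already required and
# `dispatcher` is an existing AED::EventDispatcherInterface instance.
# The URI and the consumer name are placeholder values.
broker = Servo::Broker::RedisBroker.new(
  dispatcher,
  "redis://localhost:6379", # uri
  "worker-1",               # consumer_name, unique per consumer
  group: "gateway"          # the default, written out for clarity
)
```

Because `group` has a default, it only needs to be passed when the gateway client is configured with a different broker group.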

Instance Method Detail

def auto_claim(event_name : String) : Nil

Tells the broker to auto claim messages for the event_name stream.


def consumer_name : String

The unique identifier for this broker consumer.


def dispatch_event(event_name : String, message : Redis::Streaming::Message)

Converts a Redis message to its respective event instance and dispatches it.


def dispatcher : AED::EventDispatcherInterface

The AED::EventDispatcher instance this broker should dispatch events to.


def group : String

The name of the group to consume streams on. The name should match the broker group specified on the gateway client.


def listen : Nil

Starts the autoclaim listener and incoming message listener with sensible defaults.

Before calling this method you should subscribe and autoclaim events with #subscribe and #auto_claim.

If you wish to change the options for the incoming listener or the autoclaim listener, call #listen_for_incoming and #listen_for_pending separately and pass the options to each method.

These methods are blocking, so you should run them in separate fibers.

Example:

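# `dispatcher` is an existing AED::EventDispatcherInterface instance;
# the URI and consumer name below are placeholders.
broker = Servo::Broker::RedisBroker.new(dispatcher, "redis://localhost:6379", "consumer-1")
# Starts the auto claim listener on a new fiber that will claim messages
# that have not been acknowledged for 30 seconds.
spawn do
  broker.listen_for_pending(timeout: 30.seconds)
end
# Starts the incoming message listener on the main fiber,
# receiving up to 30 elements per stream and blocking for up to 8 seconds.
broker.listen_for_incoming(30, 8.seconds)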

def listen_for_incoming(count : UInt32 = 10, block_for : Time::Span = 5.seconds) : Nil

Listens for incoming messages.

Uses the Redis XREADGROUP command to listen for incoming data and dispatch it to its respective listener.

The method takes two parameters: count, the number of elements to return per stream, and block_for, the amount of time to block the client when there is no data.


def listen_for_pending(count : UInt32 = 10, start : String = "0-0", timeout : Time::Span = 10.seconds)

Uses the Redis XAUTOCLAIM command (available since Redis 6.2) to claim messages that have not been acknowledged after timeout, and dispatches each claimed message to its respective listener if the broker was told to autoclaim that stream with #auto_claim.

count sets the upper limit on the number of messages to claim. It defaults to 10.


def publish(event_name : String, data : Hash(String, String)) : Nil

Publishes data to the broker.

If no stream exists at the key event_name, the key is created with a new stream.
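
A call to #publish might look like the sketch below. It assumes `broker` is a RedisBroker constructed elsewhere; the event name and the field names in the hash are hypothetical and only illustrate the `Hash(String, String)` shape the signature requires.

```crystal
# Sketch only. `broker` is assumed to be an existing
# Servo::Broker::RedisBroker; "MESSAGE_CREATE" and the fields
# below are made-up illustration values.
data = {
  "id"      => "1",
  "content" => "hello",
}

# Both keys and values must be Strings, so serialize any
# non-string values (numbers, JSON payloads) before publishing.
broker.publish("MESSAGE_CREATE", data)
```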


def redis : Redis::Client

The base Redis::Client instance.


def subscribe(event_name : String) : Nil

Subscribes the broker to an event.

This method only tells the broker to listen for incoming messages. If you want to autoclaim unacknowledged messages for a stream you should use #auto_claim.
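
The difference between subscribing and autoclaiming can be sketched as follows. This assumes `broker` is a RedisBroker constructed elsewhere and uses hypothetical event names; only the method signatures listed on this page are relied on.

```crystal
# Sketch only. `broker` is assumed to be an existing
# Servo::Broker::RedisBroker; the event names are hypothetical.

# Listen for new messages on a single stream.
broker.subscribe("MESSAGE_CREATE")

# Additionally claim unacknowledged messages on that stream.
broker.auto_claim("MESSAGE_CREATE")

# Subscribe to several streams at once using the Array overload
# inherited from Servo::Broker::BrokerInterface. These streams
# are not autoclaimed because #auto_claim was not called for them.
broker.subscribe(["GUILD_CREATE", "GUILD_DELETE"])

# Start both listeners with their default options.
broker.listen
```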

