class LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore

Overview

A delayed exchange queue must keep its messages ordered by "expire at" so that the message expire loop always looks at the right message. To achieve this, messages are always added to a custom requeued store. This requeued store acts as an in-memory index in which messages are ordered by when they should be published. The requeued store is used because #shift? and #first? look for requeued messages first and only then read the next message from disk. For a delayed exchange queue, we never want to read messages in the order they arrived (that is, the order in which they were written to disk).
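The ordering idea described above can be illustrated with a minimal sketch. This is not LavinMQ's implementation: `SketchEntry` and `SketchIndex` are hypothetical names, and an integer `position` stands in for a real segment position. It only shows how an index kept sorted by expiry returns messages in expiry order rather than arrival order.

```crystal
# Hypothetical stand-in for a segment position plus its expiry timestamp.
record SketchEntry, expire_at : Int64, position : Int32

# Illustrative in-memory index kept sorted by expire_at (an assumption,
# not the actual DelayedRequeuedStore).
class SketchIndex
  def initialize
    @entries = [] of SketchEntry
  end

  # Binary-search for the first entry that expires later than the new one
  # and insert before it, so the earliest expiry stays at the front.
  def insert(entry : SketchEntry) : Nil
    idx = @entries.bsearch_index { |e| e.expire_at > entry.expire_at } || @entries.size
    @entries.insert(idx, entry)
  end

  # Peek at the entry that expires first, if any.
  def first? : SketchEntry?
    @entries.first?
  end

  # Remove and return the entry that expires first, if any.
  def shift? : SketchEntry?
    @entries.shift?
  end
end

index = SketchIndex.new
index.insert(SketchEntry.new(expire_at: 3000_i64, position: 0)) # arrives first, expires last
index.insert(SketchEntry.new(expire_at: 1000_i64, position: 1))
index.insert(SketchEntry.new(expire_at: 2000_i64, position: 2))

# Entries come out ordered by expire_at: positions 1, 2, 0.
while entry = index.shift?
  puts "position=#{entry.position} expire_at=#{entry.expire_at}"
end
```

Because every message goes through the index, reads never fall back to on-disk arrival order in this sketch.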

Defined in:

lavinmq/amqp/queue/delayed_exchange_queue/delayed_message_store.cr
lavinmq/amqp/queue/delayed_exchange_queue/delayed_requeued_store.cr

Constructors

Instance Method Summary

Instance methods inherited from class LavinMQ::MessageStore

[](sp : SegmentPosition) : BytesMessage, avg_bytesize : UInt32, bytesize : UInt64, close : Nil, closed : Bool, delete(sp) : Nil, empty : BoolChannel, empty?, first? : Envelope | Nil, purge(max_count : Int = UInt32::MAX) : UInt32, purge_all, push(msg) : SegmentPosition, requeue(sp : SegmentPosition), shift?(consumer = nil) : Envelope | Nil, size : UInt32

Constructor methods inherited from class LavinMQ::MessageStore

new(msg_dir : String, replicator : Clustering::Replicator | Nil, durable : Bool = true, metadata : ::Log::Metadata = ::Log::Metadata.empty)

Constructor Detail

def self.new(*args, **kwargs)

Instance Method Detail

def build_index

def first? : Envelope | Nil

def push(msg) : SegmentPosition

Overridden to also add the segment position to our "index".


def requeue(sp : SegmentPosition)

def shift?(consumer = nil) : Envelope | Nil

def time_to_next_expiration? : Time::Span | Nil

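Customization used by DelayedExchangeQueue. As the signature suggests, it gives the time span until the next message expires, or nil when there is none.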
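As a rough illustration of what such a method might compute, the sketch below derives the remaining time from a list of expiry timestamps. Everything here is an assumption made for illustration: `sketch_time_to_next_expiration?` is a hypothetical helper, timestamps are taken to be milliseconds since the Unix epoch and sorted ascending, and clamping overdue messages to a zero span is a choice made for this example, not confirmed behaviour of the real method.

```crystal
# Hypothetical helper: time until the earliest expiry, or nil when nothing is pending.
# `expire_ats` is assumed to be sorted ascending, in milliseconds since the Unix epoch.
def sketch_time_to_next_expiration?(expire_ats : Array(Int64), now : Time) : Time::Span?
  next_expire_at = expire_ats.first? || return nil
  remaining = next_expire_at - now.to_unix_ms
  # Assumption: already-due messages yield a zero span rather than a negative one.
  remaining > 0 ? remaining.milliseconds : Time::Span.zero
end

now = Time.unix_ms(10_000_i64)
p sketch_time_to_next_expiration?([12_500_i64, 20_000_i64], now) # 2.5 seconds remain
p sketch_time_to_next_expiration?([9_000_i64], now)             # already due
p sketch_time_to_next_expiration?([] of Int64, now)             # nothing pending
```

An expire loop could sleep for the returned span, or wait for a new message when the result is nil.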

