class LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore
Overview
A delayed exchange queue must keep its messages ordered by "expire at" so that the message expire loop looks at the right message. To achieve this, messages are always added to a custom requeued store. This requeued store acts as an in-memory index in which messages are ordered by when they should be published. The requeued store is used because #shift? and #first? look for requeued messages first and only then read the next message from disk. For a delayed exchange queue we never want to read messages in the order they arrived (that is, the order in which they were written to disk).
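The ordering idea described above can be illustrated with a minimal sketch. This is not the LavinMQ implementation: `DelayedEntry` and `SketchDelayedIndex` are hypothetical names invented for this example, and the real store works with segment positions and on-disk messages rather than plain records. The sketch only shows how an in-memory index kept sorted by "expire at" lets `first?`, `shift?`, and `time_to_next_expiration?` always see the message that is due soonest, regardless of arrival order.

```crystal
# Hypothetical, simplified stand-in for one entry in the in-memory index.
record DelayedEntry, id : Int32, expire_at : Time

# Hypothetical sketch of an index ordered by expiration time.
# It is not the actual DelayedMessageStore.
class SketchDelayedIndex
  def initialize
    @entries = Array(DelayedEntry).new
  end

  # Insert the entry so that @entries stays sorted, earliest expire_at first.
  # Entries with equal expire_at keep their arrival order.
  def push(entry : DelayedEntry) : Nil
    idx = @entries.bsearch_index { |e| e.expire_at > entry.expire_at } || @entries.size
    @entries.insert(idx, entry)
  end

  # Peek at the entry that expires next, if any.
  def first? : DelayedEntry?
    @entries.first?
  end

  # Remove and return the entry that expires next, if any.
  def shift? : DelayedEntry?
    @entries.shift?
  end

  # Time left until the next entry expires, or nil when the index is empty.
  def time_to_next_expiration?(now : Time = Time.utc) : Time::Span?
    @entries.first?.try { |e| e.expire_at - now }
  end
end

# Usage: entries arrive out of expiration order.
base = Time.utc(2024, 1, 1, 0, 0, 0)
index = SketchDelayedIndex.new
index.push(DelayedEntry.new(1, base + 30.seconds))
index.push(DelayedEntry.new(2, base + 5.seconds))
index.push(DelayedEntry.new(3, base + 10.seconds))
```

A sorted array with binary-search insertion is used here only to keep the example short; the sketch makes no claim about which data structure the real store uses.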
Defined in:
lavinmq/amqp/queue/delayed_exchange_queue/delayed_message_store.cr
lavinmq/amqp/queue/delayed_exchange_queue/delayed_requeued_store.cr
Constructors
Instance Method Summary
- #build_index
- #first? : Envelope | Nil
- #push(msg) : SegmentPosition
  Overload to add the segment position to our "index"
- #requeue(sp : SegmentPosition)
- #shift?(consumer = nil) : Envelope | Nil
- #time_to_next_expiration? : Time::Span | Nil
  Customization used by DelayedExchangeQueue
Instance methods inherited from class LavinMQ::MessageStore
- [](sp : SegmentPosition) : BytesMessage
- avg_bytesize : UInt32
- bytesize : UInt64
- close : Nil
- closed : Bool
- delete(sp) : Nil
- empty : BoolChannel
- empty?
- first? : Envelope | Nil
- purge(max_count : Int = UInt32::MAX) : UInt32
- purge_all
- push(msg) : SegmentPosition
- requeue(sp : SegmentPosition)
- shift?(consumer = nil) : Envelope | Nil
- size : UInt32