class LavinMQ::Queue::MessageStore

Overview

Message store This handles writing msgs to segments on disk Keeping a list of deleted messages in memory and on disk You can shift through the message store, but not requeue msgs That has to be handled at another layer Writes messages to segments on disk Messages are refered to as SegmentPositions Deleted messages are written to acks.#{segment}

Direct Known Subclasses

Defined in:

lavinmq/amqp/queue/message_store.cr

Constant Summary

Log = LavinMQ::Log.for("message_store")
PURGE_YIELD_INTERVAL = 16384

Constructors

Instance Method Summary

Constructor Detail

def self.new(queue_data_dir : String, replicator : Clustering::Replicator | Nil, metadata : ::Log::Metadata = ::Log::Metadata.empty) #

[View source]

Instance Method Detail

def [](sp : SegmentPosition) : BytesMessage #

[View source]
def avg_bytesize : UInt32 #

[View source]
def bytesize : UInt64 #

[View source]
def close : Nil #

[View source]
def delete(sp) : Nil #

[View source]
def delete #

[View source]
def delete_file(file) #

[View source]
def empty? #

[View source]
def empty_change : Channel(Bool) #

[View source]
def first? : Envelope | Nil #

[View source]
def purge(max_count : Int = UInt32::MAX) : UInt32 #

Deletes all "ready" messages (not unacked)


[View source]
def push(msg) : SegmentPosition #

[View source]
def requeue(sp : SegmentPosition) #

[View source]
def shift?(consumer = nil) : Envelope | Nil #

[View source]
def size : UInt32 #

[View source]
def unmap_segments(except : Enumerable(UInt32) = StaticArray(UInt32, 0).new(0_u32)) #

[View source]