class AMQP::Client::Channel

Defined in:

amqp-client/channel.cr
amqp-client/errors.cr

Instance Method Summary

Instance Method Detail

def basic_ack(delivery_tag : UInt64, multiple = false) : Nil #

Acknowledge a message with delivery_tag, or all messages up to and including delivery_tag if multiple is set


[View source]
def basic_cancel(consumer_tag, no_wait = false) : Nil #

Cancel the consumer with the consumer_tag. Even with no_wait = false the method will return immediately, but outstanding deliveries will be processed.


[View source]
def basic_consume(queue, tag = "", no_ack = true, exclusive = false, block = false, args arguments = Arguments.new, work_pool = 1, &blk : DeliverMessage -> Nil) #

Consume messages from a queue

  • Make sure to eventually ack or reject each message if no_ack is false
  • The exclusive flag ensures that only a single consumer receives messages from the queue at a time
  • The method will block if the block flag is set, until the consumer/channel/connection is closed or the callback raises an exception
  • To let multiple fibers process messages, increase work_pool. Make sure to handle all exceptions in the consume block, as unhandled exceptions will cause the channel to be closed (to prevent dangling unacked messages and exception floods).

[View source]
def basic_get(queue : String, no_ack = true) : GetMessage | Nil #

Get a single message from a queue. The message must eventually be acked or rejected if no_ack is false.


[View source]
def basic_nack(delivery_tag : UInt64, requeue = false, multiple = false) : Nil #

Reject a message with delivery_tag, optionally requeue it. Reject all messages up to and including delivery_tag if multiple is true.


[View source]
def basic_publish(bytes : Bytes, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new) #

Publish a bytes message, to an exchange with routing_key


[View source]
def basic_publish(bytes : Bytes, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new, &blk : Bool -> Nil) #

[View source]
def basic_publish(string : String, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new) #

Publish a string message, to an exchange with routing_key


[View source]
def basic_publish(string : String, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new, &blk : Bool -> Nil) #

[View source]
def basic_publish(io : IO::Memory | IO::FileDescriptor, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new) #

Publish an io message, to an exchange with routing_key. Only data from the current position of the IO to the end will be published. The position will be restored after publish.


[View source]
def basic_publish(io : IO::Memory | IO::FileDescriptor, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new, &blk : Bool -> Nil) #

[View source]
def basic_publish(body : IO | Bytes, bytesize : Int, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new, &blk : Bool -> Nil) : UInt64 #

[View source]
def basic_publish(body : IO | Bytes, bytesize : Int, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new, blk : Proc(Bool, Nil) | Nil = nil) : UInt64 #

Publish a message with a set bytesize, to an exchange with routing_key


[View source]
def basic_publish_confirm(io : IO, bytesize : Int, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new) : Bool #

[View source]
def basic_publish_confirm(msg, exchange, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new) : Bool #

[View source]
def basic_qos(count, global = false) : Nil #

Set prefetch limit to count messages; no more messages will be delivered to the consumer until one or more messages have been acknowledged or rejected


[View source]
def basic_recover(requeue : Bool) : Nil #

Tell the broker to either deliver all unacknowledged messages again if requeue is false, or reject all of them if requeue is true

Unacknowledged messages retrieved by #basic_get are requeued regardless.


[View source]
def basic_reject(delivery_tag : UInt64, requeue = false) : Nil #

Reject a message with delivery_tag, optionally requeue it


[View source]
def close(reason = "", code = 200) : Nil #

Close the channel. The reason might be logged by the server.


[View source]
def closed? : Bool #

[View source]
def confirm_select(no_wait = false) : Nil #

Sets the channel in publish confirm mode; each published message will be acked or nacked


[View source]
def default_exchange #

[View source]
def direct_exchange(name = "amq.direct", passive = true) #

[View source]
def exchange(name, type, passive = false, durable = true, internal = false, auto_delete = false, args arguments = Arguments.new) #

Convenience method for Exchange handling


[View source]
def exchange_bind(source : String, destination : String, routing_key : String, no_wait = false, args arguments = Arguments.new) : Nil #

Bind an exchange to another exchange


[View source]
def exchange_declare(name : String, type : String, passive = false, durable = true, internal = false, auto_delete = false, no_wait = false, args arguments = Arguments.new) : Nil #

Declares an exchange


[View source]
def exchange_delete(name, if_unused = false, no_wait = false) : Nil #

Deletes an exchange


[View source]
def exchange_unbind(source : String, destination : String, routing_key : String, no_wait = false, args arguments = Arguments.new) : Nil #

Unbind an exchange from another exchange


[View source]
def fanout_exchange(name = "amq.fanout", passive = true) #

[View source]
def flow(active : Bool) #

Stop/start the flow of messages to consumers. Not supported by all brokers.


[View source]
def header_exchange(name = "amq.header", passive = true) #

[View source]
def id : UInt16 #

[View source]
def inspect(io : IO) : Nil #
Description copied from class Reference

Appends a String representation of this object which includes its class name, its object address and the values of all instance variables.

class Person
  def initialize(@name : String, @age : Int32)
  end
end

Person.new("John", 32).inspect # => #<Person:0x10fd31f20 @name="John", @age=32>

[View source]
def on_cancel(&blk : String -> Nil) #

Callback that will be called if a consumer is cancelled by the server. The argument to the callback is the consumer tag.


[View source]
def on_close(&blk : UInt16, String -> ) #

Callback that will be called if the channel is closed by the server


[View source]
def on_return(&blk : ReturnedMessage -> Nil) #

Callback that is called if a published message is returned by the server


[View source]
def prefetch(count, global = false) : Nil #

Alias for #basic_qos


[View source]
def queue(name : String, passive = false, durable = true, exclusive = false, auto_delete = false, args arguments = Arguments.new) #

Declares a queue with a name, by default durable and not auto-deleted


[View source]
def queue #

Declares a temporary queue, which won't be durable and will be auto-deleted when it is no longer used


[View source]
def queue_bind(queue : String, exchange : String, routing_key : String, no_wait = false, args arguments = Arguments.new) : Nil #

Bind a queue to an exchange, with a routing_key and optionally some arguments


[View source]
def queue_declare(name : String, passive = false, durable = name.empty? ? false : true, exclusive = name.empty? ? true : false, auto_delete = name.empty? ? true : false, no_wait = false, args arguments = Arguments.new) #

Declare a queue with name. passive will raise if the queue doesn't already exist; other arguments are ignored. durable will make the queue durable on the server (note that messages have to have the persistent flag set to make the messages persistent). exclusive will make the queue exclusive to the channel, and it will be deleted when the channel is closed. auto_delete will delete the queue when the last consumer has stopped consuming. no_wait will cause the method to return without waiting for a confirmation from the server.


[View source]
def queue_delete(name : String, if_unused = false, if_empty = false, no_wait = false) #

Delete a queue


[View source]
def queue_purge(name : String, no_wait = false) #

Purge/empty a queue; returns the number of messages deleted


[View source]
def queue_unbind(queue : String, exchange : String, routing_key : String, args arguments = Arguments.new) : Nil #

Unbind a queue from an exchange, with a routing_key and optionally some arguments


[View source]
def topic_exchange(name = "amq.topic", passive = true) #

[View source]
def transaction(&) #

Commits a transaction if the block returns, rolls back the transaction if the block raises an exception


[View source]
def tx_commit : Nil #

Commit a transaction


[View source]
def tx_rollback : Nil #

Rollback a transaction


[View source]
def tx_select : Nil #

Set the Channel in transaction mode


[View source]
def wait_for_confirms : Bool #

Returns when there are no unconfirmed publishes on the channel. Raises if there were any negative acknowledgements.


[View source]