class AMQP::Client::Channel
- AMQP::Client::Channel
- Reference
- Object
Defined in:
amqp-client/channel.cr
amqp-client/errors.cr
Instance Method Summary
-
#basic_ack(delivery_tag : UInt64, multiple = false) : Nil
Acknowledge a message with delivery_tag, or all messages up to and including delivery_tag if multiple is set
-
#basic_cancel(consumer_tag, no_wait = false) : Nil
Cancel the consumer with the consumer_tag. Even with no_wait = false the method will return immediately, but outstanding deliveries will be processed.
-
#basic_consume(queue, tag = "", no_ack = true, exclusive = false, block = false, args arguments = Arguments.new, work_pool = 1, &blk : DeliverMessage -> Nil)
Consume messages from a queue
-
#basic_get(queue : String, no_ack = true) : GetMessage | Nil
Get a single message from a queue. The message must eventually be acked or rejected if no_ack is false.
-
#basic_nack(delivery_tag : UInt64, requeue = false, multiple = false) : Nil
Reject a message with delivery_tag, optionally requeue it. Reject all messages up to and including delivery_tag if multiple is true.
-
#basic_publish(bytes : Bytes, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new)
Publish a bytes message, to an exchange with routing_key
- #basic_publish(bytes : Bytes, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new, &blk : Bool -> Nil)
-
#basic_publish(string : String, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new)
Publish a string message, to an exchange with routing_key
- #basic_publish(string : String, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new, &blk : Bool -> Nil)
-
#basic_publish(io : IO::Memory | IO::FileDescriptor, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new)
Publish an io message, to an exchange with routing_key. Only data from the current position of the IO to the end will be published.
- #basic_publish(io : IO::Memory | IO::FileDescriptor, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new, &blk : Bool -> Nil)
- #basic_publish(body : IO | Bytes, bytesize : Int, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new, &blk : Bool -> Nil) : UInt64
-
#basic_publish(body : IO | Bytes, bytesize : Int, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new, blk : Proc(Bool, Nil) | Nil = nil) : UInt64
Publish a message with a set bytesize, to an exchange with routing_key
- #basic_publish_confirm(io : IO, bytesize : Int, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new) : Bool
- #basic_publish_confirm(msg, exchange, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new) : Bool
-
#basic_qos(count, global = false) : Nil
Set prefetch limit to count messages; no more messages will be delivered to the consumer until one or more messages have been acknowledged or rejected
-
#basic_recover(requeue : Bool) : Nil
Tell the broker to either deliver all unacknowledged messages again if requeue is false, or reject all of them if requeue is true
-
#basic_reject(delivery_tag : UInt64, requeue = false) : Nil
Reject a message with delivery_tag, optionally requeue it
-
#close(reason = "", code = 200) : Nil
Close the channel. The reason might be logged by the server.
- #closed? : Bool
-
#confirm_select(no_wait = false) : Nil
Sets the channel in publish confirm mode; each published message will be acked or nacked
- #default_exchange
- #direct_exchange(name = "amq.direct", passive = true)
-
#exchange(name, type, passive = false, durable = true, internal = false, auto_delete = false, args arguments = Arguments.new)
Convenience method for Exchange handling
-
#exchange_bind(source : String, destination : String, routing_key : String, no_wait = false, args arguments = Arguments.new) : Nil
Bind an exchange to another exchange
-
#exchange_declare(name : String, type : String, passive = false, durable = true, internal = false, auto_delete = false, no_wait = false, args arguments = Arguments.new) : Nil
Declares an exchange
-
#exchange_delete(name, if_unused = false, no_wait = false) : Nil
Deletes an exchange
-
#exchange_unbind(source : String, destination : String, routing_key : String, no_wait = false, args arguments = Arguments.new) : Nil
Unbind an exchange from another exchange
- #fanout_exchange(name = "amq.fanout", passive = true)
-
#flow(active : Bool)
Stop/start the flow of messages to consumers. Not supported by all brokers.
- #header_exchange(name = "amq.header", passive = true)
- #id : UInt16
-
#inspect(io : IO) : Nil
Appends a String representation of this object which includes its class name, its object address and the values of all instance variables.
-
#on_cancel(&blk : String -> Nil)
Callback that will be called if a consumer is cancelled by the server. The argument to the callback is the consumer tag.
-
#on_close(&blk : UInt16, String -> )
Callback that will be called if the channel is closed by the server
-
#on_return(&blk : ReturnedMessage -> Nil)
Callback that is called if a published message is returned by the server
-
#prefetch(count, global = false) : Nil
Alias for #basic_qos
-
#queue(name : String, passive = false, durable = true, exclusive = false, auto_delete = false, args arguments = Arguments.new)
Declares a queue with a name, by default durable and not auto-deleted
-
#queue
Declares a temporary queue, which is not durable and is auto-deleted when it is no longer used
-
#queue_bind(queue : String, exchange : String, routing_key : String, no_wait = false, args arguments = Arguments.new) : Nil
Bind a queue to an exchange, with a routing_key and optionally some arguments
-
#queue_declare(name : String, passive = false, durable = name.empty? ? false : true, exclusive = name.empty? ? true : false, auto_delete = name.empty? ? true : false, no_wait = false, args arguments = Arguments.new)
Declare a queue with name. passive will raise if the queue doesn't already exist; other arguments are ignored. durable will make the queue durable on the server (note that messages must have the persistent flag set to be persistent). exclusive will make the queue exclusive to the channel; it will be deleted when the channel is closed. auto_delete will delete the queue when the last consumer has stopped consuming. no_wait will cause the method to return without waiting for a confirmation from the server.
-
#queue_delete(name : String, if_unused = false, if_empty = false, no_wait = false)
Delete a queue
-
#queue_purge(name : String, no_wait = false)
Purge/empty a queue; returns the number of messages deleted
-
#queue_unbind(queue : String, exchange : String, routing_key : String, args arguments = Arguments.new) : Nil
Unbind a queue from an exchange, with a routing_key and optionally some arguments
- #topic_exchange(name = "amq.topic", passive = true)
-
#transaction(&)
Commits a transaction if the block returns, rolls back the transaction if the block raises an exception
-
#tx_commit : Nil
Commit a transaction
-
#tx_rollback : Nil
Rollback a transaction
-
#tx_select : Nil
Set the Channel in transaction mode
-
#wait_for_confirms : Bool
Returns when there are no unconfirmed publishes on the channel. Raises if there were any negative acknowledgements.
Instance Method Detail
Acknowledge a message with delivery_tag, or all messages up to and including delivery_tag if multiple is set
Cancel the consumer with the consumer_tag. Even with no_wait = false the method will return immediately, but outstanding deliveries will be processed.
Consume messages from a queue
- Make sure to eventually ack or reject each message if no_ack is false
- The exclusive flag ensures that only a single consumer receives messages from the queue at a time
- The method will block if the block flag is set, until the consumer/channel/connection is closed or the callback raises an exception
- To let multiple fibers process messages, increase work_pool
Make sure to handle all exceptions in the consume block, as unhandled exceptions will cause the channel to be closed (to prevent dangling unacked messages and exception floods).
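The consume options above can be combined roughly as follows. This is a minimal sketch, not the library's canonical usage: it assumes a broker reachable at amqp://guest:guest@localhost, and the consumer tag "demo-consumer" and message bodies are made up for illustration.

```crystal
require "amqp-client"

# Assumption: a local broker with the default guest account.
conn = AMQP::Client.new("amqp://guest:guest@localhost").connect
ch = conn.channel

q = ch.queue    # temporary, server-named queue
ch.prefetch(10) # at most 10 unacked deliveries in flight

# Crystal's own Channel, used here only to hand bodies back to the main fiber.
received = ::Channel(String).new(3)

# no_ack: false means every delivery must be acked or rejected.
# work_pool: 2 lets two fibers run the block concurrently.
ch.basic_consume(q.name, tag: "demo-consumer", no_ack: false, work_pool: 2) do |msg|
  begin
    received.send(msg.body_io.to_s)
    ch.basic_ack(msg.delivery_tag)
  rescue ex
    # Handle errors here; an unhandled exception would close the channel.
    STDERR.puts "consumer error: #{ex.message}"
    ch.basic_reject(msg.delivery_tag, requeue: false)
  end
end

# Publish to the default exchange, routed by queue name.
3.times { |i| ch.basic_publish("job #{i}", "", q.name) }

bodies = Array(String).new(3) { received.receive }
ch.basic_cancel("demo-consumer")
conn.close
```

With two workers the order in which bodies arrive is not guaranteed, which is why the sketch collects them rather than assuming a sequence.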
Get a single message from a queue. The message must eventually be acked or rejected if no_ack is false.
Reject a message with delivery_tag, optionally requeue it. Reject all messages up to and including delivery_tag if multiple is true.
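As an illustration of polling with an explicit ack, the sketch below fetches one message and acknowledges it. It assumes a local broker at amqp://guest:guest@localhost; the message body is arbitrary.

```crystal
require "amqp-client"

# Assumption: a local broker with the default guest account.
conn = AMQP::Client.new("amqp://guest:guest@localhost").connect
ch = conn.channel
q = ch.queue

# Seed the queue and wait for the broker to confirm the publish.
ch.basic_publish_confirm("hello", "", q.name)

body = nil
# basic_get returns nil when the queue is empty.
if msg = ch.basic_get(q.name, no_ack: false)
  body = msg.body_io.to_s
  ch.basic_ack(msg.delivery_tag) # required because no_ack is false
else
  puts "queue is empty"
end

conn.close
```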
Publish a bytes message, to an exchange with routing_key
Publish a string message, to an exchange with routing_key
Publish an io message, to an exchange with routing_key. Only data from the current position of the IO to the end will be published. The position will be restored after publish.
Publish a message with a set bytesize, to an exchange with routing_key
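The String, Bytes and IO overloads might be exercised like this. It is a sketch under the assumption of a local broker at amqp://guest:guest@localhost; publish confirms are used only as a barrier so the messages are known to be queued before they are read back.

```crystal
require "amqp-client"

# Assumption: a local broker with the default guest account.
conn = AMQP::Client.new("amqp://guest:guest@localhost").connect
ch = conn.channel
q = ch.queue
ch.confirm_select

ch.basic_publish("plain text", "", q.name)         # String overload
ch.basic_publish("raw bytes".to_slice, "", q.name) # Bytes overload

io = IO::Memory.new("header|payload")
io.pos = 7 # skip "header|"; only the rest is published
ch.basic_publish(io, "", q.name) # IO overload
pos_after = io.pos               # documented to be restored after publish

ch.wait_for_confirms

bodies = [] of String
while msg = ch.basic_get(q.name) # no_ack defaults to true
  bodies << msg.body_io.to_s
end

conn.close
```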
Set prefetch limit to count messages; no more messages will be delivered to the consumer until one or more messages have been acknowledged or rejected
Tell the broker to either deliver all unacknowledged messages again if requeue is false, or reject all of them if requeue is true
Unacknowledged messages retrieved by #basic_get are requeued regardless.
Reject a message with delivery_tag, optionally requeue it
Close the channel. The reason might be logged by the server.
Sets the channel in publish confirm mode; each published message will be acked or nacked
Convenience method for Exchange handling
Bind an exchange to another exchange
Declares an exchange
Deletes an exchange
Unbind an exchange from another exchange
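The exchange methods above could fit together as in the following sketch. It assumes a local broker at amqp://guest:guest@localhost; the exchange names demo.in and demo.out are hypothetical and are deleted again at the end.

```crystal
require "amqp-client"

# Assumption: a local broker with the default guest account.
conn = AMQP::Client.new("amqp://guest:guest@localhost").connect
ch = conn.channel

# Hypothetical exchange names, non-durable so nothing survives a broker restart.
ch.exchange_declare("demo.in", "fanout", durable: false)
ch.exchange_declare("demo.out", "fanout", durable: false)

# Messages published to demo.in are forwarded to demo.out.
ch.exchange_bind(source: "demo.in", destination: "demo.out", routing_key: "")

q = ch.queue
ch.queue_bind(q.name, "demo.out", "")

ch.basic_publish_confirm("routed", "demo.in")
body = ch.basic_get(q.name).try &.body_io.to_s

# Clean up the demo topology.
ch.exchange_unbind(source: "demo.in", destination: "demo.out", routing_key: "")
ch.exchange_delete("demo.out")
ch.exchange_delete("demo.in")
conn.close
```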
Appends a String representation of this object which includes its class name, its object address and the values of all instance variables.
class Person
  def initialize(@name : String, @age : Int32)
  end
end

Person.new("John", 32).inspect # => #<Person:0x10fd31f20 @name="John", @age=32>
Callback that will be called if a consumer is cancelled by the server. The argument to the callback is the consumer tag.
Callback that will be called if the channel is closed by the server
Callback that is called if a published message is returned by the server
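A returned message can be observed by publishing with mandatory: true to a routing key that matches no queue. The sketch below assumes a local broker at amqp://guest:guest@localhost, and it assumes ReturnedMessage exposes reply_text and routing_key; check the struct in your version of the shard. The routing key is made up.

```crystal
require "amqp-client"

# Assumption: a local broker with the default guest account.
conn = AMQP::Client.new("amqp://guest:guest@localhost").connect
ch = conn.channel

returned = ::Channel(String).new(1)

# Assumption: ReturnedMessage has reply_text and routing_key getters.
ch.on_return do |msg|
  returned.send("#{msg.reply_text}: #{msg.routing_key}")
end

# mandatory: true asks the broker to return the message if no queue receives it.
ch.basic_publish("lost", "", "demo.no-such-queue", mandatory: true)

notice = nil
select
when text = returned.receive
  notice = text
when timeout(5.seconds)
  puts "nothing was returned within 5 seconds"
end

conn.close
```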
Declares a queue with a name, by default durable and not auto-deleted
Declares a temporary queue, which is not durable and is auto-deleted when it is no longer used
Bind a queue to an exchange, with a routing_key and optionally some arguments
Declare a queue with name
- passive will raise if the queue doesn't already exist; other arguments are ignored
- durable will make the queue durable on the server (note that messages must have the persistent flag set to be persistent)
- exclusive will make the queue exclusive to the channel; it will be deleted when the channel is closed
- auto_delete will delete the queue when the last consumer has stopped consuming
- no_wait will cause the method to return without waiting for a confirmation from the server
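The interplay between a durable queue and persistent messages might look like the sketch below. It assumes a local broker at amqp://guest:guest@localhost, a hypothetical queue name demo.durable, and that Properties is reachable as AMQP::Client::Properties and accepts a delivery_mode named argument (2 marks a message persistent in AMQP 0-9-1).

```crystal
require "amqp-client"

# Assumption: a local broker with the default guest account.
conn = AMQP::Client.new("amqp://guest:guest@localhost").connect
ch = conn.channel

# Hypothetical queue name; these flags match the defaults for a named queue.
ch.queue_declare("demo.durable", durable: true, exclusive: false, auto_delete: false)

# Assumption: delivery_mode: 2 is how the persistent flag is set via Properties.
props = AMQP::Client::Properties.new(delivery_mode: 2_u8)
ch.basic_publish_confirm("survives a restart", "", "demo.durable", props: props)

# passive: true only checks that the queue exists and raises if it does not.
ch.queue_declare("demo.durable", passive: true)

mode = ch.basic_get("demo.durable").try &.properties.delivery_mode

ch.queue_delete("demo.durable") # clean up the demo queue
conn.close
```

A durable queue on its own only preserves the queue definition; without the persistent flag the messages in it are not expected to survive a broker restart.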
Delete a queue
Purge/empty a queue; returns the number of messages deleted
Unbind a queue from an exchange, with a routing_key and optionally some arguments
Commits a transaction if the block returns, rolls back the transaction if the block raises an exception
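Commit and rollback through the block form could be sketched as follows, assuming a local broker at amqp://guest:guest@localhost. Whether #transaction enables transaction mode itself is an assumption here, so the sketch calls tx_select explicitly first, which should be harmless if it is repeated.

```crystal
require "amqp-client"

# Assumption: a local broker with the default guest account.
conn = AMQP::Client.new("amqp://guest:guest@localhost").connect
ch = conn.channel
q = ch.queue

# Explicitly enter transaction mode in case #transaction does not do it.
ch.tx_select

# Block returns normally: both publishes are committed together.
ch.transaction do
  ch.basic_publish("a", "", q.name)
  ch.basic_publish("b", "", q.name)
end

# Block raises: the publish inside is rolled back.
begin
  ch.transaction do
    ch.basic_publish("c", "", q.name)
    raise "abort"
  end
rescue ex
  puts "rolled back: #{ex.message}"
end

# Count what actually reached the queue.
count = 0
while ch.basic_get(q.name)
  count += 1
end

conn.close
```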
Returns when there are no unconfirmed publishes on the channel. Raises if there were any negative acknowledgements.