class NATS::JetStream::Client
- NATS::JetStream::Client
- Reference
- Object
Overview
This class provides a client for NATS JetStream for at-least-once delivery.
You can either instantiate it with a NATS client or with the
NATS::Client#jetstream method as a shortcut.
Defined in:
jetstream.cr
Constructors
Instance Method Summary
- #ack(msg : Message)
  Acknowledge successful processing of the specified message, usually called at the end of your subscription block.
- #consumer
  Returns an API::Consumer instance to interact with the NATS JetStream API for consumers.
- #nack(msg : Message)
  Negatively acknowledge the processing of a message, typically called when an exception is raised while processing.
- #publish(subject : String, body : Data, timeout : Time::Span = 2.seconds, headers : Headers = Headers.new, message_id : String | Nil = nil, expected_last_message_id : String | Nil = nil, expected_last_sequence : Int64 | Nil = nil, expected_stream : String | Nil = nil, expected_last_subject_sequence : Int64 | Nil = nil)
- #pull_subscribe(consumer : API::V1::Consumer, backlog : Int = 64)
  EXPERIMENTAL: NATS JetStream pull subscriptions may be unstable.
- #stream
  Returns an API::Stream instance to interact with the NATS JetStream API for streams.
- #subscribe(subject : String, queue_group : String | Nil = nil, &block : Message -> )
  Subscribe to the given subject with an optional queue group.
- #subscribe(consumer : JetStream::API::V1::Consumer, &block : Message -> )
  Subscribe to messages delivered to the given consumer.
Constructor Detail
Instance Method Detail
#ack(msg : Message)
Acknowledge successful processing of the specified message, usually called at the end of your subscription block.
jetstream.subscribe consumer do |msg|
  # ...
  jetstream.ack msg
end
#consumer
Returns an API::Consumer instance to interact with the NATS JetStream API for consumers.
#nack(msg : Message)
Negatively acknowledge the processing of a message, typically called when an exception is raised while processing.
jetstream.subscribe consumer do |msg|
  # doing some work
  jetstream.ack msg # Successfully processed
rescue ex
  jetstream.nack msg # Processing was unsuccessful, try again.
end
You can also implement exponential backoff by pushing the nack into a fiber that sleeps for some time before sending it:
jetstream.subscribe consumer do |msg|
  # ...
rescue ex
  # Very important to do this in a `spawn`. Do not block the `subscribe`
  # handler for more than 1-2 seconds or the NATS server will see you as
  # a slow client and terminate the connection.
  spawn do
    # Sleep at most for the amount of time that NATS will wait to redeliver
    backoff = {
      (2 ** msg.delivered_count).milliseconds,
      # Cap backoff because NATS will redeliver it before this time anyway
      consumer.config.ack_wait || 30.seconds,
    }.min
    sleep backoff
    jetstream.nack msg
  end
end
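The backoff calculation above can be pulled out into a small pure function, which makes the cap easy to check in isolation. The following is only a sketch: `nack_backoff` is a hypothetical helper, not part of this library, and the 30-second fallback simply mirrors the example above. It also clamps the exponent, on the assumption that a message could be redelivered enough times for `2 ** n` to overflow the integer type.

```crystal
# Hypothetical helper (not part of the NATS shard): returns how long to wait
# before nacking, doubling with each delivery and capped at `ack_wait`.
def nack_backoff(delivered_count : Int, ack_wait : Time::Span? = nil) : Time::Span
  # Fall back to 30 seconds when the consumer has no explicit ack_wait,
  # as in the example above.
  cap = ack_wait || 30.seconds

  # Clamp the exponent so the power cannot overflow (or go negative)
  # for messages that have been redelivered many times.
  exponent = delivered_count.clamp(0, 30)

  # Never wait longer than the cap, because the server is expected to
  # redeliver the message after that time anyway.
  {(2_i64 ** exponent).milliseconds, cap}.min
end
```

Inside the `spawn` block, the sleep would then become `sleep nack_backoff(msg.delivered_count, consumer.config.ack_wait)`, followed by `jetstream.nack msg` as before.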
#pull_subscribe(consumer : API::V1::Consumer, backlog : Int = 64)
EXPERIMENTAL: NATS JetStream pull subscriptions may be unstable.
#subscribe(subject : String, queue_group : String | Nil = nil, &block : Message -> )
Subscribe to the given subject with an optional queue group. This is
effectively identical to NATS::Client#subscribe, but the message
yielded to the block is a NATS::JetStream::Message instead of
a NATS::Message.
js = nats.jetstream
js.subscribe "orders.*", queue_group: "fulfillment" do |msg|
  # ...
  js.ack msg
end
NOTE: If provided, the queue_group must be the same as a Consumer's deliver_group for NATS server 2.4.0 and above.
#subscribe(consumer : JetStream::API::V1::Consumer, &block : Message -> )
Subscribe to messages delivered to the given consumer. Note that this consumer must be a push-based consumer. Pull-based consumers do not allow subscriptions because you must explicitly request the next message.
js = nats.jetstream
consumer = js.consumer.info("orders", "fulfillment")
js.subscribe consumer do |msg|
  # ...
  js.ack msg
end