class NATS::JetStream::Client
- NATS::JetStream::Client
- Reference
- Object
Overview
This class provides a client for NATS JetStream for at-least-once delivery.
You can either instantiate it with a NATS client or with the
NATS::Client#jetstream
method as a shortcut.
Defined in:
jetstream/client.crConstructors
Instance Method Summary
-
#ack(msg : Message)
Acknowledge success processing the specified message, usually called at the end of your subscription block.
-
#ack_sync(msg : Message, timeout : Time::Span = 2.seconds)
Acknowledge the given message, waiting on the NATS server to acknowledge your acknowledgement so that you can be sure it was delivered to the server and not simply caught up in the output buffer.
- #ack_sync(messages : Enumerable(Message), timeout : Time::Span = 2.seconds)
-
#consumer
Returns an
API::Consumer
instance to interact with the NATS JetStream API for consumers. -
#in_progress(msg : Message)
Notify the NATS server that you need more time to process this message, usually used when a consumer requires that you acknowledge a message within a certain amount of time but a given message is taking longer than expected.
-
#nack(msg : Message)
Negatively acknowledge the processing of a message, typically called when an exception is raised while processing.
-
#nack(msg : Message, *, backoff : NAKBackoff, max : Time::Span = 1.day)
Deliver a negative acknowledgement for the given message and tell the NATS server to delay sending it based on the
NAKBackoff
pattern specified. -
#nack(msg : Message, *, delay : Time::Span)
Deliver a negative acknowledgement for the given message and tell the NATS server to delay sending it for the given time span.
- #publish(subject : String, body : Data, timeout : Time::Span = 2.seconds, headers : Headers = Headers.new, message_id : String | Nil = nil, expected_last_message_id : String | Nil = nil, expected_last_sequence : Int64 | Nil = nil, expected_stream : String | Nil = nil, expected_last_subject_sequence : Int64 | Nil = nil)
-
#pull_each(consumer : Consumer, *, batch_size : Int, &) : Nil
EXPERIMENTAL NATS JetStream pull subscriptions may be unstable
-
#pull_subscribe(consumer : Consumer, *, batch_size : Int, &) : Nil
EXPERIMENTAL NATS JetStream pull subscriptions may be unstable
-
#pull_subscribe(consumer : Consumer, backlog : Int = 64)
EXPERIMENTAL NATS JetStream pull subscriptions may be unstable
-
#stream
Returns an
API::Stream
instance to interact with the NATS JetStream API for streams. -
#subscribe(consumer : JetStream::Consumer, &block : Message -> )
Subscribe to messages delivered to the given consumer.
-
#subscribe(subject : String, queue_group : String | Nil = nil, &block : Message -> )
Subscribe to the given subject with an optional queue group.
Constructor Detail
Instance Method Detail
Acknowledge success processing the specified message, usually called at the end of your subscription block.
NOTE This method is asynchronous. If you need a guarantee that NATS has
received your acknowledgement, use #ack_sync
instead.
jetstream.subscribe consumer do |msg|
# ...
jetstream.ack msg
end
Acknowledge the given message, waiting on the NATS server to acknowledge your acknowledgement so that you can be sure it was delivered to the server and not simply caught up in the output buffer.
jetstream.subscribe consumer do |msg|
# ...
jetstream.ack_sync msg
# By the time you get here, the NATS server knows you've acknowledged.
end
Returns an API::Consumer
instance to interact with the NATS JetStream
API for consumers.
Notify the NATS server that you need more time to process this message, usually used when a consumer requires that you acknowledge a message within a certain amount of time but a given message is taking longer than expected.
jetstream.subscribe consumer do |msg|
start = Time.monotonic
users.each do |user|
# If we're running out of time, reset the timer.
if Time.monotonic - start > 25.seconds
jetstream.in_progress msg
end
process(msg)
end
end
Negatively acknowledge the processing of a message, typically called when an exception is raised while processing.
jetstream.subscribe consumer do |msg|
# doing some work
jetstream.ack msg # Successfully processed
rescue ex
jetstream.nack msg # Processing was unsuccessful, try again.
end
Deliver a negative acknowledgement for the given message and tell the
NATS server to delay sending it based on the NAKBackoff
pattern
specified.
jetstream.subscribe consumer do |msg|
# do some work
jetstream.ack msg
rescue
# Use exponential backoff of up to an hour for retries
jetstream.nack msg, backoff: :exponential, max: 1.hour
end
Deliver a negative acknowledgement for the given message and tell the NATS server to delay sending it for the given time span.
jetstream.subscribe consumer do |msg|
# do some work
jetstream.ack msg
rescue
# Use exponential backoff of up to an hour for retries
jetstream.nack msg, delay: 30.seconds
end
EXPERIMENTAL NATS JetStream pull subscriptions may be unstable
EXPERIMENTAL NATS JetStream pull subscriptions may be unstable
EXPERIMENTAL NATS JetStream pull subscriptions may be unstable
Subscribe to messages delivered to the given consumer. Note that this consumer must be a push-based consumer. Pull-based consumers do not allow subscriptions because you must explicitly request the next message.
js = nats.jetstream
consumer = js.consumer.info("orders", "fulfillment")
js.subscribe consumer do |msg|
# ...
js.ack msg
end
Subscribe to the given subject with an optional queue group. This is
effectively identical to NATS::Client#subscribe
, but the message
yielded to the block is a NATS::JetStream::Message
instead of
a NATS::Message
.
js = nats.jetstream
js.subscribe "orders.*", queue_group: "fulfillment" do |msg|
# ...
js.ack msg
end
NOTE: If provided, the queue_group
must be the same as a Consumer
's deliver_group
for NATS server 2.4.0 and above.