class NATS::JetStream::Client

Overview

This class provides a NATS JetStream client for at-least-once message delivery. Instantiate it by passing a NATS::Client to the constructor, or use the NATS::Client#jetstream method as a shortcut.
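
The two ways of obtaining a client described above can be sketched as follows. This is a minimal example, not taken from the library source: the `require` paths are assumed from the "Defined in" location, and `NATS::Client.new` is assumed to connect to a server at the client's default address.

```crystal
require "nats"
require "nats/jetstream" # assumed require path for the JetStream extension

# Assumes a NATS server with JetStream enabled is reachable at the default address.
nats = NATS::Client.new

# Explicit construction from an existing NATS client
js = NATS::JetStream::Client.new(nats)

# Shortcut via NATS::Client#jetstream
js = nats.jetstream
```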

Defined in:

jetstream/client.cr

Constructor Detail

def self.new(nats : NATS::Client) #

[View source]

Instance Method Detail

def ack(msg : Message) #

Acknowledge successful processing of the specified message, usually called at the end of your subscription block.

NOTE This method is asynchronous. If you need a guarantee that NATS has received your acknowledgement, use #ack_sync instead.

jetstream.subscribe consumer do |msg|
  # ...

  jetstream.ack msg
end

[View source]
def ack_sync(msg : Message, timeout : Time::Span = 2.seconds) #

Acknowledge the given message and wait for the NATS server to confirm receipt, so that you can be sure the acknowledgement reached the server and is not still sitting in the output buffer.

jetstream.subscribe consumer do |msg|
  # ...

  jetstream.ack_sync msg
  # By the time you get here, the NATS server knows you've acknowledged.
end

[View source]
def ack_sync(messages : Enumerable(Message), timeout : Time::Span = 2.seconds) #
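
The source gives no description for this overload. Judging from the signature alone, it presumably acknowledges every message in the collection and waits for the server to confirm, up to the given timeout. A minimal sketch under that assumption follows; `ack_batch` is a hypothetical helper, not part of the library, and the `require` paths are assumed.

```crystal
require "nats"
require "nats/jetstream" # assumed require path for the JetStream extension

# Hypothetical helper: acknowledge a whole batch in one call and wait
# up to 5 seconds for the server's confirmation.
def ack_batch(jetstream : NATS::JetStream::Client, batch : Array(NATS::JetStream::Message)) : Nil
  jetstream.ack_sync batch, timeout: 5.seconds
end
```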

[View source]
def consumer #

Returns an API::Consumer instance to interact with the NATS JetStream API for consumers.


[View source]
def in_progress(msg : Message) #

Notify the NATS server that you need more time to process this message. This is typically used when a consumer requires acknowledgement within a fixed window but a given message is taking longer than expected.

jetstream.subscribe consumer do |msg|
  start = Time.monotonic
  users.each do |user|
    # If we're running out of time, ask the server for more and restart our own timer.
    if Time.monotonic - start > 25.seconds
      jetstream.in_progress msg
      start = Time.monotonic
    end

    process(user)
  end

  jetstream.ack msg
end

[View source]
def nack(msg : Message) #

Negatively acknowledge the processing of a message, typically called when an exception is raised while processing.

jetstream.subscribe consumer do |msg|
  # doing some work

  jetstream.ack msg # Successfully processed

rescue ex
  jetstream.nack msg # Processing was unsuccessful, try again.
end

[View source]
def nack(msg : Message, *, backoff : NAKBackoff, max : Time::Span = 1.day) #

Deliver a negative acknowledgement for the given message and tell the NATS server to delay redelivery according to the specified NAKBackoff pattern, up to the max time span.

jetstream.subscribe consumer do |msg|
  # do some work
  jetstream.ack msg
rescue
  # Use exponential backoff of up to an hour for retries
  jetstream.nack msg, backoff: :exponential, max: 1.hour
end

[View source]
def nack(msg : Message, *, delay : Time::Span) #

Deliver a negative acknowledgement for the given message and tell the NATS server to delay redelivery for the given time span.

jetstream.subscribe consumer do |msg|
  # do some work
  jetstream.ack msg
rescue
  # Ask the server to wait 30 seconds before redelivering
  jetstream.nack msg, delay: 30.seconds
end

[View source]
def publish(subject : String, body : Data, timeout : Time::Span = 2.seconds, headers : Headers = Headers.new, message_id : String | Nil = nil, expected_last_message_id : String | Nil = nil, expected_last_sequence : Int64 | Nil = nil, expected_stream : String | Nil = nil, expected_last_subject_sequence : Int64 | Nil = nil) #
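
The source does not describe this method. The parameter names mirror JetStream's publish options: a message ID for server-side deduplication, and `expected_*` values that let the server reject a publish when the stream is not in the state you expect. A minimal sketch under those assumptions follows. The subject, stream name, and message ID are hypothetical, the `require` paths are assumed, and the return value is deliberately left uninspected because the source does not document it.

```crystal
require "nats"
require "nats/jetstream" # assumed require path for the JetStream extension

# Assumes a NATS server with JetStream enabled at the default address.
nats = NATS::Client.new
js = nats.jetstream

# Assumes a stream named "orders" already captures the subject "orders.created".
js.publish "orders.created", %({"id":"123"}),
  timeout: 5.seconds,
  message_id: "order-123-created", # presumably used by the server to drop duplicates
  expected_stream: "orders"        # presumably rejected if another stream owns the subject
```

Passing a stable `message_id` is what makes a retry after a timeout safe: if the first attempt actually reached the server, the second one can be recognized as a duplicate.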

[View source]
def pull_each(consumer : Consumer, *, batch_size : Int, &) : Nil #

EXPERIMENTAL NATS JetStream pull subscriptions may be unstable


[View source]
def pull_subscribe(consumer : Consumer, *, batch_size : Int, &) : Nil #

EXPERIMENTAL NATS JetStream pull subscriptions may be unstable


[View source]
def pull_subscribe(consumer : Consumer, backlog : Int = 64) #

EXPERIMENTAL NATS JetStream pull subscriptions may be unstable


[View source]
def stream #

Returns an API::Stream instance to interact with the NATS JetStream API for streams.


[View source]
def subscribe(consumer : JetStream::Consumer, &block : Message -> ) #

Subscribe to messages delivered to the given consumer. Note that this consumer must be a push-based consumer. Pull-based consumers do not allow subscriptions because you must explicitly request the next message.

js = nats.jetstream
consumer = js.consumer.info("orders", "fulfillment")
js.subscribe consumer do |msg|
  # ...

  js.ack msg
end

[View source]
def subscribe(subject : String, queue_group : String | Nil = nil, &block : Message -> ) #

Subscribe to the given subject with an optional queue group. This is effectively identical to NATS::Client#subscribe, but the message yielded to the block is a NATS::JetStream::Message instead of a NATS::Message.

js = nats.jetstream
js.subscribe "orders.*", queue_group: "fulfillment" do |msg|
  # ...

  js.ack msg
end

NOTE: If provided, the queue_group must be the same as a Consumer's deliver_group for NATS server 2.4.0 and above.


[View source]