class NATS::JetStream::Client

Overview

This class provides a client for NATS JetStream, which offers at-least-once delivery. You can either instantiate it directly with a NATS::Client or use the NATS::Client#jetstream method as a shortcut.

Defined in:

jetstream.cr

Constructor Detail

def self.new(nats : NATS::Client)

Instance Method Detail

def ack(msg : Message)

Acknowledge successful processing of the specified message. This is usually called at the end of your subscription block.

jetstream.subscribe consumer do |msg|
  # ...

  jetstream.ack msg
end

def consumer

Returns an API::Consumer instance to interact with the NATS JetStream API for consumers.


def nack(msg : Message)

Negatively acknowledge the processing of a message, typically called when an exception is raised while processing.

jetstream.subscribe consumer do |msg|
  # doing some work

  jetstream.ack msg # Successfully processed

rescue ex
  jetstream.nack msg # Processing was unsuccessful, try again.
end

You can also implement exponential backoff by moving the nack into a fiber that sleeps for some time before sending it:

jetstream.subscribe consumer do |msg|
  # ...

rescue ex
  # Very important to do this in a `spawn`. Do not block the `subscribe`
  # handler for more than 1-2 seconds or the NATS server will see you as
  # a slow client and terminate the connection.
  spawn do
    # Sleep at most for the amount of time that NATS will wait to redeliver
    backoff = {
      (2 ** msg.delivered_count).milliseconds,
      # Cap backoff because NATS will redeliver it before this time anyway
      consumer.config.ack_wait || 30.seconds,
    }.min
    sleep backoff
    jetstream.nack msg
  end
end
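
The backoff calculation in the example above can be pulled out into a small pure function, which makes the growth and the cap easier to see. This is a minimal sketch: `nack_backoff` is a hypothetical helper, not part of NATS::JetStream::Client, and the 30-second fallback simply mirrors the default used in the example.

```crystal
# Hypothetical helper (not part of the library): computes how long to wait
# before nacking, doubling with each delivery and capped at `ack_wait`.
def nack_backoff(delivered_count : Int, ack_wait : Time::Span? = nil) : Time::Span
  # Same fallback as the example above when the consumer has no ack_wait set.
  cap = ack_wait || 30.seconds
  # Clamp the exponent so 2 ** n cannot overflow for very high delivery counts.
  exponent = delivered_count.to_i64.clamp(0_i64, 30_i64)
  {(2_i64 ** exponent).milliseconds, cap}.min
end

nack_backoff(1)                       # 2 milliseconds
nack_backoff(10)                      # 1024 milliseconds
nack_backoff(20)                      # capped at 30 seconds
nack_backoff(20, ack_wait: 5.seconds) # capped at 5 seconds
```

With a helper like this, the body of the `spawn` block reduces to `sleep nack_backoff(msg.delivered_count, consumer.config.ack_wait)` followed by `jetstream.nack msg`.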

def publish(subject : String, body : Data, timeout : Time::Span = 2.seconds, headers : Headers = Headers.new, message_id : String | Nil = nil, expected_last_message_id : String | Nil = nil, expected_last_sequence : Int64 | Nil = nil, expected_stream : String | Nil = nil, expected_last_subject_sequence : Int64 | Nil = nil)
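
This method has no description here, so the following is only a sketch of how a call might look, using nothing but the parameters in the signature above. The parameter names suggest JetStream's message deduplication (`message_id`) and expected-stream check (`expected_stream`), but that mapping is an assumption. The server address, the stream name `orders`, the subject, and the payload are all hypothetical.

```crystal
require "nats"
require "nats/jetstream"

# Assumes a NATS server with JetStream enabled at this address and an
# existing stream named "orders" that captures "orders.*" (both hypothetical).
nats = NATS::Client.new(URI.parse("nats://localhost:4222"))
js = nats.jetstream

# Only keyword arguments from the documented signature are used here.
js.publish "orders.created", %({"id": "order-123"}).to_slice,
  timeout: 5.seconds,
  message_id: "order-123-created", # assumed to let the server drop duplicate publishes
  expected_stream: "orders"        # assumed to reject the publish if another stream captures the subject

nats.close
```

The remaining `expected_last_*` parameters are left at their `nil` defaults in this sketch.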

def pull_subscribe(consumer : API::V1::Consumer, backlog : Int = 64)

EXPERIMENTAL: NATS JetStream pull subscriptions may be unstable.


def stream

Returns an API::Stream instance to interact with the NATS JetStream API for streams.


def subscribe(subject : String, queue_group : String | Nil = nil, &block : Message -> )

Subscribe to the given subject with an optional queue group. This is effectively identical to NATS::Client#subscribe, but the message yielded to the block is a NATS::JetStream::Message instead of a NATS::Message.

js = nats.jetstream
js.subscribe "orders.*", queue_group: "fulfillment" do |msg|
  # ...

  js.ack msg
end

NOTE: If provided, the queue_group must be the same as a Consumer's deliver_group for NATS server 2.4.0 and above.


def subscribe(consumer : JetStream::API::V1::Consumer, &block : Message -> )

Subscribe to messages delivered to the given consumer. Note that this consumer must be a push-based consumer. Pull-based consumers do not allow subscriptions because you must explicitly request the next message.

js = nats.jetstream
consumer = js.consumer.info("orders", "fulfillment")
js.subscribe consumer do |msg|
  # ...

  js.ack msg
end
