class NATS::Client

Overview

Instantiating a NATS::Client makes a connection to one of the given NATS servers.

Defined in:

jetstream.cr
kv.cr
nats.cr
objects.cr

Constant Summary

BUFFER_SIZE = 1 << 15
MAX_PUBLISH_SIZE = 1 * MEGABYTE
MEGABYTE = 1 << 20

Constructors

Instance Method Summary

Constructor Detail

def self.new(uri : URI, ping_interval = 2.minutes, max_pings_out = 2, nkeys_file : String | Nil = nil, user_credentials : String | Nil = nil) #

Connect to a single NATS server at the given URI

nats = NATS::Client.new(URI.parse("nats://nats.example.com"))

[View source]
def self.new(servers : Array(URI), ping_interval : Time::Span = 2.minutes, max_pings_out : Int32 = 2, nkeys_file : String | Nil = nil, user_credentials : String | Nil = nil) #

Connect to a NATS cluster at the given URIs

nats = NATS::Client.new([
  URI.parse("nats://nats-1.example.com"),
  URI.parse("nats://nats-2.example.com"),
  URI.parse("nats://nats-3.example.com"),
])

[View source]
def self.new(*, ping_interval = 2.minutes, max_pings_out = 2, nkeys_file : String | Nil = nil, user_credentials : String | Nil = nil) #

[View source]

Instance Method Detail

def close #

Close this NATS connection. This should be done explicitly before exiting the program so that the NATS server can remove any subscriptions that were associated with this client.


[View source]
def drain #

[View source]
def flush(timeout = 2.seconds) #

Flush the client's output buffer over the wire


[View source]
def flush! #

[View source]
def jetstream #

Returns a NATS::JetStream::Client that uses this client's connection to the NATS server.


[View source]
def kv #

Returns a NATS::KV::Client that uses this client's connection to the NATS server.

EXPERIMENTAL: NATS KV support is experimental and subject to change as NATS support for it changes.


[View source]
def nuid : NATS::NUID #

[View source]
def objects : Objects::Client #

[View source]
def on_disconnect(&on_disconnect) #

Execute the given block whenever this client is disconnected from the NATS server.

nats = NATS::Client.new
nats.on_disconnect { Datadog.metrics.increment "nats.disconnect" }

[View source]
def on_error(&on_error : Exception -> Nil) #

Execute the given block whenever an exception is raised inside this NATS client.

nats = NATS::Client.new
nats.on_error { |error| Honeybadger.notify error }

[View source]
def on_reconnect(&on_reconnect) #

Execute the given block whenever this client is reconnected to the NATS server.

nats = NATS::Client.new
nats.on_reconnect { Datadog.metrics.increment "nats.reconnect" }

[View source]
def ping(channel = Channel(Nil).new(1)) #

[View source]
def publish(subject : String, message : Data = Bytes.empty, reply_to : String | Nil = nil, headers : Message::Headers | Nil = nil) : Nil #

Publish the given message body (either Bytes for binary data or String for text) on the given NATS subject, optionally supplying a reply_to subject (if expecting a reply or to notify the receiver where to send updates) and any headers.

# Send an empty message to a subject
nats.publish "hello"

# Serialize an object to a subject
nats.publish "orders.#{order.id}", order.to_json

# Tell a recipient where to send results. For example, to stream results
# to a given subject:
reply_subject = "replies.orders.list.customer.123"
orders = [] of Order
nats.subscribe reply_subject do |msg, subscription|
  case result = (Order | Complete).from_json(String.new(msg.body))
  in Order
    orders << result
  in Complete
    nats.unsubscribe subscription
  end
end
nats.publish "orders.list.customer.123", reply_to: reply_subject

# Publish a message to NATS JetStream with a message-deduplication header
# for idempotency:
nats.jetstream.subscribe consumer_subject, queue_group: "my-service" do |msg|
  # ...
end
nats.publish orders_subject, order.to_json, headers: NATS::Message::Headers{
  # Deduplicate using the equivalent of a cache key
  "Nats-Msg-Id" => "order-submitted-#{order.id}-#{order.updated_at.to_json}",
}

[View source]
def reply(msg : Message, body : Data = "", headers : Headers | Nil = nil, *, flush = true) : Nil #

Send the given body to the msg's reply_to subject, often used in a request/reply messaging model.

nats.subscribe "orders.*", queue_group: "orders-service" do |msg|
  _, id = msg.subject.split('.') # Similar to HTTP path routing

  if order = OrderQuery.new.find_by(id: id)
    nats.reply msg, {order: order}.to_json
  else
    nats.reply msg, {error: "No order with that id found"}.to_json
  end
end

[View source]
def request(subject : String, message : Data = "", timeout : Time::Span = 2.seconds, headers : Headers | Nil = nil, *, flush = true) : Message | Nil #

Make a synchronous request to subscribers of the given subject, waiting up to timeout for a response from any of the subscribers. The first message to come back will be returned. If no message comes back before the timeout elapses, nil is returned.

if order_response = nats.request("orders.info.#{order_id}")
  response << Order.from_json(String.new(order_response.body))
else
  response.status = :service_unavailable
end

[View source]
def request(subject : String, message : Data = "", timeout = 2.seconds, &block : Message -> ) : Nil #

Make an asynchronous request to subscribers of the given subject, not waiting for a response. The first message to come back will be passed to the block.


[View source]
def server_info : ServerInfo #

[View source]
def state : State #

The current state of the connection


[View source]
def subscribe(subject : String, queue_group : String | Nil = nil, sid = @current_sid.add(1), max_in_flight = 64000, &block : Message, Subscription -> ) : Subscription #

Subscribe to the given subject, optionally with a queue_group (so that each message is delivered to this application once instead of once for each instance of the application), executing the given block for each message.

require "nats"

nats = NATS::Client.new
nats.subscribe "orders.created" do |msg|
  order = Order.from_json(String.new(msg.body))

  # ...
end

[View source]
def unsubscribe(subscription : Subscription, max_messages : Int) : Nil #

Unsubscribe from the given subscription after the specified number of messages has been received.


[View source]
def unsubscribe(subscription : Subscription) : Nil #

Unsubscribe from the given subscription

nats = NATS::Client.new

new_orders = [] of NATS::Message
subscription = nats.subscribe "orders.created.*" do |msg|
  new_orders << msg
end

spawn do
  sleep 10.seconds
  nats.unsubscribe subscription
end

[View source]