class NATS::Client
- NATS::Client
- Reference
- Object
Overview
Instantiating a NATS::Client
makes a connection to one of the given NATS
servers.
Defined in:
jetstream.crkv.cr
nats.cr
objects.cr
Constant Summary
-
BUFFER_SIZE =
1 << 15
-
MAX_PUBLISH_SIZE =
1 * MEGABYTE
-
MEGABYTE =
1 << 20
Constructors
-
.new(uri : URI, ping_interval = 2.minutes, max_pings_out = 2, nkeys_file : String | Nil = nil, user_credentials : String | Nil = nil)
Connect to a single NATS server at the given URI
-
.new(servers : Array(URI), ping_interval : Time::Span = 2.minutes, max_pings_out : Int32 = 2, nkeys_file : String | Nil = nil, user_credentials : String | Nil = nil)
Connect to a NATS cluster at the given URIs
- .new(*, ping_interval = 2.minutes, max_pings_out = 2, nkeys_file : String | Nil = nil, user_credentials : String | Nil = nil)
Instance Method Summary
-
#close
Close this NATS connection.
- #drain
-
#flush(timeout = 2.seconds)
Flush the client's output buffer over the wire
- #flush!
-
#jetstream
Returns a
NATS::JetStream::Client
that uses this client's connection to the NATS server. -
#kv
Returns a
NATS::KV::Client
that uses this client's connection to the NATS server.EXPERIMENTAL NATS KV support is experimental and subject to change as NATS support for it changes
- #nuid : NATS::NUID
- #objects : Objects::Client
-
#on_disconnect(&on_disconnect)
Execute the given block whenever this client is disconnected from the NATS server.
-
#on_error(&on_error : Exception -> Nil)
Execute the given block whenever an exception is raised inside this NATS client.
-
#on_reconnect(&on_reconnect)
Execute the given block whenever this client is reconnected to the NATS server.
- #ping(channel = Channel(Nil).new(1))
-
#publish(subject : String, message : Data = Bytes.empty, reply_to : String | Nil = nil, headers : Message::Headers | Nil = nil) : Nil
Publish the given message body (either
Bytes
for binary data orString
for text) on the given NATS subject, optionally supplying areply_to
subject (if expecting a reply or to notify the receiver where to send updates) and anyheaders
. -
#reply(msg : Message, body : Data = "", headers : Headers | Nil = nil, *, flush = true) : Nil
Send the given
body
to themsg
'sreply_to
subject, often used in a request/reply messaging model. -
#request(subject : String, message : Data = "", timeout : Time::Span = 2.seconds, headers : Headers | Nil = nil, *, flush = true) : Message | Nil
Make a synchronous request to subscribers of the given
subject
, waiting up totimeout
for a response from any of the subscribers. -
#request(subject : String, message : Data = "", timeout = 2.seconds, &block : Message -> ) : Nil
Make an asynchronous request to subscribers of the given
subject
, not waiting for a response. - #server_info : ServerInfo
-
#state : State
The current state of the connection
-
#subscribe(subject : String, queue_group : String | Nil = nil, sid = @current_sid.add(1), max_in_flight = 64000, &block : Message, Subscription -> ) : Subscription
Subscribe to the given
subject
, optionally with aqueue_group
(so that each message is delivered to this application once instead of once for each instance of the application), executing the given block for each message. -
#unsubscribe(subscription : Subscription, max_messages : Int) : Nil
Unsubscribe from the given subscription after the specified number of messages has been received.
-
#unsubscribe(subscription : Subscription) : Nil
Unsubscribe from the given subscription
Constructor Detail
Connect to a single NATS server at the given URI
nats = NATS::Client.new(URI.parse("nats://nats.example.com"))
Connect to a NATS cluster at the given URIs
nats = NATS::Client.new([
URI.parse("nats://nats-1.example.com"),
URI.parse("nats://nats-2.example.com"),
URI.parse("nats://nats-3.example.com"),
])
Instance Method Detail
Close this NATS connection. This should be done explicitly before exiting the program so that the NATS server can remove any subscriptions that were associated with this client.
Returns a NATS::JetStream::Client
that uses this client's connection to
the NATS server.
Returns a NATS::KV::Client
that uses this client's connection to
the NATS server.
EXPERIMENTAL NATS KV support is experimental and subject to change as NATS support for it changes
Execute the given block whenever this client is disconnected from the NATS server.
nats = NATS::Client.new
nats.on_disconnect { Datadog.metrics.increment "nats.disconnect" }
Execute the given block whenever an exception is raised inside this NATS client.
nats = NATS::Client.new
nats.on_error { |error| Honeybadger.notify error }
Execute the given block whenever this client is reconnected to the NATS server.
nats = NATS::Client.new
nats.on_reconnect { Datadog.metrics.increment "nats.reconnect" }
Publish the given message body (either Bytes
for binary data or String
for text) on the given NATS subject, optionally supplying a reply_to
subject (if expecting a reply or to notify the receiver where to send updates) and any headers
.
# Send an empty message to a subject
nats.publish "hello"
# Serialize an object to a subject
nats.publish "orders.#{order.id}", order.to_json
# Tell a recipient where to send results. For example, to stream results
# to a given subject:
reply_subject = "replies.orders.list.customer.123"
orders = [] of Order
nats.subscribe reply_subject do |msg|
case result = (Order | Complete).from_json(String.new(msg.body))
in Order
orders << result
in Complete
nats.unsubscribe reply_subject
end
end
nats.publish "orders.list.customer.123", reply_to: reply_subject
# Publish a message to NATS JetStream with a message-deduplication header
# for idempotency:
nats.jetstream.subscribe consumer_subject, queue_group: "my-service" do |msg|
# ...
end
nats.publish orders_subject, order.to_json, headers: NATS::Message::Headers{
# Deduplicate using the equivalent of a cache key
"Nats-Msg-Id" => "order-submitted-#{order.id}-#{order.updated_at.to_json}",
}
Send the given body
to the msg
's reply_to
subject, often used in a
request/reply messaging model.
nats.subscribe "orders.*", queue_group: "orders-service" do |msg|
_, id = msg.subject.split('.') # Similar to HTTP path routing
if order = OrderQuery.new.find_by(id: id)
nats.reply msg, {order: order}.to_json
else
nats.reply msg, {error: "No order with that id found"}.to_json
end
end
Make a synchronous request to subscribers of the given subject
, waiting
up to timeout
for a response from any of the subscribers. The first
message to come back will be returned. If no messages comes back before
the timeout
elapses, nil
is returned.
if order_response = nats.request("orders.info.#{order_id}")
response << Order.from_json(String.new(order_response.body))
else
response.status = :service_unavailable
end
Make an asynchronous request to subscribers of the given subject
, not
waiting for a response. The first message to come back will be passed to
the block.
Subscribe to the given subject
, optionally with a queue_group
(so that
each message is delivered to this application once instead of once for
each instance of the application), executing the given block for each
message.
require "nats"
nats = NATS::Client.new
nats.subscribe "orders.created" do |msg|
order = Order.from_json(String.new(msg.body))
# ...
end
Unsubscribe from the given subscription after the specified number of messages has been received.
Unsubscribe from the given subscription
nats = NATS::Client.new
new_orders = [] of NATS::Message
subscription = nats.subscribe "orders.created.*" do |msg|
messages << msg
end
spawn do
sleep 10.seconds
nats.unsubscribe subscription
end