struct NATS::KV::Bucket

Defined in:

kv.cr

Instance Method Detail

def []=(key : String, value : Data)

Set the value of a key


def []?(key : String) : Bytes | Nil

Get the value of a key, if it exists (not counting Delete operations), stripping away all metadata to return only the value. If you need metadata such as revision, timestamp, or operation, or if you need to be able to get deleted keys, you should use Bucket#get instead.
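Because this method returns raw Bytes, callers that store text usually decode the result themselves. The following is a minimal sketch of that pattern. `fetch_string` is a hypothetical helper, not part of this library, and it is duck-typed so it accepts any object that responds to `[]?(String)` with `Bytes | Nil`.

```crystal
# Hypothetical helper (not part of NATS::KV): reads a key through `[]?` and
# decodes the returned Bytes as a String, falling back to a default when the
# key is absent or its latest operation is a delete.
def fetch_string(bucket, key : String, default : String = "") : String
  if bytes = bucket[key]?
    String.new(bytes)
  else
    default
  end
end
```

With a bucket in hand, `bucket["greeting"] = "hello"` followed by `fetch_string(bucket, "greeting")` would be expected to return the stored text, assuming the value was written as a UTF-8 string.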


def create(key : String, value : String)

Creates the given key with the given value if and only if the key does not yet exist.


def created : Time

def delete(key : String)

Deletes the given key from the KV store. Inside the NATS server, this is implemented as adding another message to the stream that signifies deletion.


def description : String | Nil

An optional description of the purpose of this bucket


def each_key(&)

def get(key : String, *, revision : Int | Nil = nil, ignore_deletes = false) : Entry | Nil

Get the entry for a key as a KV::Entry. Returns nil if the key does not exist, or if it has been deleted and ignore_deletes is set to true.

Important: If you do not set ignore_deletes, you may get the entry for a deleted key. Keys are stored in a stream, and deleting a key appends a message that carries an operation flag (implemented in the NATS server as a KV-Operation message header). This method retrieves the last entry in the stream for the key, so ignore_deletes simply tells the client to skip entries whose operation is a delete.
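The distinction between a key that was never written and a key whose latest entry is a delete can be sketched as follows. `describe_key` is a hypothetical helper, duck-typed so that any object with a compatible `get` works. `Entry#operation.deleted?` is taken from the watch example on this page, while `Entry#revision` is assumed from the metadata this page mentions. Purge operations are not handled separately here.

```crystal
# Hypothetical helper (not part of NATS::KV): reports what the latest entry
# for a key says, without passing ignore_deletes.
def describe_key(bucket, key : String) : String
  entry = bucket.get(key)

  if entry.nil?
    # No entry in the stream for this key.
    "#{key}: no entry"
  elsif entry.operation.deleted?
    # The latest message for this key is a delete marker.
    "#{key}: deleted at revision #{entry.revision}"
  else
    "#{key}: present at revision #{entry.revision}"
  end
end
```

Calling `bucket.get(key, ignore_deletes: true)` instead would collapse the first two branches into a single nil result.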


def get!(key : String, *, revision : Int | Nil = nil, ignore_deletes = false) : Entry

Get the entry for a key as a KV::Entry. Raises KeyError if the key does not exist, or if it has been deleted and ignore_deletes is set to true.


def history(key : String)

Get the revision history for the given key.


def history : Int64 | Nil

The number of revisions NATS will retain for each key in this bucket


def keys(pattern : String = ">")

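List all known keys for this bucket that match pattern, returned as a Set(String). The default pattern, ">", is the NATS wildcard for one or more subject tokens, so it matches every key.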
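As an illustration of a narrower pattern, the sketch below lists the IDs of keys shaped like `session.<id>`, the same layout used in the watch example on this page. `session_ids` is a hypothetical helper. It assumes that keys are returned as full key names and that `pattern` filters with NATS wildcard semantics, where `*` matches exactly one token.

```crystal
# Hypothetical helper (not part of NATS::KV): collects the ID portion of
# every key matching "session.*" and returns them sorted.
def session_ids(bucket) : Array(String)
  bucket.keys("session.*")
    .map { |key| key.split('.', 2)[1] } # "session.abc" => "abc"
    .sort
end
```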


def last_update : Time

def max_bytes : Int64 | Nil

The maximum size in bytes of this bucket in memory or on disk


def max_value_size : Int32 | Nil

The maximum number of bytes in a value


def name : String

The name of this bucket


def placement : JetStream::StreamConfig::Placement | Nil

def purge(key : String)

Purges the given key from the KV store. Inside the NATS server, this is implemented as rolling up all versions of this key into a single message with its KV::Entry#operation value (KV-Operation header) set to KV::Entry::Operation::Purge.


def put(key : String, value : Data)

Set the value of a key


def replicas : Int32

The number of NATS nodes to which this KV bucket will be replicated


def set(key : String, value : Data)

Assign value to key in bucket asynchronously, not waiting for acknowledgement from the NATS server.

WARNING: Without acknowledgement, there is no guarantee the server received the value. Use with caution.


def size : Size

[View source]

Where NATS stores the data for this bucket


def stream_name : String

The name of the underlying JetStream stream.


def ttl : Time::Span | Nil

The maximum length of time NATS will retain a value or revision for a key


def update(key : String, value : String, revision : Int64)

Updates the given key with the given value if and only if it exists and is currently at the given revision. If you do not have the latest revision, this method returns nil so you can perform domain-specific conflict resolution. If you need to set the key regardless of revision, use Bucket#put instead.
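One common form of conflict resolution is to re-read the key and retry. The sketch below shows that loop under stated assumptions: `append_with_retry` is a hypothetical helper, `Entry#value` is assumed to hold the raw Bytes and `Entry#revision` an Int64, and any non-nil result from `update` is treated as success, since this page only documents the nil case.

```crystal
# Hypothetical helper (not part of NATS::KV): appends a suffix to the
# current value using optimistic concurrency. Returns true once an update
# is accepted, false if the key is missing or every attempt was rejected.
def append_with_retry(bucket, key : String, suffix : String, attempts : Int32 = 5) : Bool
  attempts.times do
    entry = bucket.get(key, ignore_deletes: true)
    return false if entry.nil?

    # Assumption: Entry#value is Bytes holding UTF-8 text.
    new_value = String.new(entry.value) + suffix

    # update returns nil when our revision is stale. In that case loop
    # around, re-read the latest entry, and try again.
    return true if bucket.update(key, new_value, entry.revision)
  end

  false
end
```

Bounding the number of attempts keeps a heavily contended key from spinning forever. Callers that need different merge logic can replace the line that builds `new_value`.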


def values : Int64

def watch(key : String, *, ignore_deletes = false, include_history = true)

Watch the given key (or wildcard pattern) for changes, yielding each change to the block. By default, this also yields entries for deleted keys. To avoid that, pass ignore_deletes: true.

bucket.watch("session.*") do |entry|
  _prefix, session_id = entry.subject.split('.', 2)
  if entry.operation.deleted?
    # do deleted things
  else
    # the session was updated
  end
end

This method blocks until the yielded Watch is stopped (use watch.stop), so if you want to run it in the background, you will need to run this method inside a spawn block.
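Running the watch in the background could look like the sketch below, which forwards entries to a Channel so the calling fiber stays free. `watch_in_background` is a hypothetical helper, and it assumes the block form yields one entry per change, as in the example above.

```crystal
# Hypothetical helper (not part of NATS::KV): starts a watch in its own
# fiber and sends every yielded entry to the given channel.
def watch_in_background(bucket, key : String, entries : Channel)
  spawn do
    bucket.watch(key) do |entry|
      entries.send entry
    end
  end
end
```

The caller can then consume changes with `entries.receive` at its own pace. A buffered channel lets the watch run ahead of a slow consumer up to the buffer size.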

You can also use this method to wait for a specific key to change once:

watch = bucket.watch(my_key)
watch.each do |entry|
  # react to the key change

  watch.stop if entry.latest? # exit the block
end
