class Ktistec::Topic

Overview

A pub/sub topic.

Debouncing

Topics support provider-side debouncing for high-frequency subjects. When a subject is configured for debouncing, notifications are batched and delivered after a fixed time window instead of immediately.
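
The batching described above can be illustrated with a small standalone sketch. It is not Ktistec's implementation: the `Debouncer` class, its `notify` method, and the 50 millisecond window are hypothetical names and values chosen for illustration, and the sketch assumes Crystal's default single-threaded scheduler.

```crystal
# Hypothetical debouncer: a sketch of fixed-window batching, not Ktistec's code.
class Debouncer
  @pending = [] of String
  @scheduled = false
  @deliver : Proc(Array(String), Nil)

  def initialize(@interval : Time::Span, &deliver : Array(String) ->)
    @deliver = deliver
  end

  # Queues a value. The first value in a window schedules a single delivery;
  # later values in the same window only join the pending batch.
  def notify(value : String)
    @pending << value
    return if @scheduled
    @scheduled = true
    spawn do
      sleep @interval
      batch = @pending
      @pending = [] of String
      @scheduled = false
      @deliver.call(batch)
    end
  end
end

batches = [] of Array(String)
debouncer = Debouncer.new(50.milliseconds) { |batch| batches << batch }
3.times { |i| debouncer.notify("update #{i}") }
sleep 200.milliseconds # let the window elapse so the batch is delivered
puts batches.inspect
```

Three notifications arrive inside one window, so the block runs once with all three values instead of three times.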

Configure debouncing with a regex pattern that matches subject names:

Ktistec::Topic.configure_debounce(/\/actors\/[^\/]+\/notifications$/, 1.second)

Debounce configuration is in src/controllers/streaming.cr.
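
To see which names the pattern above selects, it can be tried against a few strings with the standard `Regex#matches?` method. The subject names below are hypothetical and exist only to exercise the pattern.

```crystal
pattern = /\/actors\/[^\/]+\/notifications$/

# Hypothetical subject names, used only to exercise the pattern.
subjects = [
  "/actors/alice/notifications",
  "/actors/alice/timeline",
  "/actors/alice/bob/notifications",
  "https://example.com/actors/alice/notifications",
]

results = subjects.map { |subject| pattern.matches?(subject) }

subjects.zip(results).each do |subject, matched|
  puts "#{subject} => #{matched}"
end
```

The pattern is anchored at the end with `$` but not at the start, so a name with a leading scheme and host still matches, while a name with an extra path segment between the actor and `notifications` does not.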

Note: Subject names can change at runtime (see .rename_subject). For example, a thread subject changes from the object IRI to the thread IRI. Design each pattern to match every name a subject may take, not only its initial name.
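
As a rough illustration of the point about renamed subjects, the sketch below compares a pattern that matches only a subject's first name with one that matches both names. The IRIs and both patterns are hypothetical; the real subject formats are not shown in this document.

```crystal
# Hypothetical IRIs: the real subject name formats are an assumption here.
object_name = "https://example.com/objects/123"
thread_name = "https://example.com/threads/456"

narrow = /\/objects\/[^\/]+$/             # matches only the original name
broad = /\/(?:objects|threads)\/[^\/]+$/  # matches both names

[object_name, thread_name].each do |subject|
  puts "#{subject}: narrow=#{narrow.matches?(subject)} broad=#{broad.matches?(subject)}"
end
```

With the narrow pattern, the subject would stop matching once it is renamed; the broad pattern keeps matching across the rename.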

Debounce behavior:

Defined in:

framework/topic.cr

Constant Summary

Log = ::Log.for(self)

Constructor Detail

def self.new

Class Method Detail

def self.configure_debounce(pattern : Regex, interval : Time::Span)

Configures debounce for subjects matching pattern.

interval is the debounce window.


def self.debounce_interval_for(subject : String) : Time::Span | Nil

Returns the debounce interval for a subject, or nil if no configured pattern matches it.
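
One way such a configure-then-look-up pair could work is sketched below. This is an assumption, not the library's code: the `DebounceRegistry` module and its method names are hypothetical, and the rule that the first configured matching pattern wins is a guess.

```crystal
# Hypothetical registry: a sketch, not Ktistec's implementation.
module DebounceRegistry
  @@patterns = [] of {Regex, Time::Span}

  # Records a pattern together with its debounce window.
  def self.configure(pattern : Regex, interval : Time::Span)
    @@patterns << {pattern, interval}
  end

  # Assumption: the first configured pattern that matches wins.
  def self.interval_for(subject : String) : Time::Span?
    @@patterns.each do |(pattern, interval)|
      return interval if pattern.matches?(subject)
    end
    nil
  end
end

DebounceRegistry.configure(/\/actors\/[^\/]+\/notifications$/, 1.second)

matched = DebounceRegistry.interval_for("/actors/alice/notifications")
unmatched = DebounceRegistry.interval_for("/actors/alice/timeline")
puts matched.inspect
puts unmatched.inspect
```

A subject that matches no pattern yields nil, which a caller could treat as "deliver immediately".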


def self.rename_subject(before, after)

Renames a subject across all topics.


def self.reset!

Resets the topic class state.

Clears all subscriptions. Clears all subjects.

This is useful when testing. It should not be used in any other context!


Instance Method Detail

def <<(subject : String)

Adds a subject.

Raises an exception if the topic is frozen.


def finalize

Removes subjects that no longer belong to any topic.

Note: this method is meant to be invoked only by the garbage collector. Do not call it directly.


def notify_subscribers(value : String = "")

Notifies subscribers about updates.

Passes an optional value to each subscriber.

Does not block.


def subjects

Returns the subjects.


def subscribe(timeout : Time::Span | Nil = nil, &)

Subscribes to updates about the topic.

Yields all queued values.

Does not return unless the supplied block raises Stop, the block raises another exception, or the channel is closed.

A timeout may be specified to ensure the block is called periodically.
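
The loop-until-stopped behavior and the periodic timeout can be sketched with a plain `Channel` and `select`. Everything here is a hypothetical stand-in: `Stop` is redefined locally, `subscribe_sketch` is not the library's method, and yielding nil when the timeout elapses is an assumption about what "called periodically" means.

```crystal
# Hypothetical stand-ins: neither `Stop` nor `subscribe_sketch` is Ktistec's code.
class Stop < Exception
end

# Yields each received value, or nil when `wait` elapses with no value.
def subscribe_sketch(channel : Channel(String), wait : Time::Span, &)
  loop do
    select
    when value = channel.receive
      yield value
    when timeout(wait)
      yield nil
    end
  end
rescue Stop | Channel::ClosedError
  # Stop and a closed channel end the subscription quietly;
  # any other exception raised by the block propagates to the caller.
end

# Case 1: queued values are yielded, then the closed channel ends the loop.
channel = Channel(String).new(2)
channel.send("first")
channel.send("second")
channel.close

received = [] of String?
subscribe_sketch(channel, 10.milliseconds) { |value| received << value }

# Case 2: nothing is ever sent, so only the timeout calls the block.
idle = Channel(String).new
ticks = 0
subscribe_sketch(idle, 10.milliseconds) do
  ticks += 1
  raise Stop.new if ticks == 3
end

puts received.inspect
puts ticks
```

In the second case the block is called only because the timeout fires, which gives a subscriber a chance to do periodic work or raise Stop even when no updates arrive.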


def subscriptions

Returns the subscriptions to this topic.

