class Bus

Overview

A Bus sends messages to interested subscribers. Subscribers can reply to a message, and each reply is routed back to the original sender.
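
The flow described above can be pictured with a small, self-contained toy. This is only a sketch of the general idea (tag-based delivery, with replies addressed to the sender's origin), not this shard's implementation: `ToyBus`, `ToyInbox`, `ToyMessage`, and `reply` are hypothetical names, and delivery here is synchronous for simplicity.

```crystal
# Toy illustration only: none of these types belong to the real Bus API.
record ToyMessage, body : String, origin : String, tags : Array(String)

class ToyInbox
  getter messages = [] of ToyMessage
end

class ToyBus
  def initialize
    # tag => inboxes subscribed to that tag
    @subscriptions = Hash(String, Array(ToyInbox)).new
  end

  # Register a consumer for the given tags and return its inbox.
  def subscribe(tags : Array(String)) : ToyInbox
    inbox = ToyInbox.new
    tags.each do |tag|
      @subscriptions[tag] ||= [] of ToyInbox
      @subscriptions[tag] << inbox
    end
    inbox
  end

  # Deliver a message once to every inbox subscribed to any of its tags.
  def send(message : ToyMessage) : Nil
    receivers = message.tags.flat_map { |tag| @subscriptions.fetch(tag, [] of ToyInbox) }
    receivers.uniq.each { |inbox| inbox.messages << message }
  end

  # Route a reply back by tagging it with the original sender's origin.
  def reply(original : ToyMessage, body : String, sender : String) : Nil
    send(ToyMessage.new(body: body, origin: sender, tags: [original.origin]))
  end
end

bus = ToyBus.new
worker = bus.subscribe(["greetings"])
requester = bus.subscribe(["sender-1"]) # the requester listens on its own origin tag

bus.send(ToyMessage.new(body: "hello", origin: "sender-1", tags: ["greetings"]))
request = worker.messages.first
bus.reply(request, body: "hi back", sender: "worker-1")

puts requester.messages.first.body # prints "hi back"
```

In this toy, the only thing that makes a reply find its way home is that the sender subscribed under its own origin tag before sending.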

Defined in:

bus.cr
bus/confidence.cr
bus/evaluation.cr
bus/handler.cr
bus/message.cr
bus/pipeline.cr
bus/version.cr

Constant Summary

VERSION = "0.1.1"

Constructor Detail

def self.new

Instance Method Detail

def has_subscription?(key)

def message(body : Array(String) | String, origin : String | Nil = nil, tags : Array(String) = [] of String, parameters : Hash(String, String) = Hash(String, String).new, strategy : Message::Strategy = Message::Strategy::RandomWinner)

Generate a message for this bus.


def origin_tag

Generate a random UUID that does not already exist in the subscriptions.
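
One way such a generator could work is a simple retry loop: draw a random UUID and try again if it is already taken. The sketch below is an assumption about the approach, not the shard's source; `fresh_origin_tag` and the `existing_keys` set are hypothetical, with the set standing in for the keys of `subscriptions`.

```crystal
require "uuid"

# Hypothetical helper, not the shard's code: keep drawing random UUIDs
# until one is found that is not already a subscription key.
def fresh_origin_tag(existing_keys : Set(String)) : String
  loop do
    candidate = UUID.random.to_s
    return candidate unless existing_keys.includes?(candidate)
  end
end

taken = Set{"greetings", "handlers"}
tag = fresh_origin_tag(taken)
puts tag.size # a hyphenated UUID string is 36 characters long
```

Collisions between random UUIDs are extremely unlikely, so in practice the loop body should almost always run once.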


def pipeline : Pipeline(Message)

def send(body : Array(String) | String, origin : String | Nil = nil, tags : Array(String) = [] of String, parameters : Hash(String, String) = Hash(String, String).new, strategy : Message::Strategy = Message::Strategy::RandomWinner)

def send(message : Message)

Send a message to the subscribers.


def subscribe(tags = [] of String)

Subscribe a new message consumer to the Bus.


def subscriptions : SplayTreeMap(String, Hash(Pipeline(Message), Bool))

As of Crystal 1.0.0, a Hash is faster here in single-threaded release mode, but it fails badly in multithreaded release mode. The SplayTreeMap implementation is currently slightly slower than a Hash in single-threaded mode, but it works reliably in multithreaded mode, so this implementation stays with SplayTreeMap for now.
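
The return type above is a two-level map: each key maps to an inner hash whose keys are pipelines and whose values are `true`, so the inner hash behaves like a set. The sketch below only illustrates that shape. It swaps in a plain Hash for SplayTreeMap to stay dependency-free (so it inherits the multithreading caveat just described), `ToyPipeline` is a hypothetical placeholder for `Pipeline(Message)`, and the assumption that keys are tags is mine.

```crystal
# Stand-in sketch: a plain Hash replaces SplayTreeMap so the example has no
# dependencies, and ToyPipeline is a hypothetical placeholder type.
class ToyPipeline
end

# key => {pipeline => true}; the inner hash acts as a set of pipelines.
subscriptions = Hash(String, Hash(ToyPipeline, Bool)).new

a = ToyPipeline.new
b = ToyPipeline.new

# Subscribing records the pipeline under each of its keys.
{"logs" => [a, b], "metrics" => [a]}.each do |tag, pipelines|
  inner = (subscriptions[tag] ||= Hash(ToyPipeline, Bool).new)
  pipelines.each { |pipeline| inner[pipeline] = true }
end

# Unsubscribing removes the pipeline from every inner hash.
subscriptions.each_value { |inner| inner.delete(a) }

puts subscriptions["logs"].size    # prints 1 (only b remains)
puts subscriptions["metrics"].size # prints 0
```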


def unsubscribe(pipeline)

Remove a message consumer from the Bus.

