class JoobQ::QueueFactory

Overview

Queue Factory for creating queues from string-based job class names

This factory solves the problem of instantiating generic Queue(T) classes when the job type T is only known as a string at runtime (from YAML config).

Basic Usage:

# 1. Register job types at compile-time (in your application code)
QueueFactory.register_job_type(EmailJob)
QueueFactory.register_job_type(ImageProcessingJob)

# 2a. Create queues individually using string names (from YAML)
queue = QueueFactory.create_queue(
  name: "email_queue",
  job_class_name: "EmailJob",
  workers: 5,
  throttle: {limit: 10, period: 1.minute}
)

# 2b. Or get all instantiated queues from configuration (memoized)
all_queues = QueueFactory.queues # Reads from JoobQ.config and caches

Memoization Pattern:

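The .queues method integrates with Configure and YamlConfigLoader to build every queue defined in the loaded configuration on first access and to cache the result for subsequent calls.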

The Crystal Limitation

Crystal is statically typed and compiled. Generic types like Queue(T) must be instantiated at compile time, not at runtime. You cannot do:

# ❌ This doesn't work in Crystal
job_class_name = "EmailJob"  # from YAML
queue = Queue(job_class_name.constantize).new  # .constantize doesn't exist!

The Solution

Use a factory pattern where:

  1. All possible job types are registered at compile-time
  2. The factory uses a hash mapping string names to factory procs
  3. Each factory proc creates the correctly-typed queue
  4. The factory returns the BaseQueue interface to hide the generic type
  5. Queues are instantiated lazily and memoized from configuration
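
The first four points above can be illustrated with a minimal, self-contained sketch. Every name in it (SketchBaseQueue, SketchQueue, SketchFactory, the add helper, and the stand-in EmailJob struct) is hypothetical and chosen for illustration; it is not JoobQ's actual implementation, and it raises ArgumentError where the real factory raises QueueFactoryError.

```crystal
# Hypothetical stand-ins; none of these mirror JoobQ's actual source.
abstract class SketchBaseQueue
  abstract def name : String
  abstract def job_type : String
end

class SketchQueue(T) < SketchBaseQueue
  getter name : String
  getter workers : Int32

  def initialize(@name : String, @workers : Int32)
  end

  # T is a concrete type here, fixed when the generic was instantiated.
  def job_type : String
    T.name
  end
end

# Stand-in job type for the sketch.
struct EmailJob
end

module SketchFactory
  alias FactoryProc = Proc(String, Int32, SketchBaseQueue)

  @@registry = {} of String => FactoryProc

  # Expands at compile time, so each proc instantiates SketchQueue
  # with a concrete type and is stored under that type's name.
  macro register_job_type(job_class)
    SketchFactory.add({{ job_class.stringify }}, ->(name : String, workers : Int32) {
      SketchQueue({{ job_class }}).new(name, workers).as(SketchBaseQueue)
    })
  end

  def self.add(key : String, factory : FactoryProc) : Nil
    @@registry[key] = factory
  end

  # Runtime lookup by string; the caller only ever sees SketchBaseQueue.
  def self.create_queue(name : String, job_class_name : String, workers : Int32) : SketchBaseQueue
    factory = @@registry[job_class_name]?
    raise ArgumentError.new("Job type not registered: #{job_class_name}") unless factory
    factory.call(name, workers)
  end
end

SketchFactory.register_job_type(EmailJob)
queue = SketchFactory.create_queue("email_queue", "EmailJob", 5)
puts queue.job_type
```

The string lookup happens at runtime, but every SketchQueue(T) the procs can produce was already instantiated by the compiler when the macro expanded, which is what makes the approach compatible with Crystal's type system.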

Defined in:

joobq/queue_factory.cr

Class Method Detail

def self.add_to_registry(job_class_name : String, factory : FactoryProc, schema_proc : RegistryProc)

Internal method to add a factory function to the registry


def self.clear_queues_cache

Clear only the queues cache (forces re-instantiation on next access)


def self.clear_registry

Clear the registry and queues cache (mainly for testing or config reload)


def self.create_queue(name : String, job_class_name : String, workers : Int32, throttle : NamedTuple(limit: Int32, period: Time::Span) | Nil = nil) : BaseQueue

Create a queue based on string job class name

Parameters:

  • name: Queue name
  • job_class_name: String name of the job class (e.g., "EmailJob")
  • workers: Number of workers for this queue
  • throttle: Optional throttling configuration

Returns: A BaseQueue instance

Raises: QueueFactoryError if the job class is not registered


def self.create_queues_from_config(queue_configs : Hash(String, NamedTuple(job_class_name: String, workers: Int32, throttle: NamedTuple(limit: Int32, period: Time::Span) | Nil))) : Hash(String, BaseQueue)

Create queues from YAML configuration data

This method takes the queue_configs hash from Configure and creates actual Queue instances for all configured queues.

Example:

config = YamlConfigLoader.load_from_file("config/joobq.yml")
queues = QueueFactory.create_queues_from_config(config.queue_configs)
queues.each do |name, queue|
  config.queues[name] = queue
end
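
The queue_configs parameter has a deeply nested type, so it can help to see a value of that shape built by hand, for instance in a spec. The sketch below uses only standard Crystal types taken from the signature above. The Throttle and QueueConfig aliases and the "image_queue" entry are hypothetical names introduced for readability, and the explicit .as casts are a conservative way to make each literal match the nilable throttle field exactly.

```crystal
# Hypothetical aliases that mirror the parameter type in the signature above.
alias Throttle = NamedTuple(limit: Int32, period: Time::Span)
alias QueueConfig = NamedTuple(job_class_name: String, workers: Int32, throttle: Throttle?)

queue_configs = {} of String => QueueConfig

# A throttled queue: at most 10 jobs per minute.
queue_configs["email_queue"] = {
  job_class_name: "EmailJob",
  workers:        5,
  throttle:       {limit: 10, period: 1.minute}.as(Throttle?),
}

# An unthrottled queue: throttle is nil.
queue_configs["image_queue"] = {
  job_class_name: "ImageProcessingJob",
  workers:        2,
  throttle:       nil.as(Throttle?),
}

queue_configs.each do |name, cfg|
  throttle = cfg[:throttle]
  limit = throttle ? "#{throttle[:limit]} per #{throttle[:period]}" : "unthrottled"
  puts "#{name}: #{cfg[:job_class_name]} x#{cfg[:workers]}, #{limit}"
end
```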

def self.populate_schema_registry(registry : JobSchemaRegistry)

Populate a JobSchemaRegistry with all registered job types. This is useful after a config reset to repopulate the job_registry.


def self.queues : Hash(String, BaseQueue)

Get all instantiated queues from configuration

This method instantiates and memoizes all queues based on the queue_configs defined in the Configure instance. Queues are created lazily on first access and cached for subsequent calls.

The method:

  1. Checks if queues are already cached
  2. If not, retrieves queue_configs from JoobQ.config
  3. Creates queue instances using the factory registry
  4. Caches and returns the queue hash indexed by queue name

Returns: Hash(String, BaseQueue) - queues indexed by queue name

Example:

# After registering job types and loading YAML config
QueueFactory.register_job_type(EmailJob)
QueueFactory.register_job_type(ImageProcessingJob)
config = Configure.load_from_yaml("config/joobq.yml")

# Get all instantiated queues
all_queues = QueueFactory.queues
email_queue = all_queues["email_queue"]?
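
The check-build-cache steps listed above, together with clear_queues_cache, can be sketched with a nilable class variable and the ||= operator. This is an illustrative stand-in and not JoobQ's source: SketchQueueCache, its builds counter, and the configs argument are hypothetical, and plain strings take the place of BaseQueue instances to keep the example self-contained.

```crystal
# Hypothetical stand-in for the factory's cache; not JoobQ's actual source.
module SketchQueueCache
  # String values stand in for BaseQueue instances to keep the sketch small.
  @@queues : Hash(String, String)? = nil
  @@builds = 0

  # Number of times the queue hash has been built (for demonstration only).
  def self.builds : Int32
    @@builds
  end

  # Builds the hash on first access and returns the cached one afterwards.
  def self.queues(configs : Hash(String, Int32)) : Hash(String, String)
    @@queues ||= begin
      @@builds += 1
      built = {} of String => String
      configs.each { |name, workers| built[name] = "#{name} (#{workers} workers)" }
      built
    end
  end

  # Drops the cache so the next call to queues rebuilds it.
  def self.clear_queues_cache : Nil
    @@queues = nil
  end
end

configs = {"email_queue" => 5}
first = SketchQueueCache.queues(configs)
second = SketchQueueCache.queues(configs)
puts first.same?(second)     # the second call returns the cached hash
SketchQueueCache.clear_queues_cache
third = SketchQueueCache.queues(configs)
puts SketchQueueCache.builds # the hash was rebuilt after clearing
```

One consequence of this pattern is that configuration changes made after the first call are not picked up until the cache is cleared.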

def self.registered?(job_class_name : String) : Bool

Check if a job type is registered


def self.registered_types : Array(String)

Get list of registered job types



Macro Detail

macro register_job_type(job_class)

Register a job type for queue creation

This macro creates a factory function for the given job type and stores it in the registry using the job class name as the key.

Example:

QueueFactory.register_job_type(EmailJob)
QueueFactory.register_job_type(ImageProcessingJob)
