class JoobQ::Configure

Overview

Configure is responsible for managing the settings for the JoobQ job queue system.

Features

Usage Example

JoobQ::Configure.instance.queue "my_queue", 5, MyJob, {limit: 10, period: 1.minute}

Defined in:

joobq/configure.cr

Constructors

Class Method Summary

Macro Summary

Instance Method Summary

Constructor Detail

def self.load_from_cli_args(args : Array(String)) : Configure #

Load from CLI arguments


[View source]
def self.load_from_yaml(path : String | Nil = nil, env : String | Nil = nil) : Configure #

YAML configuration loading methods


[View source]
def self.load_from_yaml_sources(sources : Array(String)) : Configure #

Load from multiple YAML sources with merging


[View source]

Class Method Detail

def self.load_hybrid(yaml_path : String | Nil = nil, &) #

Hybrid configuration - YAML + programmatic


[View source]

Macro Detail

macro queue(name, workers, job, throttle = nil) #

Macro: Define a queue

Adds a queue configuration and optionally applies throttling limits.


[View source]
macro register_job(job_class) #

Helper macro to register a job type and add it to the factory

This combines job registry and queue factory registration in one call.

Example:

config = JoobQ.config
config.register_job(EmailJob)
config.register_job(ImageProcessingJob)

[View source]

Instance Method Detail

def create_queues_from_yaml_config #

Create queues from stored queue_configs using the QueueFactory

This method bridges YAML configuration with actual queue instantiation. It should be called after:

  1. YAML configuration is loaded
  2. Job classes are defined and available
  3. Job types are registered with QueueFactory

Example:

# Load YAML config
config = Configure.load_from_yaml("config/joobq.yml")

# Register job types (must be done after job classes are defined)
QueueFactory.register_job_type(EmailJob)
QueueFactory.register_job_type(ImageProcessingJob)

# Create queues from YAML configuration
config.create_queues_from_yaml_config

# Now queues are available
JoobQ.start

[View source]
def dead_letter_ttl : Time::Span #

[View source]
def dead_letter_ttl=(dead_letter_ttl : Time::Span) #

[View source]
def default_queue : String #

[View source]
def default_queue=(default_queue : String) #

[View source]
def delayed_job_scheduler : DelayedJobScheduler #

Delayed job scheduler (processes retrying jobs)


[View source]
def delayed_job_scheduler=(delayed_job_scheduler : DelayedJobScheduler) #

Delayed job scheduler (processes retrying jobs)


[View source]
def error_monitor : ErrorMonitor #

[View source]
def error_monitoring(&) #

Configure error monitoring


[View source]
def error_monitoring(alert_thresholds : Hash(String, Int32) | Nil = nil, time_window : Time::Span | Nil = nil, max_recent_errors : Int32 | Nil = nil, notify_alert : Proc(Hash(String, String), Nil) | Nil = nil) #

Configure error monitoring with parameters


[View source]
def expires : Time::Span #

[View source]
def expires=(expires : Time::Span) #

[View source]
def failed_ttl : Time::Span #

[View source]
def failed_ttl=(failed_ttl : Time::Span) #

[View source]
def job_registry : JobSchemaRegistry #

[View source]
def job_registry=(job_registry : JobSchemaRegistry) #

[View source]
def middleware_pipeline : MiddlewarePipeline #

[View source]
def middlewares : Array(Middleware) #

Middlewares and Pipeline


[View source]
def middlewares=(middlewares : Array(Middleware)) #

Middlewares and Pipeline


[View source]
def pipeline_batch_size : Int32 #

Pipeline optimization settings (always enabled)


[View source]
def pipeline_batch_size=(pipeline_batch_size : Int32) #

Pipeline optimization settings (always enabled)


[View source]
def pipeline_max_commands : Int32 #

[View source]
def pipeline_max_commands=(pipeline_max_commands : Int32) #

[View source]
def pipeline_timeout : Float64 #

[View source]
def pipeline_timeout=(pipeline_timeout : Float64) #

[View source]
def queue_configs : Hash(String, {job_class_name: String, workers: Int32, throttle: {limit: Int32, period: Time::Span}?}) #

[View source]
def queues : Hash(String, JoobQ::BaseQueue) #

Properties and Getters


[View source]
def rest_api_enabled=(rest_api_enabled : Bool) #

[View source]
def rest_api_enabled? : Bool #

[View source]
def retries : Int32 #

[View source]
def retries=(retries : Int32) #

[View source]
def scheduler(tz : Time::Location = self.time_location, &) #

Add a scheduler and execute within its context


[View source]
def scheduler_configs : Array(NamedTuple(timezone: String, cron_jobs: Array(NamedTuple(pattern: String, job: String, args: Hash(String, YAML::Any))), recurring_jobs: Array(NamedTuple(interval: Time::Span, job: String, args: Hash(String, YAML::Any))))) #

[View source]
def scheduler_configs=(scheduler_configs : Array(NamedTuple(timezone: String, cron_jobs: Array(NamedTuple(pattern: String, job: String, args: Hash(String, YAML::Any))), recurring_jobs: Array(NamedTuple(interval: Time::Span, job: String, args: Hash(String, YAML::Any)))))) #

[View source]
def schedulers : Array(Scheduler) #

Schedulers


[View source]
def schedulers=(schedulers : Array(Scheduler)) #

Schedulers


[View source]
def setup_schedulers_from_config #

Helper method to setup schedulers from YAML configuration

This method should be called after job classes are available and loaded. It processes the scheduler configurations stored during YAML loading and sets up cron jobs and recurring jobs with proper job class resolution.

Example usage:

config = JoobQ::Configure.load_from_yaml("config/joobq.yml")
# ... load job classes ...
config.setup_schedulers_from_config

[View source]
def stats_enabled=(stats_enabled : Bool) #

[View source]
def stats_enabled? : Bool #

[View source]
def store : Store #

[View source]
def store=(store : Store) #

[View source]
def time_location : Time::Location #

[View source]
def time_location=(tz : String = "America/New_York") : Time::Location #

Set the time location globally


[View source]
def timeout : Time::Span #

[View source]
def timeout=(timeout : Time::Span) #

[View source]
def use(&) #

DSL: Add custom middlewares


[View source]
def validate_configuration : Nil #

Validation methods


[View source]
def worker_batch_size : Int32 #

[View source]
def worker_batch_size=(worker_batch_size : Int32) #

[View source]