class JoobQ::RedisStore

Defined in:

joobq/redis_store.cr

Constant Summary

DELAYED_SET = "joobq:delayed_jobs"

Constructors

Instance Method Summary

Instance methods inherited from class JoobQ::Store

claim_job(queue_name : String, worker_id : String, klass : Class) : String | Nil claim_job, claim_jobs_batch(queue_name : String, worker_id : String, klass : Class, batch_size : Int32) : Array(String) claim_jobs_batch, cleanup_job(job_json : String, queue_name : String) : Nil cleanup_job, cleanup_jobs_batch(job_jsons : Array(String), queue_name : String) : Nil cleanup_jobs_batch, clear_queue(queue_name : String) : Nil clear_queue, delete_job(job : String) : Nil delete_job, dequeue(queue_name : String, klass : Class) : String | Nil dequeue, enqueue(job : JoobQ::Job) : String enqueue, enqueue_batch(jobs : Array(JoobQ::Job), batch_size : Int32) : Nil enqueue_batch, fetch_due_jobs(current_time : Time, delay_set : String, limit : Int32, remove : Bool) : Array(String) fetch_due_jobs, list_jobs(queue_name : String, page_number : Int32 = 1, page_size : Int32 = 200) : Array(String) list_jobs, mark_as_dead(job : JoobQ::Job, expiration_time : Int64) : Nil mark_as_dead, mark_job_completed(job_json : String, queue_name : String) : Nil mark_job_completed, move_job_back_to_queue(queue_name : String) : Bool move_job_back_to_queue, processing_queue_size(queue_name : String) : Int64 processing_queue_size, queue_size(queue_name : String) : Int64 queue_size, release_job_claim(queue_name : String, worker_id : String) : Nil release_job_claim, release_job_claims_batch(queue_name : String, worker_id : String, job_count : Int32) : Nil release_job_claims_batch, schedule(job : JoobQ::Job, delay_in_ms : Int64, delay_set : String) : Nil schedule, schedule_job(job : String, schedule_time : Int64) : Nil schedule_job, verify_job_removed_from_processing?(job_id : String, queue_name : String) : Bool verify_job_removed_from_processing?

Constructor Detail

def self.instance : RedisStore #

[View source]
def self.new(host : String = ENV.fetch("REDIS_HOST", "localhost"), port : Int32 = (ENV.fetch("REDIS_PORT", "6379")).to_i, password : String | Nil = ENV["REDIS_PASS"]?, pool_size : Int32 = (ENV.fetch("REDIS_POOL_SIZE", "500")).to_i, pool_timeout : Float64 = (ENV.fetch("REDIS_POOL_TIMEOUT", "2.0")).to_f64) #

[View source]

Instance Method Detail

def claim_job(queue_name : String, worker_id : String, klass : Class) : String | Nil #

Implements the abstract method — uses dequeue with BRPOPLPUSH


[View source]
def claim_jobs_batch(queue_name : String, worker_id : String, klass : Class, batch_size : Int32 = 5) : Array(String) #

Implements the abstract method — uses batch dequeue


[View source]
def cleanup_job(job_json : String, queue_name : String) : Nil #

Simplified job cleanup for BRPOPLPUSH pattern - just remove from processing queue


[View source]
def cleanup_jobs_batch(job_jsons : Array(String), queue_name : String) : Nil #

Batch job cleanup for high performance


[View source]
def clear_queue(queue_name : String) : Nil #

[View source]
def clear_queues_batch(queue_names : Array(String)) : Nil #

Optimized method to clear multiple queues in a single pipeline


[View source]
def collect_statistics_batch : Hash(String, Int64) #

Optimized connection reuse for statistics collection


[View source]
def delete_job(job : String) : Nil #

[View source]
def dequeue(queue_name : String, klass : Class) : String | Nil #

High-performance reliable queue using BRPOPLPUSH


[View source]
def dequeue_batch(queue_name : String, klass : Class, batch_size : Int32 = 10) : Array(String) #

Batch dequeue for high performance - uses non-blocking operations


[View source]
def enqueue(job : Job) : String #

[View source]
def enqueue_batch(jobs : Array(Job), batch_size : Int32 = 1000) : Nil #

[View source]
def fetch_due_jobs(current_time = Time.local, delay_set : String = DELAYED_SET, limit : Int32 = 50, remove : Bool = true) : Array(String) #

[View source]
def find_jobs_batch(jids : Array(String)) : Hash(String, String | Nil) #

Simplified batch job lookup - search in all known locations


[View source]
def get_all_queue_metrics : Hash(String, QueueMetrics) #

Get metrics for all configured queues using pipelining


[View source]
def get_multiple_state_counts(states : Array(String)) : Hash(String, Int32) #

Optimized method to get job counts for multiple states at once


[View source]
def get_processing_jobs_count : Int32 #

Optimized method to get processing jobs count using Lua script


[View source]
def get_queue_metrics(queue_name : String) : QueueMetrics #

Get metrics for a single queue


[View source]
def get_queue_metrics_pipelined(queue_names : Array(String)) : Hash(String, QueueMetrics) #

Get queue metrics for multiple queues using pipelining


[View source]
def get_retrying_jobs_count : Int32 #

Optimized retrying jobs count using Lua script for better performance


[View source]
def get_retrying_jobs_paginated(page : Int32, per_page : Int32) : Array(String) #

Optimized retrying jobs pagination using Lua script


[View source]
def health : RedisHealth #

[View source]
def health_check : Hash(String, String | Int32 | Bool | Float64) #

Connection pool health check with detailed metrics


[View source]
def list_jobs(queue_name : String, page_number : Int32 = 1, page_size : Int32 = 200) : Array(String) #

[View source]
def list_sorted_set_jobs(set_name : String, page_number : Int32 = 1, page_size : Int32 = 200) : Array(String) #

[View source]
def mark_as_dead(job : Job, expiration_time : Int64) : Nil #

[View source]
def mark_job_completed(job_json : String, queue_name : String) : Nil #

Mark job as completed with statistics


[View source]
def metrics : RedisMetrics #

[View source]
def move_job_back_to_queue(queue_name : String) : Bool #

[View source]
def move_to_dead_letter(job : Job, queue_name : String) : Nil #

High-performance move to dead letter queue using pipelined operations


[View source]
def move_to_retry(job : Job, queue_name : String, delay_ms : Int64) : Bool #

High-performance move to retry queue using pipelined operations


[View source]
def pipeline : RedisPipeline #

[View source]
def pool_size : Int32 #

[View source]
def pool_timeout : Float64 #

[View source]
def process_due_delayed_jobs(queue_name : String) : Array(String) #

Processes due jobs from the delayed queue and moves them back to the main queue. Jobs are moved back with "enqueued" status so workers can pick them up. Returns the array of job JSON strings that were processed (including those from other queues).


[View source]
def processing_list(pattern : String = "#{PROCESSING_QUEUE}:*", limit : Int32 = 100) : Array(String) #

[View source]
def processing_list_paginated(offset : Int32, limit : Int32, pattern : String = "#{PROCESSING_QUEUE}:*") : Array(String) #

Optimized processing jobs list with batch operations


[View source]
def processing_queue_size(queue_name : String) : Int64 #

Get count of jobs currently in processing queue


[View source]
def queue_size(queue_name : String) : Int64 #

[View source]
def queue_sizes_batch(queue_names : Array(String)) : Hash(String, Int64) #

Optimized batch queue sizes to reduce connection overhead


[View source]
def redis : Redis::PooledClient #

[View source]
def release_job_claim(queue_name : String, worker_id : String) : Nil #

Implements the abstract method — a no-op under the BRPOPLPUSH pattern


[View source]
def release_job_claims_batch(queue_name : String, worker_id : String, job_count : Int32) : Nil #

Implements the abstract method — a no-op under the BRPOPLPUSH pattern


[View source]
def reset : Nil #

[View source]
def schedule(job : Job, delay_in_ms : Int64, delay_set : String = DELAYED_SET) : Nil #

[View source]
def schedule_delayed_retry(job : Job, queue_name : String, delay_ms : Int64) : Bool #

Schedule delayed retry - simplified version of move_to_retry


[View source]
def schedule_job(job : String, schedule_time : Int64) : Nil #

[View source]
def set_size(set_name : String) : Int64 #

[View source]
def set_sizes_batch(set_names : Array(String)) : Hash(String, Int64) #

Optimized batch set sizes to reduce connection overhead


[View source]
def verify_job_removed_from_processing?(job_id : String, queue_name : String) : Bool #

Verify that a job has been properly removed from processing queue


[View source]
def verify_job_uniqueness(job_jid : String, queue_name : String) : Hash(String, Int32) #

Verifies that a job exists in only one location (for debugging/testing). Returns a hash with the job's locations.


[View source]