class JoobQ::RedisStore
- JoobQ::RedisStore
- JoobQ::Store
- Reference
- Object

Defined in:
joobq/redis_store.cr

Constant Summary
- DELAYED_SET = "joobq:delayed_jobs"
Constructors
- .instance : RedisStore
- .new(host : String = ENV.fetch("REDIS_HOST", "localhost"), port : Int32 = (ENV.fetch("REDIS_PORT", "6379")).to_i, password : String | Nil = ENV["REDIS_PASS"]?, pool_size : Int32 = (ENV.fetch("REDIS_POOL_SIZE", "500")).to_i, pool_timeout : Float64 = (ENV.fetch("REDIS_POOL_TIMEOUT", "2.0")).to_f64)
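Construction typically goes through the singleton, which reads the environment defaults listed above. A minimal sketch, assuming the `joobq` shard is installed and a Redis server is reachable (the host name below is a placeholder):

```crystal
require "joobq"

# Reuse the process-wide singleton (reads REDIS_HOST, REDIS_PORT,
# REDIS_PASS, REDIS_POOL_SIZE and REDIS_POOL_TIMEOUT from the environment).
store = JoobQ::RedisStore.instance

# Or build an explicit instance, overriding the environment defaults.
# All keyword arguments come from the documented signature above.
custom = JoobQ::RedisStore.new(
  host: "redis.internal", # placeholder host
  port: 6380,
  pool_size: 50,
  pool_timeout: 1.5
)
```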
Instance Method Summary
- #claim_job(queue_name : String, worker_id : String, klass : Class) : String | Nil
  Implements the abstract method using dequeue with BRPOPLPUSH.
- #claim_jobs_batch(queue_name : String, worker_id : String, klass : Class, batch_size : Int32 = 5) : Array(String)
  Implements the abstract method using batch dequeue.
- #cleanup_job(job_json : String, queue_name : String) : Nil
  Simplified job cleanup for the BRPOPLPUSH pattern: removes the job from the processing queue.
- #cleanup_jobs_batch(job_jsons : Array(String), queue_name : String) : Nil
  Batch job cleanup for high performance.
- #clear_queue(queue_name : String) : Nil
- #clear_queues_batch(queue_names : Array(String)) : Nil
  Clears multiple queues in a single pipeline.
- #collect_statistics_batch : Hash(String, Int64)
  Reuses a single connection for statistics collection.
- #delete_job(job : String) : Nil
- #dequeue(queue_name : String, klass : Class) : String | Nil
  High-performance reliable dequeue using BRPOPLPUSH.
- #dequeue_batch(queue_name : String, klass : Class, batch_size : Int32 = 10) : Array(String)
  Batch dequeue for high throughput; uses non-blocking operations.
- #enqueue(job : Job) : String
- #enqueue_batch(jobs : Array(Job), batch_size : Int32 = 1000) : Nil
- #fetch_due_jobs(current_time = Time.local, delay_set : String = DELAYED_SET, limit : Int32 = 50, remove : Bool = true) : Array(String)
- #find_jobs_batch(jids : Array(String)) : Hash(String, String | Nil)
  Simplified batch job lookup; searches all known locations.
- #get_all_queue_metrics : Hash(String, QueueMetrics)
  Gets metrics for all configured queues using pipelining.
- #get_multiple_state_counts(states : Array(String)) : Hash(String, Int32)
  Gets job counts for multiple states at once.
- #get_processing_jobs_count : Int32
  Gets the processing jobs count using a Lua script.
- #get_queue_metrics(queue_name : String) : QueueMetrics
  Gets metrics for a single queue.
- #get_queue_metrics_pipelined(queue_names : Array(String)) : Hash(String, QueueMetrics)
  Gets queue metrics for multiple queues using pipelining.
- #get_retrying_jobs_count : Int32
  Gets the retrying jobs count using a Lua script for better performance.
- #get_retrying_jobs_paginated(page : Int32, per_page : Int32) : Array(String)
  Paginates retrying jobs using a Lua script.
- #health : RedisHealth
- #health_check : Hash(String, String | Int32 | Bool | Float64)
  Connection pool health check with detailed metrics.
- #list_jobs(queue_name : String, page_number : Int32 = 1, page_size : Int32 = 200) : Array(String)
- #list_sorted_set_jobs(set_name : String, page_number : Int32 = 1, page_size : Int32 = 200) : Array(String)
- #mark_as_dead(job : Job, expiration_time : Int64) : Nil
- #mark_job_completed(job_json : String, queue_name : String) : Nil
  Marks a job as completed and records statistics.
- #metrics : RedisMetrics
- #move_job_back_to_queue(queue_name : String) : Bool
- #move_to_dead_letter(job : Job, queue_name : String) : Nil
  High-performance move to the dead letter queue using pipelined operations.
- #move_to_retry(job : Job, queue_name : String, delay_ms : Int64) : Bool
  High-performance move to the retry queue using pipelined operations.
- #pipeline : RedisPipeline
- #pool_size : Int32
- #pool_timeout : Float64
- #process_due_delayed_jobs(queue_name : String) : Array(String)
  Processes due jobs from the delayed queue and moves them back to the main queue. Jobs are moved back with "enqueued" status so workers can pick them up. Returns the array of job JSON strings that were processed (including those from other queues).
- #processing_list(pattern : String = "#{PROCESSING_QUEUE}:*", limit : Int32 = 100) : Array(String)
- #processing_list_paginated(offset : Int32, limit : Int32, pattern : String = "#{PROCESSING_QUEUE}:*") : Array(String)
  Lists processing jobs with batch operations.
- #processing_queue_size(queue_name : String) : Int64
  Gets the count of jobs currently in the processing queue.
- #queue_size(queue_name : String) : Int64
- #queue_sizes_batch(queue_names : Array(String)) : Hash(String, Int64)
  Batched queue sizes to reduce connection overhead.
- #redis : Redis::PooledClient
- #release_job_claim(queue_name : String, worker_id : String) : Nil
  Implements the abstract method; a no-op for the BRPOPLPUSH pattern.
- #release_job_claims_batch(queue_name : String, worker_id : String, job_count : Int32) : Nil
  Implements the abstract method; a no-op for the BRPOPLPUSH pattern.
- #reset : Nil
- #schedule(job : Job, delay_in_ms : Int64, delay_set : String = DELAYED_SET) : Nil
- #schedule_delayed_retry(job : Job, queue_name : String, delay_ms : Int64) : Bool
  Schedules a delayed retry; a simplified version of move_to_retry.
- #schedule_job(job : String, schedule_time : Int64) : Nil
- #set_size(set_name : String) : Int64
- #set_sizes_batch(set_names : Array(String)) : Hash(String, Int64)
  Batched set sizes to reduce connection overhead.
- #verify_job_removed_from_processing?(job_id : String, queue_name : String) : Bool
  Verifies that a job has been removed from the processing queue.
- #verify_job_uniqueness(job_jid : String, queue_name : String) : Hash(String, Int32)
  Verifies that a job exists in only one location (for debugging/testing). Returns a hash with the job's locations.
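Taken together, the claim/cleanup methods form a reliable-queue cycle on top of BRPOPLPUSH: a worker claims a job (which atomically moves the payload to a processing list), runs it, then removes it from the processing list. A hedged sketch of that cycle; `MyJob` and the worker identifier are placeholders, and the exact split of responsibilities between mark_job_completed and cleanup_job may differ in the implementation:

```crystal
store = JoobQ::RedisStore.instance
worker_id = "worker-1" # hypothetical identifier for this worker

# claim_job blocks via BRPOPLPUSH and returns the raw job JSON,
# leaving a copy in the processing queue until cleanup.
if job_json = store.claim_job("default", worker_id, MyJob)
  begin
    # ... deserialize job_json and perform the work here ...
    store.mark_job_completed(job_json, "default")
  ensure
    # Always remove the job from the processing queue,
    # even if the work above raised.
    store.cleanup_job(job_json, "default")
  end
end
```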
Instance methods inherited from class JoobQ::Store:
- claim_job(queue_name : String, worker_id : String, klass : Class) : String | Nil
- claim_jobs_batch(queue_name : String, worker_id : String, klass : Class, batch_size : Int32) : Array(String)
- cleanup_job(job_json : String, queue_name : String) : Nil
- cleanup_jobs_batch(job_jsons : Array(String), queue_name : String) : Nil
- clear_queue(queue_name : String) : Nil
- delete_job(job : String) : Nil
- dequeue(queue_name : String, klass : Class) : String | Nil
- enqueue(job : JoobQ::Job) : String
- enqueue_batch(jobs : Array(JoobQ::Job), batch_size : Int32) : Nil
- fetch_due_jobs(current_time : Time, delay_set : String, limit : Int32, remove : Bool) : Array(String)
- list_jobs(queue_name : String, page_number : Int32 = 1, page_size : Int32 = 200) : Array(String)
- mark_as_dead(job : JoobQ::Job, expiration_time : Int64) : Nil
- mark_job_completed(job_json : String, queue_name : String) : Nil
- move_job_back_to_queue(queue_name : String) : Bool
- processing_queue_size(queue_name : String) : Int64
- queue_size(queue_name : String) : Int64
- release_job_claim(queue_name : String, worker_id : String) : Nil
- release_job_claims_batch(queue_name : String, worker_id : String, job_count : Int32) : Nil
- schedule(job : JoobQ::Job, delay_in_ms : Int64, delay_set : String) : Nil
- schedule_job(job : String, schedule_time : Int64) : Nil
- verify_job_removed_from_processing?(job_id : String, queue_name : String) : Bool
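For bulk producers, the batch and pipelined variants cut Redis round trips. A sketch, assuming a `MyJob` class with a no-argument constructor:

```crystal
store = JoobQ::RedisStore.instance

# Enqueue many jobs; enqueue_batch pipelines them in
# groups of batch_size (1000 by default).
jobs = Array(JoobQ::Job).new(5_000) { MyJob.new }
store.enqueue_batch(jobs)

# Inspect queue depth and the pipelined per-queue metrics.
puts store.queue_size("default")
store.get_all_queue_metrics.each do |name, queue_metrics|
  puts "#{name}: #{queue_metrics}"
end
```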
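Delayed execution is built on the DELAYED_SET sorted set ("joobq:delayed_jobs"): schedule parks the job scored by its due time, and process_due_delayed_jobs (or fetch_due_jobs) moves due entries back onto their queue with "enqueued" status. A sketch, assuming `MyJob` is a placeholder job class and that you run the scheduler fiber yourself:

```crystal
store = JoobQ::RedisStore.instance

# Park the job in the delayed sorted set for 30 seconds (30_000 ms).
store.schedule(MyJob.new, 30_000_i64)

# A periodic scheduler fiber re-enqueues anything that has come due.
spawn do
  loop do
    store.process_due_delayed_jobs("default")
    sleep 1.second
  end
end
```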