abstract class
JoobQ::Store
- JoobQ::Store
- Reference
- Object
Overview
The Store class in the JoobQ module defines a generic interface for job storage and retrieval. It
provides methods for managing job queues, including enqueuing, dequeuing, scheduling, and marking jobs as failed or
dead. This abstract class must be implemented by concrete storage backends.
Methods
#clear_queue
abstract def clear_queue(queue_name : String) : Nil
Clears all jobs from the specified queue.
#delete_job
abstract def delete_job(job : JoobQ::Job) : Nil
Deletes the specified job from the store.
#enqueue
abstract def enqueue(job : JoobQ::Job) : String
Enqueues the specified job and returns a unique job ID.
#dequeue
abstract def dequeue(queue_name : String, klass : Class) : Job?
Dequeues the next job from the specified queue and returns it.
#move_job_back_to_queue
abstract def move_job_back_to_queue(queue_name : String) : Bool
Moves a job back to the queue if it was being processed but not completed.
mark_as_failed
abstract def mark_as_failed(job : JoobQ::Job, error_details : Hash) : Nil
Marks the specified job as failed with the provided error details.
#mark_as_dead
abstract def mark_as_dead(job : JoobQ::Job, expiration_time : String) : Nil
Marks the specified job as dead with the provided expiration time.
#schedule
abstract def schedule(job : JoobQ::Job, delay_in_ms : Int64) : Nil
Schedules the specified job to be executed after the given delay in milliseconds.
#fetch_due_jobs
abstract def fetch_due_jobs(current_time : Time) : Array(String)
Fetches jobs that are due to be executed at the specified current time.
#queue_size
abstract def queue_size(queue_name : String) : Int64
Returns the size of the specified queue.
#list_jobs
abstract def list_jobs(queue_name : String, page_number : Int32 = 1, page_size : Int32 = 200) : Array(String)
Lists jobs in the specified queue with pagination support.
Usage
To use the Store class, you need to implement it in a concrete storage backend.
Workflow
-
Initialization:
- Implement the
Storeclass in a concrete storage backend. - Initialize the store instance.
- Implement the
-
Managing Jobs:
- Use
#enqueueto add jobs to the queue. - Use
#dequeueto fetch and remove the next job from the queue. - Use
#scheduleto delay job execution. - Use
mark_as_failedand#mark_as_deadto handle job failures and expirations. - Use
#clear_queueand#delete_jobto manage job cleanup.
- Use
-
Fetching Jobs:
- Use
#fetch_due_jobsto retrieve jobs that are due for execution. - Use
#queue_sizeand#list_jobsto monitor and list jobs in the queue.
- Use
Example
Here is a complete example demonstrating how to implement and use the Store class:
require "joobq"
# Define a job
struct ExampleJob
include JoobQ::Job
property x : Int32
def initialize(@x : Int32)
end
def perform
puts "Performing job with x = #{x}"
end
end
This example demonstrates how to implement an in-memory store, enqueue a job, and dequeue and perform the job.
Direct Known Subclasses
Defined in:
joobq/store.crInstance Method Summary
- #claim_job(queue_name : String, worker_id : String, klass : Class) : String | Nil
- #claim_jobs_batch(queue_name : String, worker_id : String, klass : Class, batch_size : Int32) : Array(String)
- #cleanup_job(job_json : String, queue_name : String) : Nil
- #cleanup_jobs_batch(job_jsons : Array(String), queue_name : String) : Nil
- #clear_queue(queue_name : String) : Nil
- #delete_job(job : String) : Nil
- #dequeue(queue_name : String, klass : Class) : String | Nil
- #enqueue(job : JoobQ::Job) : String
- #enqueue_batch(jobs : Array(JoobQ::Job), batch_size : Int32) : Nil
- #fetch_due_jobs(current_time : Time, delay_set : String, limit : Int32, remove : Bool) : Array(String)
- #list_jobs(queue_name : String, page_number : Int32 = 1, page_size : Int32 = 200) : Array(String)
- #mark_as_dead(job : JoobQ::Job, expiration_time : Int64) : Nil
- #mark_job_completed(job_json : String, queue_name : String) : Nil
- #move_job_back_to_queue(queue_name : String) : Bool
- #processing_queue_size(queue_name : String) : Int64
- #queue_size(queue_name : String) : Int64
- #release_job_claim(queue_name : String, worker_id : String) : Nil
- #release_job_claims_batch(queue_name : String, worker_id : String, job_count : Int32) : Nil
- #schedule(job : JoobQ::Job, delay_in_ms : Int64, delay_set : String) : Nil
- #schedule_job(job : String, schedule_time : Int64) : Nil
- #verify_job_removed_from_processing?(job_id : String, queue_name : String) : Bool