class Conveyor::Belt

Overview

The Conveyor::Belt processes your Job instances sequentially. Each job is fetched from Redis and placed on the belt to be processed. If the job does not complete successfully, it is removed from the belt and placed back in the queue to be retried after an exponentially increasing delay.

Your application can have one or more Conveyor::Belts, allowing you to process multiple jobs concurrently. You can configure this by setting Configuration#concurrency:

Conveyor.configure do |config|
  # ...
  config.concurrency = 10
end

The Orchestrator will manage your Belt instances throughout their entire lifecycle, so you shouldn't need to deal with Conveyor::Belt directly in your application, but it's a good idea to know that it exists.
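As a rough picture of what a concurrency setting like this implies, here is a minimal sketch that assumes each belt runs in its own fiber and handles one job at a time. How the Orchestrator actually schedules belts is not stated here, so the channels and the names `jobs` and `done` are hypothetical stand-ins rather than Conveyor's API.

```crystal
# Hypothetical sketch: one fiber per belt, all pulling from a shared channel.
concurrency = 3
jobs = Channel(Int32).new
done = Channel(String).new

concurrency.times do |belt_id|
  spawn do
    # Each fiber handles one job at a time, like a single belt.
    # receive? returns nil once the channel is closed, which ends the loop.
    while job = jobs.receive?
      done.send "belt #{belt_id} finished job #{job}"
    end
  end
end

# Feed six jobs to the belts, then signal that no more are coming.
spawn do
  6.times { |i| jobs.send i }
  jobs.close
end

# Wait for all six jobs to be reported as finished.
results = Array.new(6) { done.receive }
results.each { |line| puts line }
```

Which belt picks up which job depends on fiber scheduling, so the order of the printed lines is not guaranteed.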

Defined in:

belt.cr

Constructor Detail

def self.new(*, redis : Redis::Client, queues : Array(String), presence_duration : Time::Span, timeout : Time::Span = 2.seconds, log : Log = Log.for("conveyor.belt"), max_attempts : Int32 = 25)


Instance Method Detail

def clear_queues!

def delete(id : String) : self

def fetch : JobData | Nil

Retrieves JobData from Redis, if there are any pending jobs in any of the queues passed to this Belt's constructor. This method also marks the job in Redis as pending and assigned to this Belt.


def id : String

def jobs_per_second

def on_error(&on_error : ::Exception -> Nil)

Tell this belt what to do when an exception occurs while fetching (#fetch) or working on (#work) a Job.


def reenqueue(job_data : JobData, max_attempts : Int32, exception : ::Exception | Nil) : self

Reschedule the job to run again after a delay based on the number of times it has been attempted.
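The exact delay formula is not given here. As a minimal sketch of an attempt-based delay, the following assumes the delay doubles with each attempt and is capped at an upper bound. The helpers `backoff_delay` and `retry?`, the 1-second base, and the 1-hour cap are all hypothetical and not part of Conveyor.

```crystal
# Hypothetical helper, not part of Conveyor: the delay doubles with each
# attempt and is capped so it never grows without bound.
def backoff_delay(attempts : Int32, base : Time::Span = 1.second, cap : Time::Span = 1.hour) : Time::Span
  # Clamp the exponent so the shift cannot overflow Int64.
  exponent = attempts.clamp(0, 30)
  delay = base * (1_i64 << exponent)
  delay > cap ? cap : delay
end

# Hypothetical check: a job is only retried while it is under the limit.
def retry?(attempts : Int32, max_attempts : Int32) : Bool
  attempts < max_attempts
end

(1..5).each do |attempt|
  puts "attempt #{attempt}: retry in #{backoff_delay(attempt).total_seconds}s"
end
```

Capping the delay keeps a job with many failed attempts from being pushed arbitrarily far into the future, while a limit such as max_attempts decides when to stop retrying altogether.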


def run_one : Nil

Fetch and perform a single job.


def running? : Bool

def start(&)

Start up this belt to begin processing jobs from the queues that feed into it. This is called by the Orchestrator on start.


def state : State

def stop

Stop processing jobs on this belt. The currently processing job will finish as long as the process does not exit beforehand, but no new jobs will be processed on this belt.

The Orchestrator typically calls this method.
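One way to get the "current job finishes, no new jobs start" behavior is to check a running flag only between jobs. The sketch below assumes that approach. `SketchLoop` is a hypothetical stand-in, not the Belt implementation, and its block stands in for fetching and performing one job.

```crystal
# Stand-in loop for illustration; names here are hypothetical, not Conveyor's.
class SketchLoop
  getter processed = 0
  @running = false

  def running? : Bool
    @running
  end

  # Process jobs until #stop is called. The flag is only checked between
  # jobs, so a job that has already started always runs to completion.
  def start(&)
    @running = true
    while @running
      yield
      @processed += 1
    end
  end

  def stop
    @running = false
  end
end

worker = SketchLoop.new
worker.start do
  # Simulate a stop request arriving while the third job is in progress.
  worker.stop if worker.processed == 2
end
puts worker.processed
```

The third job still counts as processed because the stop request only takes effect when the loop next checks the flag.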


def work(data : JobData)

Deserializes the job payload provided by the given JobData and calls the job type's Job#call method. If an exception occurs while processing the job, the belt's #on_error block is invoked and the job is rescheduled to run on an exponential-backoff schedule based on how many times the job has been attempted.
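The flow described above can be sketched as follows, assuming a JSON payload. The fields of JobData and the Job base type are not shown here, so `SketchJobData`, `Greeting`, and the `retries` array (standing in for rescheduling through Redis) are hypothetical.

```crystal
require "json"

# Hypothetical stand-in: the real JobData type is not shown here.
record SketchJobData, payload : String, attempts : Int32

# Hypothetical job type whose state is deserialized from the payload.
struct Greeting
  include JSON::Serializable
  getter name : String

  def call
    raise ArgumentError.new("name is empty") if name.empty?
    "hello, #{name}"
  end
end

# Deserialize the payload and call the job. On failure, report the error and
# queue the job for another attempt with its attempt count incremented.
def work(data : SketchJobData, on_error : Proc(Exception, Nil), retries : Array(SketchJobData))
  Greeting.from_json(data.payload).call
rescue ex
  on_error.call(ex)
  retries << data.copy_with(attempts: data.attempts + 1)
  nil
end

retries = [] of SketchJobData
log_error = ->(ex : Exception) { STDERR.puts "job failed: #{ex.message}" }

ok = work(SketchJobData.new(%({"name":"Ada"}), 0), log_error, retries)
failed = work(SketchJobData.new(%({"name":""}), 0), log_error, retries)
puts "retries queued: #{retries.size}"
```

Only the second job raises, so only it ends up in `retries`. The successful job returns its result and is never rescheduled.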

