From 21024d37f6d6d63ace3447f086fbdfbd758e23db Mon Sep 17 00:00:00 2001 From: Ben Fyvie Date: Fri, 14 Jan 2011 09:32:49 -0600 Subject: [PATCH 1/2] Allow a worker_name option to be passed in when creating a worker. This simplifies implementing workers that can restart jobs they were working on if the worker is killed or dies. --- lib/delayed/worker.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 744e20ac8..9ef54da22 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -11,6 +11,7 @@ class Worker def initialize(options={}) @quiet = options[:quiet] + Delayed::Job.worker_name = options[:worker_name] if options.has_key?(:worker_name) Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority) Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority) end From d022b38d6dac33b476690d06572ff583d0b11b2f Mon Sep 17 00:00:00 2001 From: Ben Fyvie Date: Fri, 6 May 2011 10:49:45 -0500 Subject: [PATCH 2/2] Increment the "attempts" count of a job when a lock is acquired (prior to invoking the job) and also check that max_attempts is not exceeded prior to invoking the job. --- lib/delayed/job.rb | 56 +++++++++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/lib/delayed/job.rb b/lib/delayed/job.rb index 2dfa0bba1..4860080c8 100644 --- a/lib/delayed/job.rb +++ b/lib/delayed/job.rb @@ -1,3 +1,5 @@ +require 'timeout' + module Delayed class DeserializationError < StandardError @@ -6,8 +8,11 @@ class DeserializationError < StandardError # A job object that is persisted to the database. # Contains the work object as a YAML field. class Job < ActiveRecord::Base - MAX_ATTEMPTS = 25 - MAX_RUN_TIME = 4.hours + @@max_attempts = 25 + @@max_run_time = 4.hours + + cattr_accessor :max_attempts, :max_run_time + set_table_name :delayed_jobs # By default failed jobs are destroyed after too many attempts. 
@@ -63,33 +68,40 @@ def payload_object=(object) # Reschedule the job in the future (when a job fails). # Uses an exponential scale depending on the number of failed attempts. def reschedule(message, backtrace = [], time = nil) - if self.attempts < MAX_ATTEMPTS + if max_attempts_exceeded? + max_attempts_exceeded + else time ||= Job.db_time_now + (attempts ** 4) + 5 - self.attempts += 1 self.run_at = time self.last_error = message + "\n" + backtrace.join("\n") self.unlock - save! - else - logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures." - destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now) + save! end end + def max_attempts_exceeded? + self.attempts > max_attempts + end + + def max_attempts_exceeded + logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures." + destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now) + end # Try to run one job. Returns true/false (work done/work failed) or nil if job can't be locked. def run_with_lock(max_run_time, worker_name) - logger.info "* [JOB] aquiring lock on #{name}" + logger.info "* [JOB] acquiring lock on #{name}" unless lock_exclusively!(max_run_time, worker_name) # We did not get the lock, some other worker process must have - logger.warn "* [JOB] failed to aquire exclusive lock for #{name}" + logger.warn "* [JOB] failed to acquire exclusive lock for #{name}" return nil # no work done end begin + raise "Attempted to run this job for a #{attempts} time which exceeds the max allowed of #{max_attempts}" if max_attempts_exceeded? runtime = Benchmark.realtime do - invoke_job # TODO: raise error if takes longer than max_run_time + Timeout.timeout(max_run_time.to_i) { invoke_job } destroy end # TODO: warn if runtime > max_run_time ? @@ -117,8 +129,7 @@ def self.enqueue(*args, &block) end # Find a few candidate jobs to run (in case some immediately get locked by others). 
- # Return in random order prevent everyone trying to do same head job at once. - def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME) + def self.find_available(limit = 5, max_run_time = max_run_time) time_now = db_time_now @@ -138,16 +149,14 @@ def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME) conditions.unshift(sql) - records = ActiveRecord::Base.silence do + ActiveRecord::Base.silence do find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit) end - - records.sort_by { rand() } end # Run the next job we can get an exclusive lock on. # If no jobs are left we return nil - def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME) + def self.reserve_and_run_one_job(max_run_time = max_run_time) # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next. # this leads to a more even distribution of jobs across the worker processes @@ -165,11 +174,11 @@ def lock_exclusively!(max_run_time, worker = worker_name) now = self.class.db_time_now affected_rows = if locked_by != worker # We don't own this job so we will update the locked_by name and the locked_at - self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)]) + self.class.update_all(["locked_at = ?, locked_by = ?, attempts = ?", now, worker, self.attempts += 1], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now]) else # We already own this job, this may happen if the job queue crashes. # Simply resume and update the locked_at - self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker]) + self.class.update_all(["locked_at = ?, attempts = ?", now, self.attempts += 1], ["id = ? 
and locked_by = ?", id, worker]) end if affected_rows == 1 self.locked_at = now @@ -190,6 +199,13 @@ def unlock def log_exception(error) logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts" logger.error(error) + begin + scout_response = RailzScout.submit_bug(error, nil, nil) + logger.error("* [JOB] #{name} posted case #{scout_response[:case_number]} to Fogbugz") + rescue => e + logger.error("* [JOB] #{name} FogBugz post raised an error: #{e}") + logger.error(error.backtrace) + end end # Do num jobs and return stats on success/failure. @@ -233,7 +249,7 @@ def deserialize(source) return handler if handler.respond_to?(:perform) raise DeserializationError, - 'Job failed to load: Unknown handler. Try to manually require the appropiate file.' + 'Job failed to load: Unknown handler. Try to manually require the appropriate file.' rescue TypeError, LoadError, NameError => e raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."