aboutsummaryrefslogtreecommitdiffstats
path: root/activejob
diff options
context:
space:
mode:
Diffstat (limited to 'activejob')
-rw-r--r--activejob/lib/active_job/base.rb2
-rw-r--r--activejob/lib/active_job/core.rb6
-rw-r--r--activejob/lib/active_job/enqueuing.rb27
-rw-r--r--activejob/lib/active_job/exceptions.rb117
-rw-r--r--activejob/lib/active_job/execution.rb1
-rw-r--r--activejob/test/cases/exceptions_test.rb90
-rw-r--r--activejob/test/jobs/retry_job.rb27
7 files changed, 244 insertions, 26 deletions
diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb
index ff5c69ddc6..94a1faba3f 100644
--- a/activejob/lib/active_job/base.rb
+++ b/activejob/lib/active_job/base.rb
@@ -5,6 +5,7 @@ require 'active_job/queue_priority'
require 'active_job/enqueuing'
require 'active_job/execution'
require 'active_job/callbacks'
+require 'active_job/exceptions'
require 'active_job/logging'
require 'active_job/translation'
@@ -62,6 +63,7 @@ module ActiveJob #:nodoc:
include Enqueuing
include Execution
include Callbacks
+ include Exceptions
include Logging
include Translation
diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb
index f0606b3834..1b5ddff17b 100644
--- a/activejob/lib/active_job/core.rb
+++ b/activejob/lib/active_job/core.rb
@@ -24,6 +24,9 @@ module ActiveJob
# ID optionally provided by adapter
attr_accessor :provider_job_id
+ # Number of times this job has been executed (which increments on every retry, like after an exception).
+ attr_accessor :executions
+
# I18n.locale to be used during the job.
attr_accessor :locale
end
@@ -68,6 +71,7 @@ module ActiveJob
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
@priority = self.class.priority
+ @executions = 0
end
# Returns a hash with the job data that can safely be passed to the
@@ -79,6 +83,7 @@ module ActiveJob
'queue_name' => queue_name,
'priority' => priority,
'arguments' => serialize_arguments(arguments),
+ 'executions' => executions,
'locale' => I18n.locale.to_s
}
end
@@ -109,6 +114,7 @@ module ActiveJob
self.queue_name = job_data['queue_name']
self.priority = job_data['priority']
self.serialized_arguments = job_data['arguments']
+ self.executions = job_data['executions']
self.locale = job_data['locale'] || I18n.locale.to_s
end
diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb
index 9dc3c0fa57..7bc0fe6c3a 100644
--- a/activejob/lib/active_job/enqueuing.rb
+++ b/activejob/lib/active_job/enqueuing.rb
@@ -1,7 +1,7 @@
require 'active_job/arguments'
module ActiveJob
- # Provides behavior for enqueuing and retrying jobs.
+ # Provides behavior for enqueuing jobs.
module Enqueuing
extend ActiveSupport::Concern
@@ -24,31 +24,6 @@ module ActiveJob
end
end
- # Reschedules the job to be re-executed. This is useful in combination
- # with the +rescue_from+ option. When you rescue an exception from your job
- # you can ask Active Job to retry performing your job.
- #
- # ==== Options
- # * <tt>:wait</tt> - Enqueues the job with the specified delay
- # * <tt>:wait_until</tt> - Enqueues the job at the time specified
- # * <tt>:queue</tt> - Enqueues the job on the specified queue
- # * <tt>:priority</tt> - Enqueues the job with the specified priority
- #
- # ==== Examples
- #
- # class SiteScraperJob < ActiveJob::Base
- # rescue_from(ErrorLoadingSite) do
- # retry_job queue: :low_priority
- # end
- #
- # def perform(*args)
- # # raise ErrorLoadingSite if cannot scrape
- # end
- # end
- def retry_job(options={})
- enqueue options
- end
-
# Enqueues the job to be performed by the queue adapter.
#
# ==== Options
diff --git a/activejob/lib/active_job/exceptions.rb b/activejob/lib/active_job/exceptions.rb
new file mode 100644
index 0000000000..5488a86756
--- /dev/null
+++ b/activejob/lib/active_job/exceptions.rb
@@ -0,0 +1,117 @@
+require 'active_support/core_ext/numeric/time'
+
+module ActiveJob
+ # Provides behavior for retrying and discarding jobs on exceptions.
+ module Exceptions
+ extend ActiveSupport::Concern
+
+ module ClassMethods
+ # Catch the exception and reschedule job for re-execution after so many seconds, for a specific number of attempts.
+ # If the exception keeps getting raised beyond the specified number of attempts, the exception is allowed to
+ # bubble up to the underlying queuing system, which may have its own retry mechanism or place it in a
+ # holding queue for inspection.
+ #
+ # You can also pass a block that'll be invoked if the retry attempts fail for custom logic rather than letting
+ # the exception bubble up.
+ #
+ # ==== Options
+ # * <tt>:wait</tt> - Re-enqueues the job with a delay specified either in seconds (default: 3 seconds),
+ # as a computing proc that the number of executions so far as an argument, or as a symbol reference of
+ # <tt>:exponentially_longer<>, which applies the wait algorithm of <tt>(executions ** 4) + 2</tt>
+ # (first wait 3s, then 18s, then 83s, etc)
+ # * <tt>:attempts</tt> - Re-enqueues the job the specified number of times (default: 5 attempts)
+ # * <tt>:queue</tt> - Re-enqueues the job on a different queue
+ # * <tt>:priority</tt> - Re-enqueues the job with a different priority
+ #
+ # ==== Examples
+ #
+ # class RemoteServiceJob < ActiveJob::Base
+ # retry_on CustomAppException # defaults to 3s wait, 5 attempts
+ # retry_on AnotherCustomAppException, wait: ->(executions) { executions * 2 }
+ # retry_on(YetAnotherCustomAppException) do |exception|
+ # ExceptionNotifier.caught(exception)
+ # end
+ # retry_on ActiveRecord::StatementInvalid, wait: 5.seconds, attempts: 3
+ # retry_on Net::OpenTimeout, wait: :exponentially_longer, attempts: 10
+ #
+ # def perform(*args)
+ # # Might raise CustomAppException, AnotherCustomAppException, or YetAnotherCustomAppException for something domain specific
+ # # Might raise ActiveRecord::StatementInvalid when a local db deadlock is detected
+ # # Might raise Net::OpenTimeout when the remote service is down
+ # end
+ # end
+ def retry_on(exception, wait: 3.seconds, attempts: 5, queue: nil, priority: nil)
+ rescue_from exception do |error|
+ if executions < attempts
+ logger.error "Retrying #{self.class} in #{wait} seconds, due to a #{exception}. The original exception was #{error.cause.inspect}."
+ retry_job wait: determine_delay(wait), queue: queue, priority: priority
+ else
+ if block_given?
+ yield exception
+ else
+ logger.error "Stopped retrying #{self.class} due to a #{exception}, which reoccurred on #{executions} attempts. The original exception was #{error.cause.inspect}."
+ raise error
+ end
+ end
+ end
+ end
+
+ # Discard the job with no attempts to retry, if the exception is raised. This is useful when the subject of the job,
+ # like an Active Record, is no longer available, and the job is thus no longer relevant.
+ #
+ # ==== Example
+ #
+ # class SearchIndexingJob < ActiveJob::Base
+ # discard_on ActiveJob::DeserializationError
+ #
+ # def perform(record)
+ # # Will raise ActiveJob::DeserializationError if the record can't be deserialized
+ # end
+ # end
+ def discard_on(exception)
+ rescue_from exception do |error|
+ logger.error "Discarded #{self.class} due to a #{exception}. The original exception was #{error.cause.inspect}."
+ end
+ end
+ end
+
+ # Reschedules the job to be re-executed. This is useful in combination
+ # with the +rescue_from+ option. When you rescue an exception from your job
+ # you can ask Active Job to retry performing your job.
+ #
+ # ==== Options
+ # * <tt>:wait</tt> - Enqueues the job with the specified delay in seconds
+ # * <tt>:wait_until</tt> - Enqueues the job at the time specified
+ # * <tt>:queue</tt> - Enqueues the job on the specified queue
+ # * <tt>:priority</tt> - Enqueues the job with the specified priority
+ #
+ # ==== Examples
+ #
+ # class SiteScraperJob < ActiveJob::Base
+ # rescue_from(ErrorLoadingSite) do
+ # retry_job queue: :low_priority
+ # end
+ #
+ # def perform(*args)
+ # # raise ErrorLoadingSite if cannot scrape
+ # end
+ # end
+ def retry_job(options = {})
+ enqueue options
+ end
+
+ private
+ def determine_delay(seconds_or_algorithm)
+ case seconds_or_algorithm
+ when :exponentially_longer
+ (executions ** 4) + 2
+ when Integer
+ seconds = seconds_or_algorithm
+ seconds
+ when Proc
+ algorithm = seconds_or_algorithm
+ algorithm.call(executions)
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb
index 4e4acfc2c2..0c047cd4e1 100644
--- a/activejob/lib/active_job/execution.rb
+++ b/activejob/lib/active_job/execution.rb
@@ -31,6 +31,7 @@ module ActiveJob
def perform_now
deserialize_arguments_if_needed
run_callbacks :perform do
+ self.executions = executions + 1
perform(*arguments)
end
rescue => exception
diff --git a/activejob/test/cases/exceptions_test.rb b/activejob/test/cases/exceptions_test.rb
new file mode 100644
index 0000000000..9ad73ff93c
--- /dev/null
+++ b/activejob/test/cases/exceptions_test.rb
@@ -0,0 +1,90 @@
+require 'helper'
+require 'jobs/retry_job'
+
+class ExceptionsTest < ActiveSupport::TestCase
+ setup do
+ JobBuffer.clear
+ end
+
+ test "successfully retry job throwing exception against defaults" do
+ RetryJob.perform_later 'DefaultsError', 5
+
+ assert_equal [
+ "Raised DefaultsError for the 1st time",
+ "Raised DefaultsError for the 2nd time",
+ "Raised DefaultsError for the 3rd time",
+ "Raised DefaultsError for the 4th time",
+ "Successfully completed job" ], JobBuffer.values
+ end
+
+ test "successfully retry job throwing exception against higher limit" do
+ RetryJob.perform_later 'ShortWaitTenAttemptsError', 9
+ assert_equal 9, JobBuffer.values.count
+ end
+
+ test "failed retry job when exception kept occurring against defaults" do
+ begin
+ RetryJob.perform_later 'DefaultsError', 6
+ assert_equal "Raised DefaultsError for the 5th time", JobBuffer.last_value
+ rescue DefaultsError
+ pass
+ end
+ end
+
+ test "failed retry job when exception kept occurring against higher limit" do
+ begin
+ RetryJob.perform_later 'ShortWaitTenAttemptsError', 11
+ assert_equal "Raised ShortWaitTenAttemptsError for the 10th time", JobBuffer.last_value
+ rescue ShortWaitTenAttemptsError
+ pass
+ end
+ end
+
+ test "discard job" do
+ RetryJob.perform_later 'DiscardableError', 2
+ assert_equal "Raised DiscardableError for the 1st time", JobBuffer.last_value
+ end
+
+ test "custom handling of job that exceeds retry attempts" do
+ RetryJob.perform_later 'CustomCatchError', 6
+ assert_equal "Dealt with a job that failed to retry in a custom way", JobBuffer.last_value
+ end
+end
+
+class ExponentiallyBackoffExceptionsTest < ActiveJob::TestCase
+ setup do
+ JobBuffer.clear
+ end
+
+ test "exponentially retrying job" do
+ travel_to Time.now
+
+ perform_enqueued_jobs do
+ assert_performed_with at: (Time.now + 3.seconds).to_i do
+ assert_performed_with at: (Time.now + 18.seconds).to_i do
+ assert_performed_with at: (Time.now + 83.seconds).to_i do
+ assert_performed_with at: (Time.now + 258.seconds).to_i do
+ RetryJob.perform_later 'ExponentialWaitTenAttemptsError', 5
+ end
+ end
+ end
+ end
+ end
+ end
+
+ test "custom wait retrying job" do
+ travel_to Time.now
+
+ perform_enqueued_jobs do
+ assert_performed_with at: (Time.now + 2.seconds).to_i do
+ assert_performed_with at: (Time.now + 4.seconds).to_i do
+ assert_performed_with at: (Time.now + 6.seconds).to_i do
+ assert_performed_with at: (Time.now + 8.seconds).to_i do
+ RetryJob.perform_later 'CustomWaitTenAttemptsError', 5
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/activejob/test/jobs/retry_job.rb b/activejob/test/jobs/retry_job.rb
new file mode 100644
index 0000000000..321294f892
--- /dev/null
+++ b/activejob/test/jobs/retry_job.rb
@@ -0,0 +1,27 @@
+require_relative '../support/job_buffer'
+require 'active_support/core_ext/integer/inflections'
+
+class DefaultsError < StandardError; end
+class ShortWaitTenAttemptsError < StandardError; end
+class ExponentialWaitTenAttemptsError < StandardError; end
+class CustomWaitTenAttemptsError < StandardError; end
+class CustomCatchError < StandardError; end
+class DiscardableError < StandardError; end
+
+class RetryJob < ActiveJob::Base
+ retry_on DefaultsError
+ retry_on ShortWaitTenAttemptsError, wait: 1.second, attempts: 10
+ retry_on ExponentialWaitTenAttemptsError, wait: :exponentially_longer, attempts: 10
+ retry_on CustomWaitTenAttemptsError, wait: ->(executions) { executions * 2 }, attempts: 10
+ retry_on(CustomCatchError) { |exception| JobBuffer.add("Dealt with a job that failed to retry in a custom way") }
+ discard_on DiscardableError
+
+ def perform(raising, attempts)
+ if executions < attempts
+ JobBuffer.add("Raised #{raising} for the #{executions.ordinalize} time")
+ raise raising.constantize
+ else
+ JobBuffer.add("Successfully completed job")
+ end
+ end
+end