diff options
Diffstat (limited to 'activejob/lib/active_job/core.rb')
-rw-r--r-- | activejob/lib/active_job/core.rb | 121 |
1 files changed, 83 insertions, 38 deletions
diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb index f7f882c998..283125698d 100644 --- a/activejob/lib/active_job/core.rb +++ b/activejob/lib/active_job/core.rb @@ -1,39 +1,54 @@ +# frozen_string_literal: true + module ActiveJob # Provides general behavior that will be included into every Active Job # object that inherits from ActiveJob::Base. module Core extend ActiveSupport::Concern - included do - # Job arguments - attr_accessor :arguments - attr_writer :serialized_arguments + # Job arguments + attr_accessor :arguments + attr_writer :serialized_arguments - # Timestamp when the job should be performed - attr_accessor :scheduled_at + # Timestamp when the job should be performed + attr_accessor :scheduled_at - # Job Identifier - attr_accessor :job_id + # Job Identifier + attr_accessor :job_id - # Queue in which the job will reside. - attr_writer :queue_name + # Queue in which the job will reside. + attr_writer :queue_name - # Priority that the job will have (lower is more priority). - attr_writer :priority + # Priority that the job will have (lower is more priority). + attr_writer :priority - # ID optionally provided by adapter - attr_accessor :provider_job_id + # ID optionally provided by adapter + attr_accessor :provider_job_id - # I18n.locale to be used during the job. - attr_accessor :locale - end + # Number of times this job has been executed (which increments on every retry, like after an exception). + attr_accessor :executions + + # Hash that contains the number of times this job handled errors for each specific retry_on declaration. + # Keys are the string representation of the exceptions listed in the retry_on declaration, + # while its associated value holds the number of executions where the corresponding retry_on + # declaration handled one of its listed exceptions. + attr_accessor :exception_executions + + # I18n.locale to be used during the job. + attr_accessor :locale + + # Timezone to be used during the job. + attr_accessor :timezone + + # Track when a job was enqueued + attr_accessor :enqueued_at # These methods will be included into any Active Job object, adding # helpers for de/serialization and creation of job instances. module ClassMethods # Creates a new job instance from a hash created with +serialize+ def deserialize(job_data) - job = job_data['job_class'].constantize.new + job = job_data["job_class"].constantize.new job.deserialize(job_data) job end @@ -56,7 +71,7 @@ module ActiveJob # VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last) # VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last) # VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last) - def set(options={}) + def set(options = {}) ConfiguredJob.new(self, options) end end @@ -68,18 +83,25 @@ module ActiveJob @job_id = SecureRandom.uuid @queue_name = self.class.queue_name @priority = self.class.priority + @executions = 0 + @exception_executions = {} end # Returns a hash with the job data that can safely be passed to the - # queueing adapter. + # queuing adapter. def serialize { - 'job_class' => self.class.name, - 'job_id' => job_id, - 'queue_name' => queue_name, - 'priority' => priority, - 'arguments' => serialize_arguments(arguments), - 'locale' => I18n.locale.to_s + "job_class" => self.class.name, + "job_id" => job_id, + "provider_job_id" => provider_job_id, + "queue_name" => queue_name, + "priority" => priority, + "arguments" => serialize_arguments_if_needed(arguments), + "executions" => executions, + "exception_executions" => exception_executions, + "locale" => I18n.locale.to_s, + "timezone" => Time.zone.try(:name), + "enqueued_at" => Time.now.utc.iso8601 } end @@ -89,42 +111,65 @@ module ActiveJob # ==== Examples # # class DeliverWebhookJob < ActiveJob::Base + # attr_writer :attempt_number + # + # def attempt_number + # @attempt_number ||= 0 + # end + # # def serialize - # super.merge('attempt_number' => (@attempt_number || 0) + 1) + # super.merge('attempt_number' => attempt_number + 1) # end # # def deserialize(job_data) # super - # @attempt_number = job_data['attempt_number'] + # self.attempt_number = job_data['attempt_number'] # end # - # rescue_from(TimeoutError) do |exception| - # raise exception if @attempt_number > 5 + # rescue_from(Timeout::Error) do |exception| + # raise exception if attempt_number > 5 # retry_job(wait: 10) # end # end def deserialize(job_data) - self.job_id = job_data['job_id'] - self.queue_name = job_data['queue_name'] - self.priority = job_data['priority'] - self.serialized_arguments = job_data['arguments'] - self.locale = job_data['locale'] || I18n.locale.to_s + self.job_id = job_data["job_id"] + self.provider_job_id = job_data["provider_job_id"] + self.queue_name = job_data["queue_name"] + self.priority = job_data["priority"] + self.serialized_arguments = job_data["arguments"] + self.executions = job_data["executions"] + self.exception_executions = job_data["exception_executions"] + self.locale = job_data["locale"] || I18n.locale.to_s + self.timezone = job_data["timezone"] || Time.zone.try(:name) + self.enqueued_at = job_data["enqueued_at"] end private + def serialize_arguments_if_needed(arguments) + if arguments_serialized? + @serialized_arguments + else + serialize_arguments(arguments) + end + end + def deserialize_arguments_if_needed - if defined?(@serialized_arguments) && @serialized_arguments.present? + if arguments_serialized? @arguments = deserialize_arguments(@serialized_arguments) @serialized_arguments = nil end end - def serialize_arguments(serialized_args) - Arguments.serialize(serialized_args) + def serialize_arguments(arguments) + Arguments.serialize(arguments) end def deserialize_arguments(serialized_args) Arguments.deserialize(serialized_args) end + + def arguments_serialized? + defined?(@serialized_arguments) && @serialized_arguments + end end end |