diff options
Diffstat (limited to 'activejob/lib/active_job/core.rb')
-rw-r--r-- | activejob/lib/active_job/core.rb | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb new file mode 100644 index 0000000000..4ab62f89b0 --- /dev/null +++ b/activejob/lib/active_job/core.rb @@ -0,0 +1,170 @@ +# frozen_string_literal: true + +module ActiveJob + # Provides general behavior that will be included into every Active Job + # object that inherits from ActiveJob::Base. + module Core + extend ActiveSupport::Concern + + # Job arguments + attr_accessor :arguments + attr_writer :serialized_arguments + + # Timestamp when the job should be performed + attr_accessor :scheduled_at + + # Job Identifier + attr_accessor :job_id + + # Queue in which the job will reside. + attr_writer :queue_name + + # Priority that the job will have (lower is more priority). + attr_writer :priority + + # ID optionally provided by adapter + attr_accessor :provider_job_id + + # Number of times this job has been executed (which increments on every retry, like after an exception). + attr_accessor :executions + + # Hash that contains the number of times this job handled errors for each specific retry_on declaration. + # Keys are the string representation of the exceptions listed in the retry_on declaration, + # while its associated value holds the number of executions where the corresponding retry_on + # declaration handled one of its listed exceptions. + attr_accessor :exception_executions + + # I18n.locale to be used during the job. + attr_accessor :locale + + # Timezone to be used during the job. + attr_accessor :timezone + + # These methods will be included into any Active Job object, adding + # helpers for de/serialization and creation of job instances. + module ClassMethods + # Creates a new job instance from a hash created with +serialize+ + def deserialize(job_data) + job = job_data["job_class"].constantize.new + job.deserialize(job_data) + job + end + + # Creates a job preconfigured with the given options. You can call + # perform_later with the job arguments to enqueue the job with the + # preconfigured options + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # * <tt>:priority</tt> - Enqueues the job with the specified priority + # + # ==== Examples + # + # VideoJob.set(queue: :some_queue).perform_later(Video.last) + # VideoJob.set(wait: 5.minutes).perform_later(Video.last) + # VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last) + def set(options = {}) + ConfiguredJob.new(self, options) + end + end + + # Creates a new job instance. Takes the arguments that will be + # passed to the perform method. + def initialize(*arguments) + @arguments = arguments + @job_id = SecureRandom.uuid + @queue_name = self.class.queue_name + @priority = self.class.priority + @executions = 0 + @exception_executions = Hash.new(0) + end + + # Returns a hash with the job data that can safely be passed to the + # queuing adapter. + def serialize + { + "job_class" => self.class.name, + "job_id" => job_id, + "provider_job_id" => provider_job_id, + "queue_name" => queue_name, + "priority" => priority, + "arguments" => serialize_arguments_if_needed(arguments), + "executions" => executions, + "exception_executions" => exception_executions, + "locale" => I18n.locale.to_s, + "timezone" => Time.zone.try(:name) + } + end + + # Attaches the stored job data to the current instance. Receives a hash + # returned from +serialize+ + # + # ==== Examples + # + # class DeliverWebhookJob < ActiveJob::Base + # attr_writer :attempt_number + # + # def attempt_number + # @attempt_number ||= 0 + # end + # + # def serialize + # super.merge('attempt_number' => attempt_number + 1) + # end + # + # def deserialize(job_data) + # super + # self.attempt_number = job_data['attempt_number'] + # end + # + # rescue_from(Timeout::Error) do |exception| + # raise exception if attempt_number > 5 + # retry_job(wait: 10) + # end + # end + def deserialize(job_data) + self.job_id = job_data["job_id"] + self.provider_job_id = job_data["provider_job_id"] + self.queue_name = job_data["queue_name"] + self.priority = job_data["priority"] + self.serialized_arguments = job_data["arguments"] + self.executions = job_data["executions"] + self.exception_executions = job_data["exception_executions"] + self.locale = job_data["locale"] || I18n.locale.to_s + self.timezone = job_data["timezone"] || Time.zone.try(:name) + end + + private + def serialize_arguments_if_needed(arguments) + if arguments_serialized? + @serialized_arguments + else + serialize_arguments(arguments) + end + end + + def deserialize_arguments_if_needed + if arguments_serialized? + @arguments = deserialize_arguments(@serialized_arguments) + @serialized_arguments = nil + end + end + + def serialize_arguments(arguments) + Arguments.serialize(arguments) + end + + def deserialize_arguments(serialized_args) + Arguments.deserialize(serialized_args) + end + + def arguments_serialized? + defined?(@serialized_arguments) && @serialized_arguments + end + end +end |