module ActiveJob # Provides general behavior that will be included into every Active Job # object that inherits from ActiveJob::Base. module Core extend ActiveSupport::Concern included do # Job arguments attr_accessor :arguments attr_writer :serialized_arguments # Timestamp when the job should be performed attr_accessor :scheduled_at # Job Identifier attr_accessor :job_id # Queue in which the job will reside. attr_writer :queue_name # ID optionally provided by adapter attr_accessor :provider_job_id # I18n.locale to be used during the job. attr_accessor :locale end # These methods will be included into any Active Job object, adding # helpers for de/serialization and creation of job instances. module ClassMethods # Creates a new job instance from a hash created with +serialize+ def deserialize(job_data) job = job_data['job_class'].constantize.new job.deserialize(job_data) job end # Creates a job preconfigured with the given options. You can call # perform_later with the job arguments to enqueue the job with the # preconfigured options # # ==== Options # * :wait - Enqueues the job with the specified delay # * :wait_until - Enqueues the job at the time specified # * :queue - Enqueues the job on the specified queue # # ==== Examples # # VideoJob.set(queue: :some_queue).perform_later(Video.last) # VideoJob.set(wait: 5.minutes).perform_later(Video.last) # VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last) # VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last) # VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last) def set(options={}) ConfiguredJob.new(self, options) end end # Creates a new job instance. Takes the arguments that will be # passed to the perform method. def initialize(*arguments) @arguments = arguments @job_id = SecureRandom.uuid @queue_name = self.class.queue_name end # Returns a hash with the job data that can safely be passed to the # queueing adapter. def serialize { 'job_class' => self.class.name, 'job_id' => job_id, 'queue_name' => queue_name, 'arguments' => serialize_arguments(arguments), 'locale' => I18n.locale } end # Attaches the stored job data to the current instance. Receives a hash # returned from +serialize+ # # ==== Examples # # class DeliverWebhookJob < ActiveJob::Base # def serialize # super.merge('attempt_number' => (@attempt_number || 0) + 1) # end # # def deserialize(job_data) # super # @attempt_number = job_data['attempt_number'] # end # # rescue_from(TimeoutError) do |exception| # raise exception if @attempt_number > 5 # retry_job(wait: 10) # end # end def deserialize(job_data) self.job_id = job_data['job_id'] self.queue_name = job_data['queue_name'] self.serialized_arguments = job_data['arguments'] self.locale = job_data['locale'] || I18n.locale end private def deserialize_arguments_if_needed if defined?(@serialized_arguments) && @serialized_arguments.present? @arguments = deserialize_arguments(@serialized_arguments) @serialized_arguments = nil end end def serialize_arguments(serialized_args) Arguments.serialize(serialized_args) end def deserialize_arguments(serialized_args) Arguments.deserialize(serialized_args) end end end