diff options
Diffstat (limited to 'activejob/lib/active_job')
26 files changed, 633 insertions, 174 deletions
diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb index 369e716912..175a2f0956 100644 --- a/activejob/lib/active_job/arguments.rb +++ b/activejob/lib/active_job/arguments.rb @@ -1,14 +1,25 @@ module ActiveJob + # Raised when an exception is raised during job arguments deserialization. + # + # Wraps the original exception raised as +original_exception+. class DeserializationError < StandardError attr_reader :original_exception - def initialize(e) - super ("Error while trying to deserialize arguments: #{e.message}") + def initialize(e) #:nodoc: + super("Error while trying to deserialize arguments: #{e.message}") @original_exception = e set_backtrace e.backtrace end end + # Raised when an unsupported argument type is being set as job argument. We + # currently support NilClass, Fixnum, Float, String, TrueClass, FalseClass, + # Bignum and object that can be represented as GlobalIDs (ex: Active Record). + # Also raised if you set the key for a Hash something else than a string or + # a symbol. + class SerializationError < ArgumentError + end + module Arguments extend self TYPE_WHITELIST = [ NilClass, Fixnum, Float, String, TrueClass, FalseClass, Bignum ] @@ -19,35 +30,37 @@ module ActiveJob def deserialize(arguments) arguments.map { |argument| deserialize_argument(argument) } + rescue => e + raise DeserializationError.new(e) end private def serialize_argument(argument) case argument when GlobalID::Identification - argument.global_id.to_s + argument.to_global_id.to_s when *TYPE_WHITELIST argument when Array - serialize(argument) + argument.map { |arg| serialize_argument(arg) } when Hash Hash[ argument.map { |key, value| [ serialize_hash_key(key), serialize_argument(value) ] } ] else - raise "Unsupported argument type: #{argument.class.name}" + raise SerializationError.new("Unsupported argument type: #{argument.class.name}") end end def deserialize_argument(argument) case argument when Array - deserialize(argument) + argument.map { |arg| deserialize_argument(arg) } when Hash Hash[ argument.map { |key, value| [ key, deserialize_argument(value) ] } ].with_indifferent_access - else + when String, GlobalID GlobalID::Locator.locate(argument) || argument + else + argument end - rescue => e - raise DeserializationError.new(e) end def serialize_hash_key(key) @@ -55,7 +68,7 @@ module ActiveJob when String, Symbol key.to_s else - raise "Unsupported hash key type: #{key.class.name}" + raise SerializationError.new("Unsupported hash key type: #{key.class.name}") end end end diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb index d5ba253016..a3bec1f827 100644 --- a/activejob/lib/active_job/base.rb +++ b/activejob/lib/active_job/base.rb @@ -1,20 +1,19 @@ +require 'active_job/core' require 'active_job/queue_adapter' require 'active_job/queue_name' require 'active_job/enqueuing' require 'active_job/execution' require 'active_job/callbacks' -require 'active_job/identifier' require 'active_job/logging' module ActiveJob class Base - extend QueueAdapter - + include Core + include QueueAdapter include QueueName include Enqueuing include Execution include Callbacks - include Identifier include Logging ActiveSupport.run_load_hooks(:active_job, self) diff --git a/activejob/lib/active_job/callbacks.rb b/activejob/lib/active_job/callbacks.rb index 8901fa77f2..cafa3438c0 100644 --- a/activejob/lib/active_job/callbacks.rb +++ b/activejob/lib/active_job/callbacks.rb @@ -3,8 +3,8 @@ require 'active_support/callbacks' module ActiveJob # = Active Job Callbacks # - # Active Job provides hooks during the lifecycle of a job. Callbacks allow you to trigger - # logic during the lifecycle of a job. Available callbacks: + # Active Job provides hooks during the lifecycle of a job. Callbacks allow you + # to trigger logic during the lifecycle of a job. Available callbacks are: # # * <tt>before_enqueue</tt> # * <tt>around_enqueue</tt> diff --git a/activejob/lib/active_job/configured_job.rb b/activejob/lib/active_job/configured_job.rb new file mode 100644 index 0000000000..979280b910 --- /dev/null +++ b/activejob/lib/active_job/configured_job.rb @@ -0,0 +1,16 @@ +module ActiveJob + class ConfiguredJob #:nodoc: + def initialize(job_class, options={}) + @options = options + @job_class = job_class + end + + def perform_now(*args) + @job_class.new(*args).perform_now + end + + def perform_later(*args) + @job_class.new(*args).enqueue @options + end + end +end diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb new file mode 100644 index 0000000000..b6dd03a0bc --- /dev/null +++ b/activejob/lib/active_job/core.rb @@ -0,0 +1,89 @@ +module ActiveJob + module Core + extend ActiveSupport::Concern + + included do + # Job arguments + attr_accessor :arguments + attr_writer :serialized_arguments + + # Timestamp when the job should be performed + attr_accessor :scheduled_at + + # Job Identifier + attr_accessor :job_id + + # Queue on which the job should be run on. + attr_writer :queue_name + end + + module ClassMethods + # Creates a new job instance from a hash created with +serialize+ + def deserialize(job_data) + job = job_data['job_class'].constantize.new + job.job_id = job_data['job_id'] + job.queue_name = job_data['queue_name'] + job.serialized_arguments = job_data['arguments'] + job + end + + # Creates a job preconfigured with the given options. You can call + # perform_later with the job arguments to enqueue the job with the + # preconfigured options + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # + # ==== Examples + # + # VideoJob.set(queue: :some_queue).perform_later(Video.last) + # VideoJob.set(wait: 5.minutes).perform_later(Video.last) + # VideoJob.set(wait_until: Time.tomorroe).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait_until: Time.tomorroe).perform_later(Video.last) + def set(options={}) + ConfiguredJob.new(self, options) + end + end + + # Creates a new job instance. Takes as arguments the arguments that + # will be passed to the perform method. + def initialize(*arguments) + @arguments = arguments + @job_id = SecureRandom.uuid + @queue_name = self.class.queue_name + end + + # Returns a hash with the job data that can safely be passed to the + # queueing adapter. + def serialize + { + 'job_class' => self.class.name, + 'job_id' => job_id, + 'queue_name' => queue_name, + 'arguments' => serialize_arguments(arguments) + } + end + + private + def deserialize_arguments_if_needed + if defined?(@serialized_arguments) && @serialized_arguments.present? + @arguments = deserialize_arguments(@serialized_arguments) + @serialized_arguments = nil + end + end + + def serialize_arguments(serialized_args) + Arguments.serialize(serialized_args) + end + + def deserialize_arguments(serialized_args) + Arguments.deserialize(serialized_args) + end + end +end + + + diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb index 3d00d51867..e8bc44cbc4 100644 --- a/activejob/lib/active_job/enqueuing.rb +++ b/activejob/lib/active_job/enqueuing.rb @@ -12,60 +12,64 @@ module ActiveJob # # Returns an instance of the job class queued with args available in # Job#arguments. - def enqueue(*args) - new(args).tap do |job| - job.run_callbacks :enqueue do - queue_adapter.enqueue self, job.job_id, *Arguments.serialize(args) - end - end - end - - # Enqueue a job to be performed at +interval+ from now. - # - # enqueue_in(1.week, "mike") - # - # Returns an instance of the job class queued with args available in - # Job#arguments and the timestamp in Job#enqueue_at. - def enqueue_in(interval, *args) - enqueue_at interval.seconds.from_now, *args + def perform_later(*args) + job_or_instantiate(*args).enqueue end - # Enqueue a job to be performed at an explicit point in time. - # - # enqueue_at(Date.tomorrow.midnight, "mike") - # - # Returns an instance of the job class queued with args available in - # Job#arguments and the timestamp in Job#enqueue_at. - def enqueue_at(timestamp, *args) - new(args).tap do |job| - job.enqueued_at = timestamp - - job.run_callbacks :enqueue do - queue_adapter.enqueue_at self, timestamp.to_f, job.job_id, *Arguments.serialize(args) - end + protected + def job_or_instantiate(*args) + args.first.is_a?(self) ? args.first : new(*args) end - end - end - - included do - attr_accessor :arguments - attr_accessor :enqueued_at - end - - def initialize(arguments = nil) - @arguments = arguments end - def retry_now - self.class.enqueue(*arguments) + # Reschedule the job to be re-executed. This is usefull in combination + # with the +rescue_from+ option. When you rescue an exception from your job + # you can ask Active Job to retry performing your job. + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # + # ==== Examples + # + # class SiteScrapperJob < ActiveJob::Base + # rescue_from(ErrorLoadingSite) do + # retry_job queue: :low_priority + # end + # def perform(*args) + # # raise ErrorLoadingSite if cannot scrape + # end + # end + def retry_job(options={}) + enqueue options end - def retry_in(interval) - self.class.enqueue_in interval, *arguments - end - - def retry_at(timestamp) - self.class.enqueue_at timestamp, *arguments + # Equeue the job to be performed by the queue adapter. + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # + # ==== Examples + # + # my_job_instance.enqueue + # my_job_instance.enqueue wait: 5.minutes + # my_job_instance.enqueue queue: :important + # my_job_instance.enqueue wait_until: Date.tomorrow.midnight + def enqueue(options={}) + self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait] + self.scheduled_at = options[:wait_until].to_f if options[:wait_until] + self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue] + run_callbacks :enqueue do + if self.scheduled_at + self.class.queue_adapter.enqueue_at self, self.scheduled_at + else + self.class.queue_adapter.enqueue self + end + end + self end end end diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb index 0e7b5bdd72..d6d67c46e3 100644 --- a/activejob/lib/active_job/execution.rb +++ b/activejob/lib/active_job/execution.rb @@ -4,15 +4,29 @@ require 'active_job/arguments' module ActiveJob module Execution extend ActiveSupport::Concern + include ActiveSupport::Rescuable - included do - include ActiveSupport::Rescuable - end + module ClassMethods + # Performs the job immediately. + # + # MyJob.perform_now("mike") + # + def perform_now(*args) + job_or_instantiate(*args).perform_now + end - def execute(job_id, *serialized_args) - self.job_id = job_id - self.arguments = deserialize_arguments(serialized_args) + def execute(job_data) #:nodoc: + job = deserialize(job_data) + job.perform_now + end + end + # Performs the job immediately. The job is not sent to the queueing adapter + # and will block the execution until it's finished. + # + # MyJob.new(*args).perform_now + def perform_now + deserialize_arguments_if_needed run_callbacks :perform do perform(*arguments) end @@ -23,11 +37,5 @@ module ActiveJob def perform(*) fail NotImplementedError end - - private - def deserialize_arguments(serialized_args) - Arguments.deserialize(serialized_args) - end - end end diff --git a/activejob/lib/active_job/gem_version.rb b/activejob/lib/active_job/gem_version.rb index 2545e09845..1f25d48326 100644 --- a/activejob/lib/active_job/gem_version.rb +++ b/activejob/lib/active_job/gem_version.rb @@ -8,7 +8,7 @@ module ActiveJob MAJOR = 4 MINOR = 2 TINY = 0 - PRE = "alpha" + PRE = "beta1" STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") end diff --git a/activejob/lib/active_job/identifier.rb b/activejob/lib/active_job/identifier.rb deleted file mode 100644 index c7f522087d..0000000000 --- a/activejob/lib/active_job/identifier.rb +++ /dev/null @@ -1,15 +0,0 @@ -require 'active_job/arguments' - -module ActiveJob - module Identifier - extend ActiveSupport::Concern - - included do - attr_writer :job_id - end - - def job_id - @job_id ||= SecureRandom.uuid - end - end -end diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb index ae098a80f3..962005cd15 100644 --- a/activejob/lib/active_job/logging.rb +++ b/activejob/lib/active_job/logging.rb @@ -17,7 +17,7 @@ module ActiveJob around_perform do |job, block, _| tag_logger(job.class.name, job.job_id) do - payload = {adapter: job.class.queue_adapter, job: job.class, args: job.arguments} + payload = {adapter: job.class.queue_adapter, job: job} ActiveSupport::Notifications.instrument("perform_start.active_job", payload.dup) ActiveSupport::Notifications.instrument("perform.active_job", payload) do block.call @@ -26,12 +26,12 @@ module ActiveJob end before_enqueue do |job| - if job.enqueued_at + if job.scheduled_at ActiveSupport::Notifications.instrument "enqueue_at.active_job", - adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments, timestamp: job.enqueued_at + adapter: job.class.queue_adapter, job: job else ActiveSupport::Notifications.instrument "enqueue.active_job", - adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments + adapter: job.class.queue_adapter, job: job end end end @@ -52,19 +52,31 @@ module ActiveJob class LogSubscriber < ActiveSupport::LogSubscriber def enqueue(event) - info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)}" + args_info(event) } + info do + job = event.payload[:job] + "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job) + end end def enqueue_at(event) - info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) } + info do + job = event.payload[:job] + "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job) + end end def perform_start(event) - info { "Performing #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event) } + info do + job = event.payload[:job] + "Performing #{job.class.name} from #{queue_name(event)}" + args_info(job) + end end def perform(event) - info { "Performed #{event.payload[:job].name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" } + info do + job = event.payload[:job] + "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" + end end private @@ -72,12 +84,12 @@ module ActiveJob event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})" end - def args_info(event) - event.payload[:args].any? ? " with arguments: #{event.payload[:args].map(&:inspect).join(", ")}" : "" + def args_info(job) + job.arguments.any? ? " with arguments: #{job.arguments.map(&:inspect).join(", ")}" : "" end - def enqueued_at(event) - Time.at(event.payload[:timestamp]).utc + def scheduled_at(event) + Time.at(event.payload[:job].scheduled_at).utc end def logger diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb index 8f2f8b86ea..fb54aec75e 100644 --- a/activejob/lib/active_job/queue_adapter.rb +++ b/activejob/lib/active_job/queue_adapter.rb @@ -3,22 +3,27 @@ require 'active_support/core_ext/string/inflections' module ActiveJob module QueueAdapter - mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter } + extend ActiveSupport::Concern - def queue_adapter=(name_or_adapter) - @@queue_adapter = \ - case name_or_adapter - when Symbol, String - load_adapter(name_or_adapter) - when Class - name_or_adapter - end - end + module ClassMethods + mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter } - private - def load_adapter(name) - require "active_job/queue_adapters/#{name}_adapter" - "ActiveJob::QueueAdapters::#{name.to_s.camelize}Adapter".constantize + def queue_adapter=(name_or_adapter) + @@queue_adapter = \ + case name_or_adapter + when :test + ActiveJob::QueueAdapters::TestAdapter.new + when Symbol, String + load_adapter(name_or_adapter) + when Class + name_or_adapter + end end + + private + def load_adapter(name) + "ActiveJob::QueueAdapters::#{name.to_s.camelize}Adapter".constantize + end + end end end
\ No newline at end of file diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb new file mode 100644 index 0000000000..345b01ef00 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters.rb @@ -0,0 +1,17 @@ +module ActiveJob + module QueueAdapters + extend ActiveSupport::Autoload + + autoload :InlineAdapter + autoload :BackburnerAdapter + autoload :DelayedJobAdapter + autoload :QuAdapter + autoload :QueAdapter + autoload :QueueClassicAdapter + autoload :ResqueAdapter + autoload :SidekiqAdapter + autoload :SneakersAdapter + autoload :SuckerPunchAdapter + autoload :TestAdapter + end +end diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb index 6fe2d4eb53..e1b00f1de7 100644 --- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb @@ -4,19 +4,20 @@ module ActiveJob module QueueAdapters class BackburnerAdapter class << self - def enqueue(job, *args) - Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name + def enqueue(job) + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name end - def enqueue_at(job, timestamp, *args) - raise NotImplementedError + def enqueue_at(job, timestamp) + delay = timestamp - Time.current.to_f + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay end end class JobWrapper class << self - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb index a00569833a..658799edfc 100644 --- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -4,18 +4,18 @@ module ActiveJob module QueueAdapters class DelayedJobAdapter class << self - def enqueue(job, *args) - JobWrapper.new.delay(queue: job.queue_name).perform(job, *args) + def enqueue(job) + JobWrapper.new.delay(queue: job.queue_name).perform(job.serialize) end - def enqueue_at(job, timestamp, *args) - JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job, *args) + def enqueue_at(job, timestamp) + JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job.serialize) end end class JobWrapper - def perform(job, *args) - job.new.execute(*args) + def perform(job_data) + Base.execute(job_data) end end end diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb index 5805340fb0..fdefa38d5e 100644 --- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb @@ -2,8 +2,8 @@ module ActiveJob module QueueAdapters class InlineAdapter class << self - def enqueue(job, *args) - job.new.execute(*args) + def enqueue(job) + Base.execute(job.serialize) end def enqueue_at(*) diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb index 5cb741c094..f681fd7e8a 100644 --- a/activejob/lib/active_job/queue_adapters/qu_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/qu_adapter.rb @@ -5,7 +5,7 @@ module ActiveJob class QuAdapter class << self def enqueue(job, *args) - Qu::Payload.new(klass: JobWrapper, args: [job.name, *args]).tap do |payload| + Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload| payload.instance_variable_set(:@queue, job.queue_name) end.push end @@ -16,13 +16,12 @@ module ActiveJob end class JobWrapper < Qu::Job - def initialize(job_name, *args) - @job = job_name.constantize - @args = args + def initialize(job_data) + @job_data = job_data end def perform - @job.new.execute(*@args) + Base.execute @job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb index 0b87deb4e0..51891ab07b 100644 --- a/activejob/lib/active_job/queue_adapters/que_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb @@ -4,18 +4,18 @@ module ActiveJob module QueueAdapters class QueAdapter class << self - def enqueue(job, *args) - JobWrapper.enqueue job.name, *args, queue: job.queue_name + def enqueue(job) + JobWrapper.enqueue job.serialize, queue: job.queue_name end - def enqueue_at(job, timestamp, *args) - raise NotImplementedError + def enqueue_at(job, timestamp) + JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp) end end class JobWrapper < Que::Job - def run(job_name, *args) - job_name.constantize.new.execute(*args) + def run(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb index d74f8cf90e..ddcc868317 100644 --- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -4,19 +4,34 @@ module ActiveJob module QueueAdapters class QueueClassicAdapter class << self - def enqueue(job, *args) - QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.name, *args) + def enqueue(job) + build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize) end - def enqueue_at(job, timestamp, *args) - raise NotImplementedError + def enqueue_at(job, timestamp) + queue = build_queue(job.queue_name) + unless queue.respond_to?(:enqueue_at) + raise NotImplementedError, 'To be able to schedule jobs with Queue Classic ' \ + 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' + 'You can implement this yourself or you can use the queue_classic-later gem.' + end + queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) + end + + # Builds a <tt>QC::Queue</tt> object to schedule jobs on. + # + # If you have a custom <tt>QC::Queue</tt> subclass you'll need to suclass + # <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the + # <tt>build_queue</tt> method. + def build_queue(queue_name) + QC::Queue.new(queue_name) end end class JobWrapper class << self - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb index da8212fc9b..affa3bdfbc 100644 --- a/activejob/lib/active_job/queue_adapters/resque_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb @@ -16,23 +16,23 @@ module ActiveJob module QueueAdapters class ResqueAdapter class << self - def enqueue(job, *args) - Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args + def enqueue(job) + Resque.enqueue_to job.queue_name, JobWrapper, job.serialize end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) unless Resque.respond_to?(:enqueue_at_with_queue) raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \ "resque-scheduler gem. Please add it to your Gemfile and run bundle install" end - Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args + Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize end end class JobWrapper class << self - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb index 3e20bec44c..79926a1e24 100644 --- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -4,20 +4,20 @@ module ActiveJob module QueueAdapters class SidekiqAdapter class << self - def enqueue(job, *args) + def enqueue(job) #Sidekiq::Client does not support symbols as keys Sidekiq::Client.push \ 'class' => JobWrapper, 'queue' => job.queue_name, - 'args' => [ job, *args ], + 'args' => [ job.serialize ], 'retry' => true end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) Sidekiq::Client.push \ 'class' => JobWrapper, 'queue' => job.queue_name, - 'args' => [ job, *args ], + 'args' => [ job.serialize ], 'retry' => true, 'at' => timestamp end @@ -26,8 +26,8 @@ module ActiveJob class JobWrapper include Sidekiq::Worker - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb index 48b3df6a46..1ab0a87485 100644 --- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -7,25 +7,25 @@ module ActiveJob @monitor = Monitor.new class << self - def enqueue(job, *args) + def enqueue(job) @monitor.synchronize do JobWrapper.from_queue job.queue_name - JobWrapper.enqueue ActiveSupport::JSON.encode([ job.name, *args ]) + JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize) end end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) raise NotImplementedError end end class JobWrapper include Sneakers::Worker - from_queue 'active_jobs_default' + from_queue 'default' def work(msg) - job_name, *args = ActiveSupport::JSON.decode(msg) - job_name.constantize.new.execute(*args) + job_data = ActiveSupport::JSON.decode(msg) + Base.execute job_data ack! end end diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb index 16f05744f3..b19a38093f 100644 --- a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -4,11 +4,11 @@ module ActiveJob module QueueAdapters class SuckerPunchAdapter class << self - def enqueue(job, *args) - JobWrapper.new.async.perform job, *args + def enqueue(job) + JobWrapper.new.async.perform job.serialize end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) raise NotImplementedError end end @@ -16,8 +16,8 @@ module ActiveJob class JobWrapper include SuckerPunch::Job - def perform(job, *args) - job.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb new file mode 100644 index 0000000000..b9997efddf --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb @@ -0,0 +1,76 @@ +module ActiveJob + module QueueAdapters + class TestAdapter + attr_accessor(:perform_enqueued_jobs) { false } + attr_accessor(:perform_enqueued_at_jobs) { false } + delegate :name, to: :class + + # Provides a store of all the enqueued jobs with the TestAdapter so you can check them. + def enqueued_jobs + @enqueued_jobs ||= [] + end + + # Allows you to overwrite the default enqueued jobs store from an array to some + # other object. If you just want to clear the store, + # call ActiveJob::QueueAdapters::TestAdapter.enqueued_jobs.clear. + # + # If you place another object here, please make sure it responds to: + # + # * << (message) + # * clear + # * length + # * size + # * and other common Array methods + def enqueued_jobs=(val) + @enqueued_jobs = val + end + + # Provides a store of all the performed jobs with the TestAdapter so you can check them. + def performed_jobs + @performed_jobs ||= [] + end + + # Allows you to overwrite the default performed jobs store from an array to some + # other object. If you just want to clear the store, + # call ActiveJob::QueueAdapters::TestAdapter.performed_jobs.clear. + # + # If you place another object here, please make sure it responds to: + # + # * << (message) + # * clear + # * length + # * size + # * and other common Array methods + def performed_jobs=(val) + @performed_jobs = val + end + + def enqueue(job) + if perform_enqueued_jobs? + performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name} + job.perform_now + else + enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name} + end + end + + def enqueue_at(job, timestamp) + if perform_enqueued_at_jobs? + performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp} + job.perform_now + else + enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp} + end + end + + private + def perform_enqueued_jobs? + perform_enqueued_jobs + end + + def perform_enqueued_at_jobs? + perform_enqueued_at_jobs + end + end + end +end diff --git a/activejob/lib/active_job/queue_name.rb b/activejob/lib/active_job/queue_name.rb index 9698835b6e..45acb71605 100644 --- a/activejob/lib/active_job/queue_name.rb +++ b/activejob/lib/active_job/queue_name.rb @@ -6,16 +6,33 @@ module ActiveJob mattr_accessor(:queue_name_prefix) mattr_accessor(:default_queue_name) { "default" } - def queue_as(part_name) + def queue_as(part_name=nil, &block) + if block_given? + self.queue_name = block + else + self.queue_name = queue_name_from_part(part_name) + end + end + + def queue_name_from_part(part_name) #:nodoc: queue_name = part_name.to_s.presence || default_queue_name name_parts = [queue_name_prefix.presence, queue_name] - self.queue_name = name_parts.compact.join('_') + name_parts.compact.join('_') end end included do - class_attribute :queue_name + class_attribute :queue_name, instance_accessor: false self.queue_name = default_queue_name end + + # Returns the name of the queue the job will be run on + def queue_name + if @queue_name.is_a?(Proc) + @queue_name = self.class.queue_name_from_part(instance_exec(&@queue_name)) + end + @queue_name + end + end end diff --git a/activejob/lib/active_job/test_case.rb b/activejob/lib/active_job/test_case.rb new file mode 100644 index 0000000000..d894a7b5cd --- /dev/null +++ b/activejob/lib/active_job/test_case.rb @@ -0,0 +1,7 @@ +require 'active_support/test_case' + +module ActiveJob + class TestCase < ActiveSupport::TestCase + include ActiveJob::TestHelper + end +end diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb new file mode 100644 index 0000000000..fa0576669e --- /dev/null +++ b/activejob/lib/active_job/test_helper.rb @@ -0,0 +1,196 @@ +module ActiveJob + # Provides helper methods for testing Active Job + module TestHelper + extend ActiveSupport::Concern + + included do + def before_setup + @old_queue_adapter = queue_adapter + ActiveJob::Base.queue_adapter = :test + clear_enqueued_jobs + clear_performed_jobs + super + end + + def after_teardown + super + ActiveJob::Base.queue_adapter = @old_queue_adapter + end + + # Asserts that the number of enqueued jobs matches the given number. + # + # def test_jobs + # assert_enqueued_jobs 0 + # HelloJob.perform_later('david') + # assert_enqueued_jobs 1 + # HelloJob.perform_later('abdelkader') + # assert_enqueued_jobs 2 + # end + # + # If a block is passed, that block should cause the specified number of + # jobs to be enqueued. + # + # def test_jobs_again + # assert_enqueued_jobs 1 do + # HelloJob.perform_later('cristian') + # end + # + # assert_enqueued_jobs 2 do + # HelloJob.perform_later('aaron') + # HelloJob.perform_later('rafael') + # end + # end + def assert_enqueued_jobs(number) + if block_given? + original_count = enqueued_jobs.size + yield + new_count = enqueued_jobs.size + assert_equal original_count + number, new_count, + "#{number} jobs expected, but #{new_count - original_count} were enqueued" + else + enqueued_jobs_size = enqueued_jobs.size + assert_equal number, enqueued_jobs_size, "#{number} jobs expected, but #{enqueued_jobs_size} were enqueued" + end + end + + # Assert that no job have been enqueued. + # + # def test_jobs + # assert_no_enqueued_jobs + # HelloJob.perform_later('jeremy') + # assert_enqueued_jobs 1 + # end + # + # If a block is passed, that block should not cause any job to be enqueued. + # + # def test_jobs_again + # assert_no_enqueued_jobs do + # # No job should be enqueued from this block + # end + # end + # + # Note: This assertion is simply a shortcut for: + # + # assert_enqueued_jobs 0 + def assert_no_enqueued_jobs(&block) + assert_enqueued_jobs 0, &block + end + + # Asserts that the number of performed jobs matches the given number. + # + # def test_jobs + # assert_performed_jobs 0 + # HelloJob.perform_later('xavier') + # assert_performed_jobs 1 + # HelloJob.perform_later('yves') + # assert_performed_jobs 2 + # end + # + # If a block is passed, that block should cause the specified number of + # jobs to be performed. + # + # def test_jobs_again + # assert_performed_jobs 1 do + # HelloJob.perform_later('robin') + # end + # + # assert_performed_jobs 2 do + # HelloJob.perform_later('carlos') + # HelloJob.perform_later('sean') + # end + # end + def assert_performed_jobs(number) + if block_given? + original_count = performed_jobs.size + yield + new_count = performed_jobs.size + assert_equal original_count + number, new_count, + "#{number} jobs expected, but #{new_count - original_count} were performed" + else + performed_jobs_size = performed_jobs.size + assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed" + end + end + + # Asserts that no jobs have been performed. + # + # def test_jobs + # assert_no_performed_jobs + # HelloJob.perform_later('matthew') + # assert_performed_jobs 1 + # end + # + # If a block is passed, that block should not cause any job to be performed. + # + # def test_jobs_again + # assert_no_performed_jobs do + # # No job should be performed from this block + # end + # end + # + # Note: This assertion is simply a shortcut for: + # + # assert_performed_jobs 0 + def assert_no_performed_jobs(&block) + assert_performed_jobs 0, &block + end + + # Asserts that the job passed in the block has been enqueued with the given arguments. + # + # def assert_enqueued_job + # assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low') do + # MyJob.perform_later(1,2,3) + # end + # end + def assert_enqueued_with(args = {}, &_block) + original_enqueued_jobs = enqueued_jobs.dup + clear_enqueued_jobs + args.assert_valid_keys(:job, :args, :at, :queue) + yield + matching_job = enqueued_jobs.any? do |job| + args.all? { |key, value| value == job[key] } + end + assert matching_job + ensure + queue_adapter.enqueued_jobs = original_enqueued_jobs + enqueued_jobs + end + + # Asserts that the job passed in the block has been performed with the given arguments. + # + # def test_assert_performed_with + # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') do + # MyJob.perform_later(1,2,3) + # end + # end + def assert_performed_with(args = {}, &_block) + original_performed_jobs = performed_jobs.dup + clear_performed_jobs + args.assert_valid_keys(:job, :args, :at, :queue) + yield + matching_job = performed_jobs.any? do |job| + args.all? { |key, value| value == job[key] } + end + assert matching_job, "No performed job found with #{args}" + ensure + queue_adapter.performed_jobs = original_performed_jobs + performed_jobs + end + + def queue_adapter + ActiveJob::Base.queue_adapter + end + + delegate :enqueued_jobs, :enqueued_jobs=, + :performed_jobs, :performed_jobs=, + to: :queue_adapter + + private + def clear_enqueued_jobs + enqueued_jobs.clear + end + + def clear_performed_jobs + performed_jobs.clear + end + end + end +end |