From 1e237b4e44b7de564c7d6b331dd2f2243c4113fd Mon Sep 17 00:00:00 2001 From: Cristian Bica Date: Mon, 25 Aug 2014 17:34:50 +0300 Subject: Active Job refactoring --- activejob/README.md | 6 +- activejob/lib/active_job.rb | 1 + activejob/lib/active_job/base.rb | 4 +- activejob/lib/active_job/configured_job.rb | 18 ++++ activejob/lib/active_job/core.rb | 89 ++++++++++++++++++++ activejob/lib/active_job/enqueuing.rb | 98 +++++++++++----------- activejob/lib/active_job/execution.rb | 32 ++++--- activejob/lib/active_job/identifier.rb | 15 ---- activejob/lib/active_job/logging.rb | 28 ++++--- .../queue_adapters/backburner_adapter.rb | 12 +-- .../queue_adapters/delayed_job_adapter.rb | 12 +-- .../active_job/queue_adapters/inline_adapter.rb | 4 +- .../lib/active_job/queue_adapters/qu_adapter.rb | 9 +- .../lib/active_job/queue_adapters/que_adapter.rb | 12 +-- .../queue_adapters/queue_classic_adapter.rb | 12 +-- .../active_job/queue_adapters/resque_adapter.rb | 12 +-- .../active_job/queue_adapters/sidekiq_adapter.rb | 12 +-- .../active_job/queue_adapters/sneakers_adapter.rb | 10 +-- .../queue_adapters/sucker_punch_adapter.rb | 10 +-- .../lib/active_job/queue_adapters/test_adapter.rb | 16 ++-- activejob/lib/active_job/queue_name.rb | 23 ++++- activejob/test/cases/callbacks_test.rb | 5 +- activejob/test/cases/job_serialization_test.rb | 2 +- activejob/test/cases/logging_test.rb | 41 +++++---- activejob/test/cases/queue_naming_test.rb | 37 ++++++-- activejob/test/cases/queuing_test.rb | 14 ++-- activejob/test/cases/rescue_test.rb | 12 +-- activejob/test/cases/test_helper_test.rb | 66 +++++++-------- activejob/test/jobs/nested_job.rb | 2 +- activejob/test/jobs/rescue_job.rb | 2 +- 30 files changed, 387 insertions(+), 229 deletions(-) create mode 100644 activejob/lib/active_job/configured_job.rb create mode 100644 activejob/lib/active_job/core.rb delete mode 100644 activejob/lib/active_job/identifier.rb (limited to 'activejob') diff --git a/activejob/README.md b/activejob/README.md index 1f300fcf62..5734ca413e 100644 --- a/activejob/README.md +++ b/activejob/README.md @@ -43,15 +43,15 @@ end Enqueue a job like so: ```ruby -MyJob.enqueue record # Enqueue a job to be performed as soon the queueing system is free. +MyJob.perform_later record # Enqueue a job to be performed as soon the queueing system is free. ``` ```ruby -MyJob.enqueue_at Date.tomorrow.noon, record # Enqueue a job to be performed tomorrow at noon. +MyJob.set(wait_until: Date.tomorrow.noon).perform_later(record) # Enqueue a job to be performed tomorrow at noon. ``` ```ruby -MyJob.enqueue_in 1.week, record # Enqueue a job to be performed 1 week from now. +MyJob.set(wait: 1.week).perform_later(record) # Enqueue a job to be performed 1 week from now. ``` That's it! diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb index 93d9d1b2d8..1b582f5877 100644 --- a/activejob/lib/active_job.rb +++ b/activejob/lib/active_job.rb @@ -31,6 +31,7 @@ module ActiveJob autoload :Base autoload :QueueAdapters + autoload :ConfiguredJob autoload :TestCase autoload :TestHelper end diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb index 1b54786303..a3bec1f827 100644 --- a/activejob/lib/active_job/base.rb +++ b/activejob/lib/active_job/base.rb @@ -1,19 +1,19 @@ +require 'active_job/core' require 'active_job/queue_adapter' require 'active_job/queue_name' require 'active_job/enqueuing' require 'active_job/execution' require 'active_job/callbacks' -require 'active_job/identifier' require 'active_job/logging' module ActiveJob class Base + include Core include QueueAdapter include QueueName include Enqueuing include Execution include Callbacks - include Identifier include Logging ActiveSupport.run_load_hooks(:active_job, self) diff --git a/activejob/lib/active_job/configured_job.rb b/activejob/lib/active_job/configured_job.rb new file mode 100644 index 0000000000..61efc9b09e --- /dev/null +++ b/activejob/lib/active_job/configured_job.rb @@ -0,0 +1,18 @@ +module ActiveJob + class ConfiguredJob #:nodoc: + def initialize(job_class, options={}) + @options = options + @options[:in] = @options.delete(:wait) if @options[:wait] + @options[:at] = @options.delete(:wait_until) if @options[:wait_until] + @job_class = job_class + end + + def perform_now(*args) + @job_class.new(*args).perform_now + end + + def perform_later(*args) + @job_class.new(*args).enqueue @options + end + end +end diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb new file mode 100644 index 0000000000..b6dd03a0bc --- /dev/null +++ b/activejob/lib/active_job/core.rb @@ -0,0 +1,89 @@ +module ActiveJob + module Core + extend ActiveSupport::Concern + + included do + # Job arguments + attr_accessor :arguments + attr_writer :serialized_arguments + + # Timestamp when the job should be performed + attr_accessor :scheduled_at + + # Job Identifier + attr_accessor :job_id + + # Queue on which the job should be run on. + attr_writer :queue_name + end + + module ClassMethods + # Creates a new job instance from a hash created with +serialize+ + def deserialize(job_data) + job = job_data['job_class'].constantize.new + job.job_id = job_data['job_id'] + job.queue_name = job_data['queue_name'] + job.serialized_arguments = job_data['arguments'] + job + end + + # Creates a job preconfigured with the given options. You can call + # perform_later with the job arguments to enqueue the job with the + # preconfigured options + # + # ==== Options + # * :wait - Enqueues the job with the specified delay + # * :wait_until - Enqueues the job at the time specified + # * :queue - Enqueues the job on the specified queue + # + # ==== Examples + # + # VideoJob.set(queue: :some_queue).perform_later(Video.last) + # VideoJob.set(wait: 5.minutes).perform_later(Video.last) + # VideoJob.set(wait_until: Time.tomorroe).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait_until: Time.tomorroe).perform_later(Video.last) + def set(options={}) + ConfiguredJob.new(self, options) + end + end + + # Creates a new job instance. Takes as arguments the arguments that + # will be passed to the perform method. + def initialize(*arguments) + @arguments = arguments + @job_id = SecureRandom.uuid + @queue_name = self.class.queue_name + end + + # Returns a hash with the job data that can safely be passed to the + # queueing adapter. + def serialize + { + 'job_class' => self.class.name, + 'job_id' => job_id, + 'queue_name' => queue_name, + 'arguments' => serialize_arguments(arguments) + } + end + + private + def deserialize_arguments_if_needed + if defined?(@serialized_arguments) && @serialized_arguments.present? + @arguments = deserialize_arguments(@serialized_arguments) + @serialized_arguments = nil + end + end + + def serialize_arguments(serialized_args) + Arguments.serialize(serialized_args) + end + + def deserialize_arguments(serialized_args) + Arguments.deserialize(serialized_args) + end + end +end + + + diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb index 3d00d51867..32453930cc 100644 --- a/activejob/lib/active_job/enqueuing.rb +++ b/activejob/lib/active_job/enqueuing.rb @@ -12,60 +12,64 @@ module ActiveJob # # Returns an instance of the job class queued with args available in # Job#arguments. - def enqueue(*args) - new(args).tap do |job| - job.run_callbacks :enqueue do - queue_adapter.enqueue self, job.job_id, *Arguments.serialize(args) - end - end - end - - # Enqueue a job to be performed at +interval+ from now. - # - # enqueue_in(1.week, "mike") - # - # Returns an instance of the job class queued with args available in - # Job#arguments and the timestamp in Job#enqueue_at. - def enqueue_in(interval, *args) - enqueue_at interval.seconds.from_now, *args + def perform_later(*args) + job_or_instantiate(*args).enqueue end - # Enqueue a job to be performed at an explicit point in time. - # - # enqueue_at(Date.tomorrow.midnight, "mike") - # - # Returns an instance of the job class queued with args available in - # Job#arguments and the timestamp in Job#enqueue_at. - def enqueue_at(timestamp, *args) - new(args).tap do |job| - job.enqueued_at = timestamp - - job.run_callbacks :enqueue do - queue_adapter.enqueue_at self, timestamp.to_f, job.job_id, *Arguments.serialize(args) - end + protected + def job_or_instantiate(*args) + args.first.is_a?(self) ? args.first : new(*args) end - end - end - - included do - attr_accessor :arguments - attr_accessor :enqueued_at - end - - def initialize(arguments = nil) - @arguments = arguments end - def retry_now - self.class.enqueue(*arguments) + # Reschedule the job to be re-executed. This is usefull in combination + # with the +rescue_from+ option. When you rescue an exception from your job + # you can ask Active Job to retry performing your job. + # + # ==== Options + # * :in - Enqueues the job with the specified delay + # * :at - Enqueues the job at the time specified + # * :queue - Enqueues the job on the specified queue + # + # ==== Examples + # + # class SiteScrapperJob < ActiveJob::Base + # rescue_from(ErrorLoadingSite) do + # retry_job queue: :low_priority + # end + # def perform(*args) + # # raise ErrorLoadingSite if cannot scrape + # end + # end + def retry_job(options={}) + enqueue options end - def retry_in(interval) - self.class.enqueue_in interval, *arguments - end - - def retry_at(timestamp) - self.class.enqueue_at timestamp, *arguments + # Equeue the job to be performed by the queue adapter. + # + # ==== Options + # * :in - Enqueues the job with the specified delay + # * :at - Enqueues the job at the time specified + # * :queue - Enqueues the job on the specified queue + # + # ==== Examples + # + # my_job_instance.enqueue + # my_job_instance.enqueue in: 5.minutes + # my_job_instance.enqueue queue: :important + # my_job_instance.enqueue at: Date.tomorrow.midnight + def enqueue(options={}) + self.scheduled_at = options[:in].seconds.from_now.to_f if options[:in] + self.scheduled_at = options[:at].to_f if options[:at] + self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue] + run_callbacks :enqueue do + if self.scheduled_at + self.class.queue_adapter.enqueue_at self, self.scheduled_at + else + self.class.queue_adapter.enqueue self + end + end + self end end end diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb index 0e7b5bdd72..d6d67c46e3 100644 --- a/activejob/lib/active_job/execution.rb +++ b/activejob/lib/active_job/execution.rb @@ -4,15 +4,29 @@ require 'active_job/arguments' module ActiveJob module Execution extend ActiveSupport::Concern + include ActiveSupport::Rescuable - included do - include ActiveSupport::Rescuable - end + module ClassMethods + # Performs the job immediately. + # + # MyJob.perform_now("mike") + # + def perform_now(*args) + job_or_instantiate(*args).perform_now + end - def execute(job_id, *serialized_args) - self.job_id = job_id - self.arguments = deserialize_arguments(serialized_args) + def execute(job_data) #:nodoc: + job = deserialize(job_data) + job.perform_now + end + end + # Performs the job immediately. The job is not sent to the queueing adapter + # and will block the execution until it's finished. + # + # MyJob.new(*args).perform_now + def perform_now + deserialize_arguments_if_needed run_callbacks :perform do perform(*arguments) end @@ -23,11 +37,5 @@ module ActiveJob def perform(*) fail NotImplementedError end - - private - def deserialize_arguments(serialized_args) - Arguments.deserialize(serialized_args) - end - end end diff --git a/activejob/lib/active_job/identifier.rb b/activejob/lib/active_job/identifier.rb deleted file mode 100644 index c7f522087d..0000000000 --- a/activejob/lib/active_job/identifier.rb +++ /dev/null @@ -1,15 +0,0 @@ -require 'active_job/arguments' - -module ActiveJob - module Identifier - extend ActiveSupport::Concern - - included do - attr_writer :job_id - end - - def job_id - @job_id ||= SecureRandom.uuid - end - end -end diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb index ae098a80f3..d2030773bb 100644 --- a/activejob/lib/active_job/logging.rb +++ b/activejob/lib/active_job/logging.rb @@ -17,7 +17,7 @@ module ActiveJob around_perform do |job, block, _| tag_logger(job.class.name, job.job_id) do - payload = {adapter: job.class.queue_adapter, job: job.class, args: job.arguments} + payload = {adapter: job.class.queue_adapter, job: job} ActiveSupport::Notifications.instrument("perform_start.active_job", payload.dup) ActiveSupport::Notifications.instrument("perform.active_job", payload) do block.call @@ -26,12 +26,12 @@ module ActiveJob end before_enqueue do |job| - if job.enqueued_at + if job.scheduled_at ActiveSupport::Notifications.instrument "enqueue_at.active_job", - adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments, timestamp: job.enqueued_at + adapter: job.class.queue_adapter, job: job else ActiveSupport::Notifications.instrument "enqueue.active_job", - adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments + adapter: job.class.queue_adapter, job: job end end end @@ -52,19 +52,23 @@ module ActiveJob class LogSubscriber < ActiveSupport::LogSubscriber def enqueue(event) - info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)}" + args_info(event) } + job = event.payload[:job] + info { "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job) } end def enqueue_at(event) - info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) } + job = event.payload[:job] + info { "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job) } end def perform_start(event) - info { "Performing #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event) } + job = event.payload[:job] + info { "Performing #{job.class.name} from #{queue_name(event)}" + args_info(job) } end def perform(event) - info { "Performed #{event.payload[:job].name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" } + job = event.payload[:job] + info { "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" } end private @@ -72,12 +76,12 @@ module ActiveJob event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})" end - def args_info(event) - event.payload[:args].any? ? " with arguments: #{event.payload[:args].map(&:inspect).join(", ")}" : "" + def args_info(job) + job.arguments.any? ? " with arguments: #{job.arguments.map(&:inspect).join(", ")}" : "" end - def enqueued_at(event) - Time.at(event.payload[:timestamp]).utc + def scheduled_at(event) + Time.at(event.payload[:job].scheduled_at).utc end def logger diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb index 8ebee36e45..e1b00f1de7 100644 --- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb @@ -4,20 +4,20 @@ module ActiveJob module QueueAdapters class BackburnerAdapter class << self - def enqueue(job, *args) - Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name + def enqueue(job) + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) delay = timestamp - Time.current.to_f - Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name, delay: delay + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay end end class JobWrapper class << self - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb index a00569833a..658799edfc 100644 --- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -4,18 +4,18 @@ module ActiveJob module QueueAdapters class DelayedJobAdapter class << self - def enqueue(job, *args) - JobWrapper.new.delay(queue: job.queue_name).perform(job, *args) + def enqueue(job) + JobWrapper.new.delay(queue: job.queue_name).perform(job.serialize) end - def enqueue_at(job, timestamp, *args) - JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job, *args) + def enqueue_at(job, timestamp) + JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job.serialize) end end class JobWrapper - def perform(job, *args) - job.new.execute(*args) + def perform(job_data) + Base.execute(job_data) end end end diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb index 5805340fb0..fdefa38d5e 100644 --- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb @@ -2,8 +2,8 @@ module ActiveJob module QueueAdapters class InlineAdapter class << self - def enqueue(job, *args) - job.new.execute(*args) + def enqueue(job) + Base.execute(job.serialize) end def enqueue_at(*) diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb index 5cb741c094..f681fd7e8a 100644 --- a/activejob/lib/active_job/queue_adapters/qu_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/qu_adapter.rb @@ -5,7 +5,7 @@ module ActiveJob class QuAdapter class << self def enqueue(job, *args) - Qu::Payload.new(klass: JobWrapper, args: [job.name, *args]).tap do |payload| + Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload| payload.instance_variable_set(:@queue, job.queue_name) end.push end @@ -16,13 +16,12 @@ module ActiveJob end class JobWrapper < Qu::Job - def initialize(job_name, *args) - @job = job_name.constantize - @args = args + def initialize(job_data) + @job_data = job_data end def perform - @job.new.execute(*@args) + Base.execute @job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb index 9c84c74f83..51891ab07b 100644 --- a/activejob/lib/active_job/queue_adapters/que_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb @@ -4,18 +4,18 @@ module ActiveJob module QueueAdapters class QueAdapter class << self - def enqueue(job, *args) - JobWrapper.enqueue job.name, *args, queue: job.queue_name + def enqueue(job) + JobWrapper.enqueue job.serialize, queue: job.queue_name end - def enqueue_at(job, timestamp, *args) - JobWrapper.enqueue job.name, *args, queue: job.queue_name, run_at: Time.at(timestamp) + def enqueue_at(job, timestamp) + JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp) end end class JobWrapper < Que::Job - def run(job_name, *args) - job_name.constantize.new.execute(*args) + def run(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb index 914390a958..ddcc868317 100644 --- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -4,18 +4,18 @@ module ActiveJob module QueueAdapters class QueueClassicAdapter class << self - def enqueue(job, *args) - build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.name, *args) + def enqueue(job) + build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize) end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) queue = build_queue(job.queue_name) unless queue.respond_to?(:enqueue_at) raise NotImplementedError, 'To be able to schedule jobs with Queue Classic ' \ 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' 'You can implement this yourself or you can use the queue_classic-later gem.' end - queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.name, *args) + queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) end # Builds a QC::Queue object to schedule jobs on. @@ -30,8 +30,8 @@ module ActiveJob class JobWrapper class << self - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb index da8212fc9b..affa3bdfbc 100644 --- a/activejob/lib/active_job/queue_adapters/resque_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb @@ -16,23 +16,23 @@ module ActiveJob module QueueAdapters class ResqueAdapter class << self - def enqueue(job, *args) - Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args + def enqueue(job) + Resque.enqueue_to job.queue_name, JobWrapper, job.serialize end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) unless Resque.respond_to?(:enqueue_at_with_queue) raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \ "resque-scheduler gem. Please add it to your Gemfile and run bundle install" end - Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args + Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize end end class JobWrapper class << self - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb index 3e20bec44c..79926a1e24 100644 --- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -4,20 +4,20 @@ module ActiveJob module QueueAdapters class SidekiqAdapter class << self - def enqueue(job, *args) + def enqueue(job) #Sidekiq::Client does not support symbols as keys Sidekiq::Client.push \ 'class' => JobWrapper, 'queue' => job.queue_name, - 'args' => [ job, *args ], + 'args' => [ job.serialize ], 'retry' => true end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) Sidekiq::Client.push \ 'class' => JobWrapper, 'queue' => job.queue_name, - 'args' => [ job, *args ], + 'args' => [ job.serialize ], 'retry' => true, 'at' => timestamp end @@ -26,8 +26,8 @@ module ActiveJob class JobWrapper include Sidekiq::Worker - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb index 48b3df6a46..60633639f4 100644 --- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -7,14 +7,14 @@ module ActiveJob @monitor = Monitor.new class << self - def enqueue(job, *args) + def enqueue(job) @monitor.synchronize do JobWrapper.from_queue job.queue_name - JobWrapper.enqueue ActiveSupport::JSON.encode([ job.name, *args ]) + JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize) end end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) raise NotImplementedError end end @@ -24,8 +24,8 @@ module ActiveJob from_queue 'active_jobs_default' def work(msg) - job_name, *args = ActiveSupport::JSON.decode(msg) - job_name.constantize.new.execute(*args) + job_data = ActiveSupport::JSON.decode(msg) + Base.execute job_data ack! end end diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb index 16f05744f3..b19a38093f 100644 --- a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -4,11 +4,11 @@ module ActiveJob module QueueAdapters class SuckerPunchAdapter class << self - def enqueue(job, *args) - JobWrapper.new.async.perform job, *args + def enqueue(job) + JobWrapper.new.async.perform job.serialize end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) raise NotImplementedError end end @@ -16,8 +16,8 @@ module ActiveJob class JobWrapper include SuckerPunch::Job - def perform(job, *args) - job.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb index 185d6fc7e6..b9997efddf 100644 --- a/activejob/lib/active_job/queue_adapters/test_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb @@ -45,21 +45,21 @@ module ActiveJob @performed_jobs = val end - def enqueue(job, *args) + def enqueue(job) if perform_enqueued_jobs? - performed_jobs << {job: job, args: args, queue: job.queue_name} - job.new.execute(*args) + performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name} + job.perform_now else - enqueued_jobs << {job: job, args: args, queue: job.queue_name} + enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name} end end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) if perform_enqueued_at_jobs? - performed_jobs << {job: job, args: args, queue: job.queue_name, run_at: timestamp} - job.new.execute(*args) + performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp} + job.perform_now else - enqueued_jobs << {job: job, args: args, queue: job.queue_name, run_at: timestamp} + enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp} end end diff --git a/activejob/lib/active_job/queue_name.rb b/activejob/lib/active_job/queue_name.rb index 9698835b6e..45acb71605 100644 --- a/activejob/lib/active_job/queue_name.rb +++ b/activejob/lib/active_job/queue_name.rb @@ -6,16 +6,33 @@ module ActiveJob mattr_accessor(:queue_name_prefix) mattr_accessor(:default_queue_name) { "default" } - def queue_as(part_name) + def queue_as(part_name=nil, &block) + if block_given? + self.queue_name = block + else + self.queue_name = queue_name_from_part(part_name) + end + end + + def queue_name_from_part(part_name) #:nodoc: queue_name = part_name.to_s.presence || default_queue_name name_parts = [queue_name_prefix.presence, queue_name] - self.queue_name = name_parts.compact.join('_') + name_parts.compact.join('_') end end included do - class_attribute :queue_name + class_attribute :queue_name, instance_accessor: false self.queue_name = default_queue_name end + + # Returns the name of the queue the job will be run on + def queue_name + if @queue_name.is_a?(Proc) + @queue_name = self.class.queue_name_from_part(instance_exec(&@queue_name)) + end + @queue_name + end + end end diff --git a/activejob/test/cases/callbacks_test.rb b/activejob/test/cases/callbacks_test.rb index 9a0657ee89..9af2380767 100644 --- a/activejob/test/cases/callbacks_test.rb +++ b/activejob/test/cases/callbacks_test.rb @@ -5,7 +5,8 @@ require 'active_support/core_ext/object/inclusion' class CallbacksTest < ActiveSupport::TestCase test 'perform callbacks' do - performed_callback_job = CallbackJob.new.tap { |j| j.execute("A-JOB-ID") } + performed_callback_job = CallbackJob.new("A-JOB-ID") + performed_callback_job.perform_now assert "CallbackJob ran before_perform".in? performed_callback_job.history assert "CallbackJob ran after_perform".in? performed_callback_job.history assert "CallbackJob ran around_perform_start".in? performed_callback_job.history @@ -13,7 +14,7 @@ class CallbacksTest < ActiveSupport::TestCase end test 'enqueue callbacks' do - enqueued_callback_job = CallbackJob.enqueue + enqueued_callback_job = CallbackJob.perform_later assert "CallbackJob ran before_enqueue".in? enqueued_callback_job.history assert "CallbackJob ran after_enqueue".in? enqueued_callback_job.history assert "CallbackJob ran around_enqueue_start".in? enqueued_callback_job.history diff --git a/activejob/test/cases/job_serialization_test.rb b/activejob/test/cases/job_serialization_test.rb index fc1b77744c..db22783030 100644 --- a/activejob/test/cases/job_serialization_test.rb +++ b/activejob/test/cases/job_serialization_test.rb @@ -9,7 +9,7 @@ class JobSerializationTest < ActiveSupport::TestCase end test 'serialize job with gid' do - GidJob.enqueue @person + GidJob.perform_later @person assert_equal "Person with ID: 5", JobBuffer.last_value end end diff --git a/activejob/test/cases/logging_test.rb b/activejob/test/cases/logging_test.rb index 888c183a0b..9c56ee08b6 100644 --- a/activejob/test/cases/logging_test.rb +++ b/activejob/test/cases/logging_test.rb @@ -42,34 +42,43 @@ class AdapterTest < ActiveSupport::TestCase def test_uses_active_job_as_tag - HelloJob.enqueue "Cristian" + HelloJob.perform_later "Cristian" assert_match(/\[ActiveJob\]/, @logger.messages) end + def test_uses_job_name_as_tag + LoggingJob.perform_later "Dummy" + assert_match(/\[LoggingJob\]/, @logger.messages) + end + + def test_uses_job_id_as_tag + LoggingJob.perform_later "Dummy" + assert_match(/\[LOGGING-JOB-ID\]/, @logger.messages) + end + + def test_logs_correct_queue_name + original_queue_name = LoggingJob.queue_name + LoggingJob.queue_as :php_jobs + LoggingJob.perform_later("Dummy") + assert_match(/to .*?\(php_jobs\).*/, @logger.messages) + ensure + LoggingJob.queue_name = original_queue_name + end + def test_enqueue_job_logging - HelloJob.enqueue "Cristian" + HelloJob.perform_later "Cristian" assert_match(/Enqueued HelloJob \(Job ID: .*?\) to .*?:.*Cristian/, @logger.messages) end def test_perform_job_logging - LoggingJob.enqueue "Dummy" + LoggingJob.perform_later "Dummy" assert_match(/Performing LoggingJob from .*? with arguments:.*Dummy/, @logger.messages) assert_match(/Dummy, here is it: Dummy/, @logger.messages) assert_match(/Performed LoggingJob from .*? in .*ms/, @logger.messages) end - def test_perform_uses_job_name_job_logging - LoggingJob.enqueue "Dummy" - assert_match(/\[LoggingJob\]/, @logger.messages) - end - - def test_perform_uses_job_id_job_logging - LoggingJob.enqueue "Dummy" - assert_match(/\[LOGGING-JOB-ID\]/, @logger.messages) - end - def test_perform_nested_jobs_logging - NestedJob.enqueue + NestedJob.perform_later assert_match(/\[LoggingJob\] \[.*?\]/, @logger.messages) assert_match(/\[ActiveJob\] Enqueued NestedJob \(Job ID: .*\) to/, @logger.messages) assert_match(/\[ActiveJob\] \[NestedJob\] \[NESTED-JOB-ID\] Performing NestedJob from/, @logger.messages) @@ -81,14 +90,14 @@ class AdapterTest < ActiveSupport::TestCase end def test_enqueue_at_job_logging - HelloJob.enqueue_at 1, "Cristian" + HelloJob.set(wait_until: 24.hours.from_now).perform_later "Cristian" assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages) rescue NotImplementedError skip end def test_enqueue_in_job_logging - HelloJob.enqueue_in 2, "Cristian" + HelloJob.set(wait: 2.seconds).perform_later "Cristian" assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages) rescue NotImplementedError skip diff --git a/activejob/test/cases/queue_naming_test.rb b/activejob/test/cases/queue_naming_test.rb index fdfd1afceb..4052477543 100644 --- a/activejob/test/cases/queue_naming_test.rb +++ b/activejob/test/cases/queue_naming_test.rb @@ -8,20 +8,38 @@ class QueueNamingTest < ActiveSupport::TestCase assert_equal "default", HelloJob.queue_name end - test 'name appended in job' do + test 'uses given queue name job' do begin + original_queue_name = HelloJob.queue_name HelloJob.queue_as :greetings - LoggingJob.queue_as :bookkeeping + assert_equal "greetings", HelloJob.new.queue_name + ensure + HelloJob.queue_name = original_queue_name + end + end - assert_equal "default", NestedJob.queue_name - assert_equal "greetings", HelloJob.queue_name - assert_equal "bookkeeping", LoggingJob.queue_name + test 'evals block given to queue_as to determine queue' do + begin + original_queue_name = HelloJob.queue_name + HelloJob.queue_as { :another } + assert_equal "another", HelloJob.new.queue_name ensure - HelloJob.queue_name = LoggingJob.queue_name = ActiveJob::Base.default_queue_name + HelloJob.queue_name = original_queue_name end end - test 'should prefix the queue name' do + test 'can use arguments to determine queue_name in queue_as block' do + begin + original_queue_name = HelloJob.queue_name + HelloJob.queue_as { self.arguments.first=='1' ? :one : :two } + assert_equal "one", HelloJob.new('1').queue_name + assert_equal "two", HelloJob.new('3').queue_name + ensure + HelloJob.queue_name = original_queue_name + end + end + + test 'queu_name_prefix prepended to the queue name' do begin original_queue_name_prefix = ActiveJob::Base.queue_name_prefix original_queue_name = HelloJob.queue_name @@ -35,4 +53,9 @@ class QueueNamingTest < ActiveSupport::TestCase end end + test 'uses queue passed to #set' do + job = HelloJob.set(queue: :some_queue).perform_later + assert_equal "some_queue", job.queue_name + end + end diff --git a/activejob/test/cases/queuing_test.rb b/activejob/test/cases/queuing_test.rb index f020316d7e..0eeabbf693 100644 --- a/activejob/test/cases/queuing_test.rb +++ b/activejob/test/cases/queuing_test.rb @@ -9,18 +9,18 @@ class QueuingTest < ActiveSupport::TestCase end test 'run queued job' do - HelloJob.enqueue + HelloJob.perform_later assert_equal "David says hello", JobBuffer.last_value end test 'run queued job with arguments' do - HelloJob.enqueue "Jamie" + HelloJob.perform_later "Jamie" assert_equal "Jamie says hello", JobBuffer.last_value end test 'run queued job later' do begin - result = HelloJob.enqueue_at 1.second.ago, "Jamie" + result = HelloJob.set(wait_until: 1.second.ago).perform_later "Jamie" assert result rescue NotImplementedError skip @@ -28,15 +28,15 @@ class QueuingTest < ActiveSupport::TestCase end test 'job returned by enqueue has the arguments available' do - job = HelloJob.enqueue "Jamie" + job = HelloJob.perform_later "Jamie" assert_equal [ "Jamie" ], job.arguments end - test 'job returned by enqueue_at has the timestamp available' do + test 'job returned by perform_at has the timestamp available' do begin - job = HelloJob.enqueue_at Time.utc(2014, 1, 1) - assert_equal Time.utc(2014, 1, 1), job.enqueued_at + job = HelloJob.set(wait_until: Time.utc(2014, 1, 1)).perform_later + assert_equal Time.utc(2014, 1, 1).to_f, job.scheduled_at rescue NotImplementedError skip end diff --git a/activejob/test/cases/rescue_test.rb b/activejob/test/cases/rescue_test.rb index 3af147383e..1b6c2e9fac 100644 --- a/activejob/test/cases/rescue_test.rb +++ b/activejob/test/cases/rescue_test.rb @@ -10,27 +10,27 @@ class RescueTest < ActiveSupport::TestCase end test 'rescue perform exception with retry' do - job = RescueJob.new - job.execute(SecureRandom.uuid, "david") + job = RescueJob.new("david") + job.perform_now assert_equal [ "rescued from ArgumentError", "performed beautifully" ], JobBuffer.values end test 'let through unhandled perform exception' do - job = RescueJob.new + job = RescueJob.new("other") assert_raises(RescueJob::OtherError) do - job.execute(SecureRandom.uuid, "other") + job.perform_now end end test 'rescue from deserialization errors' do - RescueJob.enqueue Person.new(404) + RescueJob.perform_later Person.new(404) assert_includes JobBuffer.values, 'rescued from DeserializationError' assert_includes JobBuffer.values, 'DeserializationError original exception was Person::RecordNotFound' assert_not_includes JobBuffer.values, 'performed beautifully' end test "should not wrap DeserializationError in DeserializationError" do - RescueJob.enqueue [Person.new(404)] + RescueJob.perform_later [Person.new(404)] assert_includes JobBuffer.values, 'DeserializationError original exception was Person::RecordNotFound' end end diff --git a/activejob/test/cases/test_helper_test.rb b/activejob/test/cases/test_helper_test.rb index 240aa23ce3..eab540bb6c 100644 --- a/activejob/test/cases/test_helper_test.rb +++ b/activejob/test/cases/test_helper_test.rb @@ -11,7 +11,7 @@ class EnqueuedJobsTest < ActiveJob::TestCase def test_assert_enqueued_jobs assert_nothing_raised do assert_enqueued_jobs 1 do - HelloJob.enqueue('david') + HelloJob.perform_later('david') end end end @@ -19,27 +19,27 @@ class EnqueuedJobsTest < ActiveJob::TestCase def test_repeated_enqueued_jobs_calls assert_nothing_raised do assert_enqueued_jobs 1 do - HelloJob.enqueue('abdelkader') + HelloJob.perform_later('abdelkader') end end assert_nothing_raised do assert_enqueued_jobs 2 do - HelloJob.enqueue('sean') - HelloJob.enqueue('yves') + HelloJob.perform_later('sean') + HelloJob.perform_later('yves') end end end def test_assert_enqueued_jobs_with_no_block assert_nothing_raised do - HelloJob.enqueue('rafael') + HelloJob.perform_later('rafael') assert_enqueued_jobs 1 end assert_nothing_raised do - HelloJob.enqueue('aaron') - HelloJob.enqueue('matthew') + HelloJob.perform_later('aaron') + HelloJob.perform_later('matthew') assert_enqueued_jobs 3 end end @@ -48,7 +48,7 @@ class EnqueuedJobsTest < ActiveJob::TestCase assert_nothing_raised do assert_no_enqueued_jobs do # Scheduled jobs are being performed in this context - HelloJob.enqueue_at(Date.tomorrow.noon, 'godfrey') + HelloJob.set(wait_until: Date.tomorrow.noon).perform_later('godfrey') end end end @@ -56,7 +56,7 @@ class EnqueuedJobsTest < ActiveJob::TestCase def test_assert_enqueued_jobs_too_few_sent error = assert_raise ActiveSupport::TestCase::Assertion do assert_enqueued_jobs 2 do - HelloJob.enqueue('xavier') + HelloJob.perform_later('xavier') end end @@ -66,8 +66,8 @@ class EnqueuedJobsTest < ActiveJob::TestCase def test_assert_enqueued_jobs_too_many_sent error = assert_raise ActiveSupport::TestCase::Assertion do assert_enqueued_jobs 1 do - HelloJob.enqueue('cristian') - HelloJob.enqueue('guillermo') + HelloJob.perform_later('cristian') + HelloJob.perform_later('guillermo') end end @@ -77,7 +77,7 @@ class EnqueuedJobsTest < ActiveJob::TestCase def test_assert_no_enqueued_jobs_failure error = assert_raise ActiveSupport::TestCase::Assertion do assert_no_enqueued_jobs do - HelloJob.enqueue('jeremy') + HelloJob.perform_later('jeremy') end end @@ -86,20 +86,20 @@ class EnqueuedJobsTest < ActiveJob::TestCase def test_assert_enqueued_job assert_enqueued_with(job: LoggingJob, queue: 'default') do - NestedJob.enqueue_at(Date.tomorrow.noon) + NestedJob.set(wait_until: Date.tomorrow.noon).perform_later end end def test_assert_enqueued_job_failure assert_raise ActiveSupport::TestCase::Assertion do assert_enqueued_with(job: LoggingJob, queue: 'default') do - NestedJob.enqueue + NestedJob.perform_later end end assert_raise ActiveSupport::TestCase::Assertion do assert_enqueued_with(job: NestedJob, queue: 'low') do - NestedJob.enqueue + NestedJob.perform_later end end end @@ -107,7 +107,7 @@ class EnqueuedJobsTest < ActiveJob::TestCase def test_assert_enqueued_job_args assert_raise ArgumentError do assert_enqueued_with(class: LoggingJob) do - NestedJob.enqueue_at(Date.tomorrow.noon) + NestedJob.set(wait_until: Date.tomorrow.noon).perform_later end end end @@ -119,7 +119,7 @@ class PerformedJobsTest < ActiveJob::TestCase def test_assert_performed_jobs assert_nothing_raised do assert_performed_jobs 1 do - HelloJob.enqueue('david') + HelloJob.perform_later('david') end end end @@ -127,27 +127,27 @@ class PerformedJobsTest < ActiveJob::TestCase def test_repeated_performed_jobs_calls assert_nothing_raised do assert_performed_jobs 1 do - HelloJob.enqueue('abdelkader') + HelloJob.perform_later('abdelkader') end end assert_nothing_raised do assert_performed_jobs 2 do - HelloJob.enqueue('sean') - HelloJob.enqueue('yves') + HelloJob.perform_later('sean') + HelloJob.perform_later('yves') end end end def test_assert_performed_jobs_with_no_block assert_nothing_raised do - HelloJob.enqueue('rafael') + HelloJob.perform_later('rafael') assert_performed_jobs 1 end assert_nothing_raised do - HelloJob.enqueue('aaron') - HelloJob.enqueue('matthew') + HelloJob.perform_later('aaron') + HelloJob.perform_later('matthew') assert_performed_jobs 3 end end @@ -156,7 +156,7 @@ class PerformedJobsTest < ActiveJob::TestCase assert_nothing_raised do assert_no_performed_jobs do # Scheduled jobs are being enqueued in this context - HelloJob.enqueue_at(Date.tomorrow.noon, 'godfrey') + HelloJob.set(wait_until: Date.tomorrow.noon).perform_later('godfrey') end end end @@ -164,7 +164,7 @@ class PerformedJobsTest < ActiveJob::TestCase def test_assert_performed_jobs_too_few_sent error = assert_raise ActiveSupport::TestCase::Assertion do assert_performed_jobs 2 do - HelloJob.enqueue('xavier') + HelloJob.perform_later('xavier') end end @@ -174,8 +174,8 @@ class PerformedJobsTest < ActiveJob::TestCase def test_assert_performed_jobs_too_many_sent error = assert_raise ActiveSupport::TestCase::Assertion do assert_performed_jobs 1 do - HelloJob.enqueue('cristian') - HelloJob.enqueue('guillermo') + HelloJob.perform_later('cristian') + HelloJob.perform_later('guillermo') end end @@ -185,7 +185,7 @@ class PerformedJobsTest < ActiveJob::TestCase def test_assert_no_performed_jobs_failure error = assert_raise ActiveSupport::TestCase::Assertion do assert_no_performed_jobs do - HelloJob.enqueue('jeremy') + HelloJob.perform_later('jeremy') end end @@ -194,20 +194,20 @@ class PerformedJobsTest < ActiveJob::TestCase def test_assert_performed_job assert_performed_with(job: NestedJob, queue: 'default') do - NestedJob.enqueue + NestedJob.perform_later end end def test_assert_performed_job_failure assert_raise ActiveSupport::TestCase::Assertion do - assert_performed_with(job: LoggingJob, queue: 'default') do - NestedJob.enqueue_at(Date.tomorrow.noon) + assert_performed_with(job: LoggingJob, at: Date.tomorrow.noon, queue: 'default') do + NestedJob.set(wait_until: Date.tomorrow.noon).perform_later end end assert_raise ActiveSupport::TestCase::Assertion do - assert_performed_with(job: NestedJob, queue: 'low') do - NestedJob.enqueue_at(Date.tomorrow.noon) + assert_performed_with(job: NestedJob, at: Date.tomorrow.noon, queue: 'low') do + NestedJob.set(queue: 'low', wait_until: Date.tomorrow.noon).perform_later end end end diff --git a/activejob/test/jobs/nested_job.rb b/activejob/test/jobs/nested_job.rb index fd66f68991..8c4ec549a6 100644 --- a/activejob/test/jobs/nested_job.rb +++ b/activejob/test/jobs/nested_job.rb @@ -1,6 +1,6 @@ class NestedJob < ActiveJob::Base def perform - LoggingJob.enqueue "NestedJob" + LoggingJob.perform_later "NestedJob" end def job_id diff --git a/activejob/test/jobs/rescue_job.rb b/activejob/test/jobs/rescue_job.rb index 6b6e74e9d0..f1b9c9349e 100644 --- a/activejob/test/jobs/rescue_job.rb +++ b/activejob/test/jobs/rescue_job.rb @@ -6,7 +6,7 @@ class RescueJob < ActiveJob::Base rescue_from(ArgumentError) do JobBuffer.add('rescued from ArgumentError') arguments[0] = "DIFFERENT!" - retry_now + retry_job end rescue_from(ActiveJob::DeserializationError) do |e| -- cgit v1.2.3