diff options
Diffstat (limited to 'activejob/lib')
20 files changed, 275 insertions, 152 deletions
diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb index 93d9d1b2d8..1b582f5877 100644 --- a/activejob/lib/active_job.rb +++ b/activejob/lib/active_job.rb @@ -31,6 +31,7 @@ module ActiveJob autoload :Base autoload :QueueAdapters + autoload :ConfiguredJob autoload :TestCase autoload :TestHelper end diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb index 1b54786303..a3bec1f827 100644 --- a/activejob/lib/active_job/base.rb +++ b/activejob/lib/active_job/base.rb @@ -1,19 +1,19 @@ +require 'active_job/core' require 'active_job/queue_adapter' require 'active_job/queue_name' require 'active_job/enqueuing' require 'active_job/execution' require 'active_job/callbacks' -require 'active_job/identifier' require 'active_job/logging' module ActiveJob class Base + include Core include QueueAdapter include QueueName include Enqueuing include Execution include Callbacks - include Identifier include Logging ActiveSupport.run_load_hooks(:active_job, self) diff --git a/activejob/lib/active_job/configured_job.rb b/activejob/lib/active_job/configured_job.rb new file mode 100644 index 0000000000..979280b910 --- /dev/null +++ b/activejob/lib/active_job/configured_job.rb @@ -0,0 +1,16 @@ +module ActiveJob + class ConfiguredJob #:nodoc: + def initialize(job_class, options={}) + @options = options + @job_class = job_class + end + + def perform_now(*args) + @job_class.new(*args).perform_now + end + + def perform_later(*args) + @job_class.new(*args).enqueue @options + end + end +end diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb new file mode 100644 index 0000000000..b6dd03a0bc --- /dev/null +++ b/activejob/lib/active_job/core.rb @@ -0,0 +1,89 @@ +module ActiveJob + module Core + extend ActiveSupport::Concern + + included do + # Job arguments + attr_accessor :arguments + attr_writer :serialized_arguments + + # Timestamp when the job should be performed + attr_accessor :scheduled_at + + # Job Identifier + attr_accessor :job_id + + # Queue on which the job should be run on. + attr_writer :queue_name + end + + module ClassMethods + # Creates a new job instance from a hash created with +serialize+ + def deserialize(job_data) + job = job_data['job_class'].constantize.new + job.job_id = job_data['job_id'] + job.queue_name = job_data['queue_name'] + job.serialized_arguments = job_data['arguments'] + job + end + + # Creates a job preconfigured with the given options. You can call + # perform_later with the job arguments to enqueue the job with the + # preconfigured options + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # + # ==== Examples + # + # VideoJob.set(queue: :some_queue).perform_later(Video.last) + # VideoJob.set(wait: 5.minutes).perform_later(Video.last) + # VideoJob.set(wait_until: Time.tomorroe).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait_until: Time.tomorroe).perform_later(Video.last) + def set(options={}) + ConfiguredJob.new(self, options) + end + end + + # Creates a new job instance. Takes as arguments the arguments that + # will be passed to the perform method. + def initialize(*arguments) + @arguments = arguments + @job_id = SecureRandom.uuid + @queue_name = self.class.queue_name + end + + # Returns a hash with the job data that can safely be passed to the + # queueing adapter. + def serialize + { + 'job_class' => self.class.name, + 'job_id' => job_id, + 'queue_name' => queue_name, + 'arguments' => serialize_arguments(arguments) + } + end + + private + def deserialize_arguments_if_needed + if defined?(@serialized_arguments) && @serialized_arguments.present? + @arguments = deserialize_arguments(@serialized_arguments) + @serialized_arguments = nil + end + end + + def serialize_arguments(serialized_args) + Arguments.serialize(serialized_args) + end + + def deserialize_arguments(serialized_args) + Arguments.deserialize(serialized_args) + end + end +end + + + diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb index 3d00d51867..e8bc44cbc4 100644 --- a/activejob/lib/active_job/enqueuing.rb +++ b/activejob/lib/active_job/enqueuing.rb @@ -12,60 +12,64 @@ module ActiveJob # # Returns an instance of the job class queued with args available in # Job#arguments. - def enqueue(*args) - new(args).tap do |job| - job.run_callbacks :enqueue do - queue_adapter.enqueue self, job.job_id, *Arguments.serialize(args) - end - end - end - - # Enqueue a job to be performed at +interval+ from now. - # - # enqueue_in(1.week, "mike") - # - # Returns an instance of the job class queued with args available in - # Job#arguments and the timestamp in Job#enqueue_at. - def enqueue_in(interval, *args) - enqueue_at interval.seconds.from_now, *args + def perform_later(*args) + job_or_instantiate(*args).enqueue end - # Enqueue a job to be performed at an explicit point in time. - # - # enqueue_at(Date.tomorrow.midnight, "mike") - # - # Returns an instance of the job class queued with args available in - # Job#arguments and the timestamp in Job#enqueue_at. - def enqueue_at(timestamp, *args) - new(args).tap do |job| - job.enqueued_at = timestamp - - job.run_callbacks :enqueue do - queue_adapter.enqueue_at self, timestamp.to_f, job.job_id, *Arguments.serialize(args) - end + protected + def job_or_instantiate(*args) + args.first.is_a?(self) ? args.first : new(*args) end - end - end - - included do - attr_accessor :arguments - attr_accessor :enqueued_at - end - - def initialize(arguments = nil) - @arguments = arguments end - def retry_now - self.class.enqueue(*arguments) + # Reschedule the job to be re-executed. This is usefull in combination + # with the +rescue_from+ option. When you rescue an exception from your job + # you can ask Active Job to retry performing your job. + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # + # ==== Examples + # + # class SiteScrapperJob < ActiveJob::Base + # rescue_from(ErrorLoadingSite) do + # retry_job queue: :low_priority + # end + # def perform(*args) + # # raise ErrorLoadingSite if cannot scrape + # end + # end + def retry_job(options={}) + enqueue options end - def retry_in(interval) - self.class.enqueue_in interval, *arguments - end - - def retry_at(timestamp) - self.class.enqueue_at timestamp, *arguments + # Equeue the job to be performed by the queue adapter. + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # + # ==== Examples + # + # my_job_instance.enqueue + # my_job_instance.enqueue wait: 5.minutes + # my_job_instance.enqueue queue: :important + # my_job_instance.enqueue wait_until: Date.tomorrow.midnight + def enqueue(options={}) + self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait] + self.scheduled_at = options[:wait_until].to_f if options[:wait_until] + self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue] + run_callbacks :enqueue do + if self.scheduled_at + self.class.queue_adapter.enqueue_at self, self.scheduled_at + else + self.class.queue_adapter.enqueue self + end + end + self end end end diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb index 0e7b5bdd72..d6d67c46e3 100644 --- a/activejob/lib/active_job/execution.rb +++ b/activejob/lib/active_job/execution.rb @@ -4,15 +4,29 @@ require 'active_job/arguments' module ActiveJob module Execution extend ActiveSupport::Concern + include ActiveSupport::Rescuable - included do - include ActiveSupport::Rescuable - end + module ClassMethods + # Performs the job immediately. + # + # MyJob.perform_now("mike") + # + def perform_now(*args) + job_or_instantiate(*args).perform_now + end - def execute(job_id, *serialized_args) - self.job_id = job_id - self.arguments = deserialize_arguments(serialized_args) + def execute(job_data) #:nodoc: + job = deserialize(job_data) + job.perform_now + end + end + # Performs the job immediately. The job is not sent to the queueing adapter + # and will block the execution until it's finished. + # + # MyJob.new(*args).perform_now + def perform_now + deserialize_arguments_if_needed run_callbacks :perform do perform(*arguments) end @@ -23,11 +37,5 @@ module ActiveJob def perform(*) fail NotImplementedError end - - private - def deserialize_arguments(serialized_args) - Arguments.deserialize(serialized_args) - end - end end diff --git a/activejob/lib/active_job/identifier.rb b/activejob/lib/active_job/identifier.rb deleted file mode 100644 index c7f522087d..0000000000 --- a/activejob/lib/active_job/identifier.rb +++ /dev/null @@ -1,15 +0,0 @@ -require 'active_job/arguments' - -module ActiveJob - module Identifier - extend ActiveSupport::Concern - - included do - attr_writer :job_id - end - - def job_id - @job_id ||= SecureRandom.uuid - end - end -end diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb index ae098a80f3..d2030773bb 100644 --- a/activejob/lib/active_job/logging.rb +++ b/activejob/lib/active_job/logging.rb @@ -17,7 +17,7 @@ module ActiveJob around_perform do |job, block, _| tag_logger(job.class.name, job.job_id) do - payload = {adapter: job.class.queue_adapter, job: job.class, args: job.arguments} + payload = {adapter: job.class.queue_adapter, job: job} ActiveSupport::Notifications.instrument("perform_start.active_job", payload.dup) ActiveSupport::Notifications.instrument("perform.active_job", payload) do block.call @@ -26,12 +26,12 @@ module ActiveJob end before_enqueue do |job| - if job.enqueued_at + if job.scheduled_at ActiveSupport::Notifications.instrument "enqueue_at.active_job", - adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments, timestamp: job.enqueued_at + adapter: job.class.queue_adapter, job: job else ActiveSupport::Notifications.instrument "enqueue.active_job", - adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments + adapter: job.class.queue_adapter, job: job end end end @@ -52,19 +52,23 @@ module ActiveJob class LogSubscriber < ActiveSupport::LogSubscriber def enqueue(event) - info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)}" + args_info(event) } + job = event.payload[:job] + info { "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job) } end def enqueue_at(event) - info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) } + job = event.payload[:job] + info { "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job) } end def perform_start(event) - info { "Performing #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event) } + job = event.payload[:job] + info { "Performing #{job.class.name} from #{queue_name(event)}" + args_info(job) } end def perform(event) - info { "Performed #{event.payload[:job].name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" } + job = event.payload[:job] + info { "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" } end private @@ -72,12 +76,12 @@ module ActiveJob event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})" end - def args_info(event) - event.payload[:args].any? ? " with arguments: #{event.payload[:args].map(&:inspect).join(", ")}" : "" + def args_info(job) + job.arguments.any? ? " with arguments: #{job.arguments.map(&:inspect).join(", ")}" : "" end - def enqueued_at(event) - Time.at(event.payload[:timestamp]).utc + def scheduled_at(event) + Time.at(event.payload[:job].scheduled_at).utc end def logger diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb index 8ebee36e45..e1b00f1de7 100644 --- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb @@ -4,20 +4,20 @@ module ActiveJob module QueueAdapters class BackburnerAdapter class << self - def enqueue(job, *args) - Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name + def enqueue(job) + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) delay = timestamp - Time.current.to_f - Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name, delay: delay + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay end end class JobWrapper class << self - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb index a00569833a..658799edfc 100644 --- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -4,18 +4,18 @@ module ActiveJob module QueueAdapters class DelayedJobAdapter class << self - def enqueue(job, *args) - JobWrapper.new.delay(queue: job.queue_name).perform(job, *args) + def enqueue(job) + JobWrapper.new.delay(queue: job.queue_name).perform(job.serialize) end - def enqueue_at(job, timestamp, *args) - JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job, *args) + def enqueue_at(job, timestamp) + JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job.serialize) end end class JobWrapper - def perform(job, *args) - job.new.execute(*args) + def perform(job_data) + Base.execute(job_data) end end end diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb index 5805340fb0..fdefa38d5e 100644 --- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb @@ -2,8 +2,8 @@ module ActiveJob module QueueAdapters class InlineAdapter class << self - def enqueue(job, *args) - job.new.execute(*args) + def enqueue(job) + Base.execute(job.serialize) end def enqueue_at(*) diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb index 5cb741c094..f681fd7e8a 100644 --- a/activejob/lib/active_job/queue_adapters/qu_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/qu_adapter.rb @@ -5,7 +5,7 @@ module ActiveJob class QuAdapter class << self def enqueue(job, *args) - Qu::Payload.new(klass: JobWrapper, args: [job.name, *args]).tap do |payload| + Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload| payload.instance_variable_set(:@queue, job.queue_name) end.push end @@ -16,13 +16,12 @@ module ActiveJob end class JobWrapper < Qu::Job - def initialize(job_name, *args) - @job = job_name.constantize - @args = args + def initialize(job_data) + @job_data = job_data end def perform - @job.new.execute(*@args) + Base.execute @job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb index 9c84c74f83..51891ab07b 100644 --- a/activejob/lib/active_job/queue_adapters/que_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb @@ -4,18 +4,18 @@ module ActiveJob module QueueAdapters class QueAdapter class << self - def enqueue(job, *args) - JobWrapper.enqueue job.name, *args, queue: job.queue_name + def enqueue(job) + JobWrapper.enqueue job.serialize, queue: job.queue_name end - def enqueue_at(job, timestamp, *args) - JobWrapper.enqueue job.name, *args, queue: job.queue_name, run_at: Time.at(timestamp) + def enqueue_at(job, timestamp) + JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp) end end class JobWrapper < Que::Job - def run(job_name, *args) - job_name.constantize.new.execute(*args) + def run(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb index 914390a958..ddcc868317 100644 --- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -4,18 +4,18 @@ module ActiveJob module QueueAdapters class QueueClassicAdapter class << self - def enqueue(job, *args) - build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.name, *args) + def enqueue(job) + build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize) end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) queue = build_queue(job.queue_name) unless queue.respond_to?(:enqueue_at) raise NotImplementedError, 'To be able to schedule jobs with Queue Classic ' \ 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' 'You can implement this yourself or you can use the queue_classic-later gem.' end - queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.name, *args) + queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) end # Builds a <tt>QC::Queue</tt> object to schedule jobs on. @@ -30,8 +30,8 @@ module ActiveJob class JobWrapper class << self - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb index da8212fc9b..affa3bdfbc 100644 --- a/activejob/lib/active_job/queue_adapters/resque_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb @@ -16,23 +16,23 @@ module ActiveJob module QueueAdapters class ResqueAdapter class << self - def enqueue(job, *args) - Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args + def enqueue(job) + Resque.enqueue_to job.queue_name, JobWrapper, job.serialize end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) unless Resque.respond_to?(:enqueue_at_with_queue) raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \ "resque-scheduler gem. Please add it to your Gemfile and run bundle install" end - Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args + Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize end end class JobWrapper class << self - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb index 3e20bec44c..79926a1e24 100644 --- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -4,20 +4,20 @@ module ActiveJob module QueueAdapters class SidekiqAdapter class << self - def enqueue(job, *args) + def enqueue(job) #Sidekiq::Client does not support symbols as keys Sidekiq::Client.push \ 'class' => JobWrapper, 'queue' => job.queue_name, - 'args' => [ job, *args ], + 'args' => [ job.serialize ], 'retry' => true end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) Sidekiq::Client.push \ 'class' => JobWrapper, 'queue' => job.queue_name, - 'args' => [ job, *args ], + 'args' => [ job.serialize ], 'retry' => true, 'at' => timestamp end @@ -26,8 +26,8 @@ module ActiveJob class JobWrapper include Sidekiq::Worker - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb index 48b3df6a46..60633639f4 100644 --- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -7,14 +7,14 @@ module ActiveJob @monitor = Monitor.new class << self - def enqueue(job, *args) + def enqueue(job) @monitor.synchronize do JobWrapper.from_queue job.queue_name - JobWrapper.enqueue ActiveSupport::JSON.encode([ job.name, *args ]) + JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize) end end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) raise NotImplementedError end end @@ -24,8 +24,8 @@ module ActiveJob from_queue 'active_jobs_default' def work(msg) - job_name, *args = ActiveSupport::JSON.decode(msg) - job_name.constantize.new.execute(*args) + job_data = ActiveSupport::JSON.decode(msg) + Base.execute job_data ack! end end diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb index 16f05744f3..b19a38093f 100644 --- a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -4,11 +4,11 @@ module ActiveJob module QueueAdapters class SuckerPunchAdapter class << self - def enqueue(job, *args) - JobWrapper.new.async.perform job, *args + def enqueue(job) + JobWrapper.new.async.perform job.serialize end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) raise NotImplementedError end end @@ -16,8 +16,8 @@ module ActiveJob class JobWrapper include SuckerPunch::Job - def perform(job, *args) - job.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb index 185d6fc7e6..b9997efddf 100644 --- a/activejob/lib/active_job/queue_adapters/test_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb @@ -45,21 +45,21 @@ module ActiveJob @performed_jobs = val end - def enqueue(job, *args) + def enqueue(job) if perform_enqueued_jobs? - performed_jobs << {job: job, args: args, queue: job.queue_name} - job.new.execute(*args) + performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name} + job.perform_now else - enqueued_jobs << {job: job, args: args, queue: job.queue_name} + enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name} end end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) if perform_enqueued_at_jobs? - performed_jobs << {job: job, args: args, queue: job.queue_name, run_at: timestamp} - job.new.execute(*args) + performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp} + job.perform_now else - enqueued_jobs << {job: job, args: args, queue: job.queue_name, run_at: timestamp} + enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp} end end diff --git a/activejob/lib/active_job/queue_name.rb b/activejob/lib/active_job/queue_name.rb index 9698835b6e..45acb71605 100644 --- a/activejob/lib/active_job/queue_name.rb +++ b/activejob/lib/active_job/queue_name.rb @@ -6,16 +6,33 @@ module ActiveJob mattr_accessor(:queue_name_prefix) mattr_accessor(:default_queue_name) { "default" } - def queue_as(part_name) + def queue_as(part_name=nil, &block) + if block_given? + self.queue_name = block + else + self.queue_name = queue_name_from_part(part_name) + end + end + + def queue_name_from_part(part_name) #:nodoc: queue_name = part_name.to_s.presence || default_queue_name name_parts = [queue_name_prefix.presence, queue_name] - self.queue_name = name_parts.compact.join('_') + name_parts.compact.join('_') end end included do - class_attribute :queue_name + class_attribute :queue_name, instance_accessor: false self.queue_name = default_queue_name end + + # Returns the name of the queue the job will be run on + def queue_name + if @queue_name.is_a?(Proc) + @queue_name = self.class.queue_name_from_part(instance_exec(&@queue_name)) + end + @queue_name + end + end end |