diff options
Diffstat (limited to 'activejob/lib/active_job/queue_adapters')
11 files changed, 121 insertions, 116 deletions
diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb index 2453d065de..17703e3e41 100644 --- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb @@ -13,15 +13,13 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :backburner class BackburnerAdapter - class << self - def enqueue(job) #:nodoc: - Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name - end + def enqueue(job) #:nodoc: + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name + end - def enqueue_at(job, timestamp) #:nodoc: - delay = timestamp - Time.current.to_f - Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay - end + def enqueue_at(job, timestamp) #:nodoc: + delay = timestamp - Time.current.to_f + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb index 69d9e70de3..ac83da2b9c 100644 --- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -13,14 +13,16 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :delayed_job class DelayedJobAdapter - class << self - def enqueue(job) #:nodoc: - Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name) - end + def enqueue(job) #:nodoc: + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name) + job.provider_job_id = delayed_job.id + delayed_job + end - def enqueue_at(job, timestamp) #:nodoc: - Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp)) - end + def enqueue_at(job, timestamp) #:nodoc: + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp)) + job.provider_job_id = delayed_job.id + delayed_job end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb index e25d88e723..8ad5f4de07 100644 --- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb @@ -9,14 +9,12 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :inline class InlineAdapter - class << self - def enqueue(job) #:nodoc: - Base.execute(job.serialize) - end + def enqueue(job) #:nodoc: + Base.execute(job.serialize) + end - def enqueue_at(*) #:nodoc: - raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html") - end + def enqueue_at(*) #:nodoc: + raise NotImplementedError, "Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html" end end end diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb index 30aa5a4670..0e198922fc 100644 --- a/activejob/lib/active_job/queue_adapters/qu_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/qu_adapter.rb @@ -16,16 +16,18 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :qu class QuAdapter - class << self - def enqueue(job, *args) #:nodoc: - Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload| - payload.instance_variable_set(:@queue, job.queue_name) - end.push - end + def enqueue(job, *args) #:nodoc: + qu_job = Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload| + payload.instance_variable_set(:@queue, job.queue_name) + end.push + + # qu_job can be nil depending on the configured backend + job.provider_job_id = qu_job.id unless qu_job.nil? + qu_job + end - def enqueue_at(job, timestamp, *args) #:nodoc: - raise NotImplementedError - end + def enqueue_at(job, timestamp, *args) #:nodoc: + raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html" end class JobWrapper < Qu::Job #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb index e501fe0368..90947aa98d 100644 --- a/activejob/lib/active_job/queue_adapters/que_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb @@ -15,14 +15,16 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :que class QueAdapter - class << self - def enqueue(job) #:nodoc: - JobWrapper.enqueue job.serialize, queue: job.queue_name - end + def enqueue(job) #:nodoc: + que_job = JobWrapper.enqueue job.serialize + job.provider_job_id = que_job.attrs["job_id"] + que_job + end - def enqueue_at(job, timestamp) #:nodoc: - JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp) - end + def enqueue_at(job, timestamp) #:nodoc: + que_job = JobWrapper.enqueue job.serialize, run_at: Time.at(timestamp) + job.provider_job_id = que_job.attrs["job_id"] + que_job end class JobWrapper < Que::Job #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb index 34c11a68b2..059754a87f 100644 --- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -17,29 +17,27 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :queue_classic class QueueClassicAdapter - class << self - def enqueue(job) #:nodoc: - build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize) - end + def enqueue(job) #:nodoc: + build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize) + end - def enqueue_at(job, timestamp) #:nodoc: - queue = build_queue(job.queue_name) - unless queue.respond_to?(:enqueue_at) - raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \ - 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \ - 'You can implement this yourself or you can use the queue_classic-later gem.' - end - queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) + def enqueue_at(job, timestamp) #:nodoc: + queue = build_queue(job.queue_name) + unless queue.respond_to?(:enqueue_at) + raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \ + 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \ + 'You can implement this yourself or you can use the queue_classic-later gem.' end + queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) + end - # Builds a <tt>QC::Queue</tt> object to schedule jobs on. - # - # If you have a custom <tt>QC::Queue</tt> subclass you'll need to subclass - # <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the - # <tt>build_queue</tt> method. - def build_queue(queue_name) - QC::Queue.new(queue_name) - end + # Builds a <tt>QC::Queue</tt> object to schedule jobs on. + # + # If you have a custom <tt>QC::Queue</tt> subclass you'll need to subclass + # <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the + # <tt>build_queue</tt> method. + def build_queue(queue_name) + QC::Queue.new(queue_name) end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb index 88c6b48fef..417854afd8 100644 --- a/activejob/lib/active_job/queue_adapters/resque_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb @@ -26,18 +26,16 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :resque class ResqueAdapter - class << self - def enqueue(job) #:nodoc: - Resque.enqueue_to job.queue_name, JobWrapper, job.serialize - end + def enqueue(job) #:nodoc: + Resque.enqueue_to job.queue_name, JobWrapper, job.serialize + end - def enqueue_at(job, timestamp) #:nodoc: - unless Resque.respond_to?(:enqueue_at_with_queue) - raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \ - "resque-scheduler gem. Please add it to your Gemfile and run bundle install" - end - Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize + def enqueue_at(job, timestamp) #:nodoc: + unless Resque.respond_to?(:enqueue_at_with_queue) + raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \ + "resque-scheduler gem. Please add it to your Gemfile and run bundle install" end + Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb index 21005fc728..c321776bf5 100644 --- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -15,22 +15,22 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :sidekiq class SidekiqAdapter - class << self - def enqueue(job) #:nodoc: - #Sidekiq::Client does not support symbols as keys - Sidekiq::Client.push \ - 'class' => JobWrapper, - 'queue' => job.queue_name, - 'args' => [ job.serialize ] - end + def enqueue(job) #:nodoc: + #Sidekiq::Client does not support symbols as keys + job.provider_job_id = Sidekiq::Client.push \ + 'class' => JobWrapper, + 'wrapped' => job.class.to_s, + 'queue' => job.queue_name, + 'args' => [ job.serialize ] + end - def enqueue_at(job, timestamp) #:nodoc: - Sidekiq::Client.push \ - 'class' => JobWrapper, - 'queue' => job.queue_name, - 'args' => [ job.serialize ], - 'at' => timestamp - end + def enqueue_at(job, timestamp) #:nodoc: + job.provider_job_id = Sidekiq::Client.push \ + 'class' => JobWrapper, + 'wrapped' => job.class.to_s, + 'queue' => job.queue_name, + 'args' => [ job.serialize ], + 'at' => timestamp end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb index 6d60a2f303..f102c6567e 100644 --- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -16,19 +16,19 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :sneakers class SneakersAdapter - @monitor = Monitor.new + def initialize + @monitor = Monitor.new + end - class << self - def enqueue(job) #:nodoc: - @monitor.synchronize do - JobWrapper.from_queue job.queue_name - JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize) - end + def enqueue(job) #:nodoc: + @monitor.synchronize do + JobWrapper.from_queue job.queue_name + JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize) end + end - def enqueue_at(job, timestamp) #:nodoc: - raise NotImplementedError - end + def enqueue_at(job, timestamp) #:nodoc: + raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html" end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb index be9e7fd03a..c6c35f8ab4 100644 --- a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -18,14 +18,12 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :sucker_punch class SuckerPunchAdapter - class << self - def enqueue(job) #:nodoc: - JobWrapper.new.async.perform job.serialize - end + def enqueue(job) #:nodoc: + JobWrapper.new.async.perform job.serialize + end - def enqueue_at(job, timestamp) #:nodoc: - raise NotImplementedError - end + def enqueue_at(job, timestamp) #:nodoc: + raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html" end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb index ea9df9a063..9b7b7139f4 100644 --- a/activejob/lib/active_job/queue_adapters/test_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb @@ -10,15 +10,9 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :test class TestAdapter - delegate :name, to: :class - attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs) + attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter) attr_writer(:enqueued_jobs, :performed_jobs) - def initialize - self.perform_enqueued_jobs = false - self.perform_enqueued_at_jobs = false - end - # Provides a store of all the enqueued jobs with the TestAdapter so you can check them. def enqueued_jobs @enqueued_jobs ||= [] @@ -30,22 +24,37 @@ module ActiveJob end def enqueue(job) #:nodoc: - if perform_enqueued_jobs - performed_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name} - Base.execute job.serialize - else - enqueued_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name} - end + return if filtered?(job) + + job_data = job_to_hash(job) + enqueue_or_perform(perform_enqueued_jobs, job, job_data) end def enqueue_at(job, timestamp) #:nodoc: - if perform_enqueued_at_jobs - performed_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name, at: timestamp} + return if filtered?(job) + + job_data = job_to_hash(job, at: timestamp) + enqueue_or_perform(perform_enqueued_at_jobs, job, job_data) + end + + private + + def job_to_hash(job, extras = {}) + { job: job.class, args: job.serialize.fetch('arguments'), queue: job.queue_name }.merge!(extras) + end + + def enqueue_or_perform(perform, job, job_data) + if perform + performed_jobs << job_data Base.execute job.serialize else - enqueued_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name, at: timestamp} + enqueued_jobs << job_data end end + + def filtered?(job) + filter && !Array(filter).include?(job.class) + end end end end |