diff options
Diffstat (limited to 'lib/active_job/queue_adapters')
9 files changed, 64 insertions, 3 deletions
diff --git a/lib/active_job/queue_adapters/backburner_adapter.rb b/lib/active_job/queue_adapters/backburner_adapter.rb index 0ac745c7f2..0d023d9ee7 100644 --- a/lib/active_job/queue_adapters/backburner_adapter.rb +++ b/lib/active_job/queue_adapters/backburner_adapter.rb @@ -7,7 +7,11 @@ module ActiveJob def queue(job, *args) Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name end - end + + def queue_at(job, timestamp, *args) + raise NotImplementedError + end + end class JobWrapper class << self diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb index 14072e2801..214733e3a6 100644 --- a/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -7,8 +7,12 @@ module ActiveJob def queue(job, *args) JobWrapper.new.delay(queue: job.queue_name).perform(job, *args) end + + def queue_at(job, timestamp, *args) + JobWrapper.new.delay(queue: job.queue_name, run_at: timestamp).perform(job, *args) + end end - + class JobWrapper def perform(job, *args) job.new.perform *Parameters.deserialize(args) diff --git a/lib/active_job/queue_adapters/inline_adapter.rb b/lib/active_job/queue_adapters/inline_adapter.rb index cffa55af82..8a2c7d9d92 100644 --- a/lib/active_job/queue_adapters/inline_adapter.rb +++ b/lib/active_job/queue_adapters/inline_adapter.rb @@ -5,7 +5,20 @@ module ActiveJob def queue(job, *args) job.new.perform *Parameters.deserialize(args) end + + def queue_at(job, ts, *args) + # TODO better error handling? + Thread.new do + begin + interval = Time.now.to_f - ts + sleep(interval) if interval > 0 + job.new.perform *Parameters.deserialize(args) + rescue => ex + ActiveJob::Base.logger "Error performing #{job}: #{ex.message}" + end + end + end end end end -end
\ No newline at end of file +end diff --git a/lib/active_job/queue_adapters/que_adapter.rb b/lib/active_job/queue_adapters/que_adapter.rb index 6750882b91..30c23a35b9 100644 --- a/lib/active_job/queue_adapters/que_adapter.rb +++ b/lib/active_job/queue_adapters/que_adapter.rb @@ -7,6 +7,10 @@ module ActiveJob def queue(job, *args) JobWrapper.enqueue job, *args, queue: job.queue_name end + + def queue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper < Que::Job diff --git a/lib/active_job/queue_adapters/queue_classic_adapter.rb b/lib/active_job/queue_adapters/queue_classic_adapter.rb index b82edae977..d0e2e1aa22 100644 --- a/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -7,6 +7,10 @@ module ActiveJob def queue(job, *args) QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job, *args) end + + def queue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper diff --git a/lib/active_job/queue_adapters/resque_adapter.rb b/lib/active_job/queue_adapters/resque_adapter.rb index 6686f10593..8fa8dddd11 100644 --- a/lib/active_job/queue_adapters/resque_adapter.rb +++ b/lib/active_job/queue_adapters/resque_adapter.rb @@ -1,6 +1,7 @@ require 'resque' require 'active_support/core_ext/enumerable' require 'active_support/core_ext/array/access' +require 'resque_scheduler' module ActiveJob module QueueAdapters @@ -9,6 +10,11 @@ module ActiveJob def queue(job, *args) Resque.enqueue JobWrapper.new(job), job, *args end + + def queue_at(job, timestamp, *args) + # requires resque-scheduler + Resque.enqueue_at timestamp, JobWrapper.new(job), job, *args + end end class JobWrapper diff --git a/lib/active_job/queue_adapters/sidekiq_adapter.rb b/lib/active_job/queue_adapters/sidekiq_adapter.rb index 43bd69790c..be6bd4ee01 100644 --- a/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -11,6 +11,15 @@ module ActiveJob 'args' => [ job, *args ], 'retry' => true end + + def queue_at(job, timestamp, *args) + Sidekiq::Client.push \ + 'class' => JobWrapper, + 'queue' => job.queue_name, + 'args' => [ job, *args ], + 'at' => timestamp, + 'retry' => true + end end class JobWrapper diff --git a/lib/active_job/queue_adapters/sneakers_adapter.rb b/lib/active_job/queue_adapters/sneakers_adapter.rb index 7be6b2a085..b299b25a96 100644 --- a/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -13,6 +13,10 @@ module ActiveJob JobWrapper.enqueue [ job, *args ] end end + + def queue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper diff --git a/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/lib/active_job/queue_adapters/sucker_punch_adapter.rb index 6ca6726456..79043b06e0 100644 --- a/lib/active_job/queue_adapters/sucker_punch_adapter.rb +++ b/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -7,6 +7,15 @@ module ActiveJob def queue(job, *args) JobWrapper.new.async.perform(job, *args) end + + def queue_at(job, timestamp, *args) + delay = Time.now.to_f - timestamp + if delay > 0 + JobWrapper.new.async.later(delay, job, *args) + else + JobWrapper.new.async.perform(job, *args) + end + end end class JobWrapper @@ -15,6 +24,10 @@ module ActiveJob def perform(job, *args) job.new.perform *Parameters.deserialize(args) end + + def later(sec, job_name, *args) + after(sec) { perform(job_name, *args) } + end end end end |