diff options
-rw-r--r-- | Gemfile | 1 | ||||
-rw-r--r-- | Gemfile.lock | 17 | ||||
-rw-r--r-- | lib/active_job/enqueuing.rb | 20 | ||||
-rw-r--r-- | lib/active_job/log_subscriber.rb | 11 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/backburner_adapter.rb | 6 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/delayed_job_adapter.rb | 6 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/inline_adapter.rb | 14 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/que_adapter.rb | 4 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/queue_classic_adapter.rb | 4 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/resque_adapter.rb | 5 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/sidekiq_adapter.rb | 9 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/sneakers_adapter.rb | 4 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/sucker_punch_adapter.rb | 4 | ||||
-rw-r--r-- | test/cases/queuing_test.rb | 9 |
14 files changed, 105 insertions, 9 deletions
@@ -4,6 +4,7 @@ gemspec gem 'rake' gem 'resque' +gem 'resque-scheduler' gem 'sidekiq' gem 'sucker_punch' gem 'delayed_job' diff --git a/Gemfile.lock b/Gemfile.lock index 74758c73fe..7dda62c1fe 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -38,24 +38,30 @@ GEM json (1.8.1) minitest (5.3.4) mono_logger (1.1.0) - multi_json (1.10.1) + multi_json (1.9.3) pg (0.17.1) que (0.7.3) queue_classic (2.2.3) pg (~> 0.17.0) rack (1.5.2) - rack-protection (1.5.3) + rack-protection (1.5.2) rack rake (10.3.2) redis (3.0.7) redis-namespace (1.4.1) redis (~> 3.0.4) - resque (1.25.2) + resque (1.24.1) mono_logger (~> 1.0) multi_json (~> 1.0) - redis-namespace (~> 1.3) + redis-namespace (~> 1.2) sinatra (>= 0.9.2) vegas (~> 0.1.2) + resque-scheduler (2.2.0) + redis (>= 3.0.0) + resque (>= 1.20.0, < 1.25) + rufus-scheduler (~> 2.0) + rufus-scheduler (2.0.24) + tzinfo (>= 0.3.22) serverengine (1.5.7) sigdump (~> 0.2.2) sidekiq (3.0.2) @@ -65,7 +71,7 @@ GEM redis (>= 3.0.6) redis-namespace (>= 1.3.1) sigdump (0.2.2) - sinatra (1.4.5) + sinatra (1.4.4) rack (~> 1.4) rack-protection (~> 1.4) tilt (~> 1.3, >= 1.3.4) @@ -97,6 +103,7 @@ DEPENDENCIES queue_classic rake resque + resque-scheduler sidekiq sneakers (= 0.1.1.pre) sucker_punch diff --git a/lib/active_job/enqueuing.rb b/lib/active_job/enqueuing.rb index 61734fdfb6..80b10cdbcb 100644 --- a/lib/active_job/enqueuing.rb +++ b/lib/active_job/enqueuing.rb @@ -14,5 +14,25 @@ module ActiveJob ActiveSupport::Notifications.instrument "enqueue.active_job", adapter: queue_adapter, job: self, args: serialized_args queue_adapter.queue self, *serialized_args end + + # Enqueue a job to be performed at +interval+ from now. + # + # enqueue_in(1.week, "mike") + # + # Returns truthy if a job was scheduled. + def enqueue_in(interval, *args) + enqueue_at(interval.from_now, *args) + end + + # Enqueue a job to be performed at an explicit point in time. + # + # enqueue_at(Date.tomorrow.midnight, "mike") + # + # Returns truthy if a job was scheduled. + def enqueue_at(timestamp, *args) + timestamp = timestamp.to_f + ActiveSupport::Notifications.instrument "enqueue_at.active_job", adapter: queue_adapter, timestamp: timestamp, job: self, args: args + queue_adapter.queue_at self, timestamp, *Parameters.serialize(args) + end end end diff --git a/lib/active_job/log_subscriber.rb b/lib/active_job/log_subscriber.rb index 472c9f3081..eae6c30745 100644 --- a/lib/active_job/log_subscriber.rb +++ b/lib/active_job/log_subscriber.rb @@ -4,12 +4,21 @@ module ActiveJob class LogSubscriber < ActiveSupport::LogSubscriber def enqueue(event) queue_name = event.payload[:adapter].name.demodulize.remove('Adapter') - job_name = event.payload[:job].name + job_name = event.payload[:job].name args = event.payload[:args].any? ? ": #{event.payload[:args].inspect}" : "" info "Enqueued #{job_name} to #{queue_name}" + args end + def enqueue_at(event) + queue_name = event.payload[:adapter].name.demodulize.remove('Adapter') + job_name = event.payload[:job].name + args = event.payload[:args].any? ? ": #{event.payload[:args].inspect}" : "" + time = event.payload[:timestamp] + + info "Enqueued #{job_name} to #{queue_name} at #{time}" + args + end + def logger ActiveJob::Base.logger end diff --git a/lib/active_job/queue_adapters/backburner_adapter.rb b/lib/active_job/queue_adapters/backburner_adapter.rb index 0ac745c7f2..0d023d9ee7 100644 --- a/lib/active_job/queue_adapters/backburner_adapter.rb +++ b/lib/active_job/queue_adapters/backburner_adapter.rb @@ -7,7 +7,11 @@ module ActiveJob def queue(job, *args) Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name end - end + + def queue_at(job, timestamp, *args) + raise NotImplementedError + end + end class JobWrapper class << self diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb index 14072e2801..214733e3a6 100644 --- a/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -7,8 +7,12 @@ module ActiveJob def queue(job, *args) JobWrapper.new.delay(queue: job.queue_name).perform(job, *args) end + + def queue_at(job, timestamp, *args) + JobWrapper.new.delay(queue: job.queue_name, run_at: timestamp).perform(job, *args) + end end - + class JobWrapper def perform(job, *args) job.new.perform *Parameters.deserialize(args) diff --git a/lib/active_job/queue_adapters/inline_adapter.rb b/lib/active_job/queue_adapters/inline_adapter.rb index cffa55af82..414a918d2b 100644 --- a/lib/active_job/queue_adapters/inline_adapter.rb +++ b/lib/active_job/queue_adapters/inline_adapter.rb @@ -5,7 +5,19 @@ module ActiveJob def queue(job, *args) job.new.perform *Parameters.deserialize(args) end + + def queue_at(job, ts, *args) + Thread.new do + begin + interval = Time.now.to_f - ts + sleep(interval) if interval > 0 + job.new.perform *Parameters.deserialize(args) + rescue => ex + ActiveJob::Base.logger.info "Error performing #{job}: #{ex.message}" + end + end + end end end end -end
\ No newline at end of file +end diff --git a/lib/active_job/queue_adapters/que_adapter.rb b/lib/active_job/queue_adapters/que_adapter.rb index 6750882b91..30c23a35b9 100644 --- a/lib/active_job/queue_adapters/que_adapter.rb +++ b/lib/active_job/queue_adapters/que_adapter.rb @@ -7,6 +7,10 @@ module ActiveJob def queue(job, *args) JobWrapper.enqueue job, *args, queue: job.queue_name end + + def queue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper < Que::Job diff --git a/lib/active_job/queue_adapters/queue_classic_adapter.rb b/lib/active_job/queue_adapters/queue_classic_adapter.rb index b82edae977..d0e2e1aa22 100644 --- a/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -7,6 +7,10 @@ module ActiveJob def queue(job, *args) QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job, *args) end + + def queue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper diff --git a/lib/active_job/queue_adapters/resque_adapter.rb b/lib/active_job/queue_adapters/resque_adapter.rb index 6686f10593..353b3ae690 100644 --- a/lib/active_job/queue_adapters/resque_adapter.rb +++ b/lib/active_job/queue_adapters/resque_adapter.rb @@ -1,6 +1,7 @@ require 'resque' require 'active_support/core_ext/enumerable' require 'active_support/core_ext/array/access' +require 'resque_scheduler' module ActiveJob module QueueAdapters @@ -9,6 +10,10 @@ module ActiveJob def queue(job, *args) Resque.enqueue JobWrapper.new(job), job, *args end + + def queue_at(job, timestamp, *args) + Resque.enqueue_at timestamp, JobWrapper.new(job), job, *args + end end class JobWrapper diff --git a/lib/active_job/queue_adapters/sidekiq_adapter.rb b/lib/active_job/queue_adapters/sidekiq_adapter.rb index 43bd69790c..be6bd4ee01 100644 --- a/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -11,6 +11,15 @@ module ActiveJob 'args' => [ job, *args ], 'retry' => true end + + def queue_at(job, timestamp, *args) + Sidekiq::Client.push \ + 'class' => JobWrapper, + 'queue' => job.queue_name, + 'args' => [ job, *args ], + 'at' => timestamp, + 'retry' => true + end end class JobWrapper diff --git a/lib/active_job/queue_adapters/sneakers_adapter.rb b/lib/active_job/queue_adapters/sneakers_adapter.rb index 7be6b2a085..b299b25a96 100644 --- a/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -13,6 +13,10 @@ module ActiveJob JobWrapper.enqueue [ job, *args ] end end + + def queue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper diff --git a/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/lib/active_job/queue_adapters/sucker_punch_adapter.rb index 6ca6726456..8a3d6d10a1 100644 --- a/lib/active_job/queue_adapters/sucker_punch_adapter.rb +++ b/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -7,6 +7,10 @@ module ActiveJob def queue(job, *args) JobWrapper.new.async.perform(job, *args) end + + def queue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper diff --git a/test/cases/queuing_test.rb b/test/cases/queuing_test.rb index b6180a23dd..958b81ca3a 100644 --- a/test/cases/queuing_test.rb +++ b/test/cases/queuing_test.rb @@ -1,5 +1,6 @@ require 'helper' require 'jobs/hello_job' +require 'active_support/core_ext/numeric/time' class QueuingTest < ActiveSupport::TestCase @@ -16,4 +17,12 @@ class QueuingTest < ActiveSupport::TestCase HelloJob.enqueue "Jamie" assert_equal "Jamie says hello", $BUFFER.pop end + + test 'run queued job later' do + begin + result = HelloJob.enqueue_at 1.second.ago, "Jamie" + assert result + rescue NotImplementedError + end + end end |