From 2aa9024eede567ae4daa97df62e32f57265096bc Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 20 May 2014 08:44:00 -0700 Subject: Fix boogs, stub not implemented adapters --- lib/active_job/enqueuing.rb | 9 +++++---- lib/active_job/queue_adapters/inline_adapter.rb | 2 +- lib/active_job/queue_adapters/que_adapter.rb | 4 ++++ lib/active_job/queue_adapters/sneakers_adapter.rb | 4 ++++ 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/lib/active_job/enqueuing.rb b/lib/active_job/enqueuing.rb index bb681d1e32..43bce125b5 100644 --- a/lib/active_job/enqueuing.rb +++ b/lib/active_job/enqueuing.rb @@ -32,11 +32,12 @@ module ActiveJob # # Returns truthy if a job was scheduled. def enqueue_at(timestamp, *args) - ActiveSupport::Notifications.instrument "enqueue_at.active_job", adapter: queue_adapter, timestamp: timestamp, job: self, params: args - if Time.now.to_f > timestamp - queue.adapter.queue self, *Parameters.serialize(args) + ts = timestamp.to_f + ActiveSupport::Notifications.instrument "enqueue_at.active_job", adapter: queue_adapter, timestamp: ts, job: self, params: args + if Time.now.to_f > ts + queue_adapter.queue self, *Parameters.serialize(args) else - queue_adapter.queue_at self, timestamp.to_f, *Parameters.serialize(args) + queue_adapter.queue_at self, ts, *Parameters.serialize(args) end end end diff --git a/lib/active_job/queue_adapters/inline_adapter.rb b/lib/active_job/queue_adapters/inline_adapter.rb index cad600a765..4afd4f708e 100644 --- a/lib/active_job/queue_adapters/inline_adapter.rb +++ b/lib/active_job/queue_adapters/inline_adapter.rb @@ -14,7 +14,7 @@ module ActiveJob sleep(interval) if interval > 0 job.new.perform *Parameters.deserialize(args) rescue => ex - ActiveSupport::Notifications.instrument "error.perform.active_job", adapter: self, job: job, params: args, error: ex + ActiveSupport::Notifications.instrument "perform_error.active_job", adapter: self, job: job, params: args, error: ex end end end diff --git a/lib/active_job/queue_adapters/que_adapter.rb b/lib/active_job/queue_adapters/que_adapter.rb index 6750882b91..30c23a35b9 100644 --- a/lib/active_job/queue_adapters/que_adapter.rb +++ b/lib/active_job/queue_adapters/que_adapter.rb @@ -7,6 +7,10 @@ module ActiveJob def queue(job, *args) JobWrapper.enqueue job, *args, queue: job.queue_name end + + def queue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper < Que::Job diff --git a/lib/active_job/queue_adapters/sneakers_adapter.rb b/lib/active_job/queue_adapters/sneakers_adapter.rb index c6dbfa75bf..ae9e49a0bf 100644 --- a/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -7,6 +7,10 @@ module ActiveJob def queue(job, *args) JobWrapper.enqueue([job, *args]) end + + def queue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper -- cgit v1.2.3