diff options
author | David Heinemeier Hansson <david@basecamp.com> | 2014-05-21 15:28:00 +0200 |
---|---|---|
committer | David Heinemeier Hansson <david@basecamp.com> | 2014-05-21 15:28:00 +0200 |
commit | 69607a8e3e4bf92f0ec6d7ce8521bc011b51c7d8 (patch) | |
tree | cd9c381c318e13613ff5922ab8110c8a36ccaee5 | |
parent | 4ad28b18672bbb369bdc446d72fdd829eeaecd88 (diff) | |
parent | 484dbd8ed6703a9ba89617ad92f9ffb7d0812d59 (diff) | |
download | rails-69607a8e3e4bf92f0ec6d7ce8521bc011b51c7d8.tar.gz rails-69607a8e3e4bf92f0ec6d7ce8521bc011b51c7d8.tar.bz2 rails-69607a8e3e4bf92f0ec6d7ce8521bc011b51c7d8.zip |
Merge pull request #53 from DouweM/performing-module
Don't deserialize parameters in individual adapters.
-rw-r--r-- | lib/active_job/base.rb | 2 | ||||
-rw-r--r-- | lib/active_job/enqueuing.rb | 6 | ||||
-rw-r--r-- | lib/active_job/logging.rb | 8 | ||||
-rw-r--r-- | lib/active_job/performing.rb | 14 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/backburner_adapter.rb | 8 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/delayed_job_adapter.rb | 6 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/inline_adapter.rb | 8 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/que_adapter.rb | 6 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/queue_classic_adapter.rb | 6 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/resque_adapter.rb | 6 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/sidekiq_adapter.rb | 6 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/sneakers_adapter.rb | 6 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/sucker_punch_adapter.rb | 6 |
13 files changed, 54 insertions, 34 deletions
diff --git a/lib/active_job/base.rb b/lib/active_job/base.rb index 3d16f38275..8bddfde09f 100644 --- a/lib/active_job/base.rb +++ b/lib/active_job/base.rb @@ -1,6 +1,7 @@ require 'active_job/queue_adapter' require 'active_job/queue_name' require 'active_job/enqueuing' +require 'active_job/performing' require 'active_job/logging' module ActiveJob @@ -8,6 +9,7 @@ module ActiveJob extend QueueAdapter extend QueueName extend Enqueuing + include Performing extend Logging end end diff --git a/lib/active_job/enqueuing.rb b/lib/active_job/enqueuing.rb index 7f2aef871e..dafa2399c8 100644 --- a/lib/active_job/enqueuing.rb +++ b/lib/active_job/enqueuing.rb @@ -12,7 +12,7 @@ module ActiveJob def enqueue(*args) serialized_args = Parameters.serialize(args) ActiveSupport::Notifications.instrument "enqueue.active_job", adapter: queue_adapter, job: self, args: serialized_args - queue_adapter.queue self, *serialized_args + queue_adapter.enqueue self, *serialized_args end # Enqueue a job to be performed at +interval+ from now. @@ -32,8 +32,8 @@ module ActiveJob def enqueue_at(timestamp, *args) timestamp = timestamp.to_f serialized_args = Parameters.serialize(args) - ActiveSupport::Notifications.instrument "enqueue_at.active_job", adapter: queue_adapter, timestamp: timestamp, job: self, args: serialized_args - queue_adapter.queue_at self, timestamp, *serialized_args + ActiveSupport::Notifications.instrument "enqueue_at.active_job", adapter: queue_adapter, job: self, args: serialized_args, timestamp: timestamp + queue_adapter.enqueue_at self, timestamp, *serialized_args end end end diff --git a/lib/active_job/logging.rb b/lib/active_job/logging.rb index f06f12087b..22748e3d5f 100644 --- a/lib/active_job/logging.rb +++ b/lib/active_job/logging.rb @@ -10,7 +10,11 @@ module ActiveJob end def enqueue_at(event) - info "Enqueued #{event.payload[:job].name} to #{queue_name(event)} at #{enqueud_at(event)}" + args_info(event) + info "Enqueued #{event.payload[:job].name} to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) + end + + def perform(event) + info "Performed #{event.payload[:job].name} to #{queue_name(event)}" + args_info(event) end private @@ -22,7 +26,7 @@ module ActiveJob event.payload[:args].any? ? ": #{event.payload[:args].inspect}" : "" end - def enqueud_at(event) + def enqueued_at(event) Time.at(event.payload[:timestamp]).utc end diff --git a/lib/active_job/performing.rb b/lib/active_job/performing.rb new file mode 100644 index 0000000000..6c304a4bed --- /dev/null +++ b/lib/active_job/performing.rb @@ -0,0 +1,14 @@ +require 'active_job/parameters' + +module ActiveJob + module Performing + def perform_with_deserialization(*serialized_args) + ActiveSupport::Notifications.instrument "perform.active_job", adapter: self.class.queue_adapter, job: self.class, args: serialized_args + perform *Parameters.deserialize(serialized_args) + end + + def perform(*) + raise NotImplementedError + end + end +end diff --git a/lib/active_job/queue_adapters/backburner_adapter.rb b/lib/active_job/queue_adapters/backburner_adapter.rb index 0d023d9ee7..5230acc625 100644 --- a/lib/active_job/queue_adapters/backburner_adapter.rb +++ b/lib/active_job/queue_adapters/backburner_adapter.rb @@ -4,19 +4,19 @@ module ActiveJob module QueueAdapters class BackburnerAdapter class << self - def queue(job, *args) + def enqueue(job, *args) Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name end - def queue_at(job, timestamp, *args) + def enqueue_at(job, timestamp, *args) raise NotImplementedError end - end + end class JobWrapper class << self def perform(job_name, *args) - job_name.constantize.new.perform *Parameters.deserialize(args) + job_name.constantize.new.perform_with_deserialization *args end end end diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb index 214733e3a6..5a9c4c708d 100644 --- a/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -4,18 +4,18 @@ module ActiveJob module QueueAdapters class DelayedJobAdapter class << self - def queue(job, *args) + def enqueue(job, *args) JobWrapper.new.delay(queue: job.queue_name).perform(job, *args) end - def queue_at(job, timestamp, *args) + def enqueue_at(job, timestamp, *args) JobWrapper.new.delay(queue: job.queue_name, run_at: timestamp).perform(job, *args) end end class JobWrapper def perform(job, *args) - job.new.perform *Parameters.deserialize(args) + job.new.perform_with_deserialization *args end end end diff --git a/lib/active_job/queue_adapters/inline_adapter.rb b/lib/active_job/queue_adapters/inline_adapter.rb index 4f703a6c3f..d826ce51b4 100644 --- a/lib/active_job/queue_adapters/inline_adapter.rb +++ b/lib/active_job/queue_adapters/inline_adapter.rb @@ -2,16 +2,16 @@ module ActiveJob module QueueAdapters class InlineAdapter class << self - def queue(job, *args) - job.new.perform *Parameters.deserialize(args) + def enqueue(job, *args) + job.new.perform_with_deserialization *args end - def queue_at(job, timestamp, *args) + def enqueue_at(job, timestamp, *args) Thread.new do begin interval = Time.now.to_f - timestamp sleep(interval) if interval > 0 - job.new.perform *Parameters.deserialize(args) + job.new.perform_with_deserialization *args rescue => e ActiveJob::Base.logger.info "Error performing #{job}: #{e.message}" end diff --git a/lib/active_job/queue_adapters/que_adapter.rb b/lib/active_job/queue_adapters/que_adapter.rb index 30c23a35b9..9dd57d65f3 100644 --- a/lib/active_job/queue_adapters/que_adapter.rb +++ b/lib/active_job/queue_adapters/que_adapter.rb @@ -4,18 +4,18 @@ module ActiveJob module QueueAdapters class QueAdapter class << self - def queue(job, *args) + def enqueue(job, *args) JobWrapper.enqueue job, *args, queue: job.queue_name end - def queue_at(job, timestamp, *args) + def enqueue_at(job, timestamp, *args) raise NotImplementedError end end class JobWrapper < Que::Job def run(job, *args) - job.new.perform *Parameters.deserialize(args) + job.new.perform_with_deserialization *args end end end diff --git a/lib/active_job/queue_adapters/queue_classic_adapter.rb b/lib/active_job/queue_adapters/queue_classic_adapter.rb index d0e2e1aa22..eacc6b5548 100644 --- a/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -4,18 +4,18 @@ module ActiveJob module QueueAdapters class QueueClassicAdapter class << self - def queue(job, *args) + def enqueue(job, *args) QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job, *args) end - def queue_at(job, timestamp, *args) + def enqueue_at(job, timestamp, *args) raise NotImplementedError end end class JobWrapper def self.perform(job, *args) - job.new.perform *Parameters.deserialize(args) + job.new.perform_with_deserialization *args end end end diff --git a/lib/active_job/queue_adapters/resque_adapter.rb b/lib/active_job/queue_adapters/resque_adapter.rb index 353b3ae690..3b87f25b80 100644 --- a/lib/active_job/queue_adapters/resque_adapter.rb +++ b/lib/active_job/queue_adapters/resque_adapter.rb @@ -7,11 +7,11 @@ module ActiveJob module QueueAdapters class ResqueAdapter class << self - def queue(job, *args) + def enqueue(job, *args) Resque.enqueue JobWrapper.new(job), job, *args end - def queue_at(job, timestamp, *args) + def enqueue_at(job, timestamp, *args) Resque.enqueue_at timestamp, JobWrapper.new(job), job, *args end end @@ -19,7 +19,7 @@ module ActiveJob class JobWrapper class << self def perform(job_name, *args) - job_name.constantize.new.perform *Parameters.deserialize(args) + job_name.constantize.new.perform_with_deserialization *args end end diff --git a/lib/active_job/queue_adapters/sidekiq_adapter.rb b/lib/active_job/queue_adapters/sidekiq_adapter.rb index 9829049220..74fbe632d6 100644 --- a/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -4,7 +4,7 @@ module ActiveJob module QueueAdapters class SidekiqAdapter class << self - def queue(job, *args) + def enqueue(job, *args) Sidekiq::Client.push \ 'class' => JobWrapper, 'queue' => job.queue_name, @@ -12,7 +12,7 @@ module ActiveJob 'retry' => true end - def queue_at(job, timestamp, *args) + def enqueue_at(job, timestamp, *args) Sidekiq::Client.push \ 'class' => JobWrapper, 'queue' => job.queue_name, @@ -26,7 +26,7 @@ module ActiveJob include Sidekiq::Worker def perform(job_name, *args) - job_name.constantize.new.perform *Parameters.deserialize(args) + job_name.constantize.new.perform_with_deserialization *args end end end diff --git a/lib/active_job/queue_adapters/sneakers_adapter.rb b/lib/active_job/queue_adapters/sneakers_adapter.rb index b299b25a96..6bb575e907 100644 --- a/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -7,14 +7,14 @@ module ActiveJob @mutex = Mutex.new class << self - def queue(job, *args) + def enqueue(job, *args) @mutex.synchronize do JobWrapper.from_queue job.queue_name JobWrapper.enqueue [ job, *args ] end end - def queue_at(job, timestamp, *args) + def enqueue_at(job, timestamp, *args) raise NotImplementedError end end @@ -23,7 +23,7 @@ module ActiveJob include Sneakers::Worker def work(job, *args) - job.new.perform *Parameters.deserialize(args) + job.new.perform_with_deserialization *args end end end diff --git a/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/lib/active_job/queue_adapters/sucker_punch_adapter.rb index 4163db1c35..30718fc05f 100644 --- a/lib/active_job/queue_adapters/sucker_punch_adapter.rb +++ b/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -4,11 +4,11 @@ module ActiveJob module QueueAdapters class SuckerPunchAdapter class << self - def queue(job, *args) + def enqueue(job, *args) JobWrapper.new.async.perform job, *args end - def queue_at(job, timestamp, *args) + def enqueue_at(job, timestamp, *args) raise NotImplementedError end end @@ -17,7 +17,7 @@ module ActiveJob include SuckerPunch::Job def perform(job, *args) - job.new.perform *Parameters.deserialize(args) + job.new.perform_with_deserialization *args end end end |