diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/active_job/base.rb | 2 | ||||
-rw-r--r-- | lib/active_job/enqueuing.rb | 24 | ||||
-rw-r--r-- | lib/active_job/log_subscriber.rb | 19 | ||||
-rw-r--r-- | lib/active_job/logging.rb | 35 | ||||
-rw-r--r-- | lib/active_job/parameters.rb | 11 | ||||
-rw-r--r-- | lib/active_job/performing.rb | 14 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/backburner_adapter.rb | 8 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/delayed_job_adapter.rb | 10 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/inline_adapter.rb | 18 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/que_adapter.rb | 8 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/queue_classic_adapter.rb | 8 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/resque_adapter.rb | 9 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/sidekiq_adapter.rb | 13 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/sneakers_adapter.rb | 8 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/sucker_punch_adapter.rb | 10 |
15 files changed, 151 insertions, 46 deletions
diff --git a/lib/active_job/base.rb b/lib/active_job/base.rb index d4f0f6777c..e6b02708a1 100644 --- a/lib/active_job/base.rb +++ b/lib/active_job/base.rb @@ -1,6 +1,7 @@ require 'active_job/queue_adapter' require 'active_job/queue_name' require 'active_job/enqueuing' +require 'active_job/performing' require 'active_job/logging' module ActiveJob @@ -8,6 +9,7 @@ module ActiveJob extend QueueAdapter extend QueueName extend Enqueuing + include Performing extend Logging ActiveSupport.run_load_hooks(:active_job, self) diff --git a/lib/active_job/enqueuing.rb b/lib/active_job/enqueuing.rb index 6fb6f15ce2..652de5521b 100644 --- a/lib/active_job/enqueuing.rb +++ b/lib/active_job/enqueuing.rb @@ -2,7 +2,6 @@ require 'active_job/parameters' module ActiveJob module Enqueuing - ## # Push a job onto the queue. The arguments must be legal JSON types # (string, int, float, nil, true, false, hash or array) or # ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects @@ -13,7 +12,28 @@ module ActiveJob def enqueue(*args) serialized_args = Parameters.serialize(args) ActiveSupport::Notifications.instrument "enqueue.active_job", adapter: queue_adapter, job: self, args: serialized_args - queue_adapter.queue self, *serialized_args + queue_adapter.enqueue self, *serialized_args + end + + # Enqueue a job to be performed at +interval+ from now. + # + # enqueue_in(1.week, "mike") + # + # Returns truthy if a job was scheduled. + def enqueue_in(interval, *args) + enqueue_at(interval.seconds.from_now, *args) + end + + # Enqueue a job to be performed at an explicit point in time. + # + # enqueue_at(Date.tomorrow.midnight, "mike") + # + # Returns truthy if a job was scheduled. + def enqueue_at(timestamp, *args) + timestamp = timestamp.to_f + serialized_args = Parameters.serialize(args) + ActiveSupport::Notifications.instrument "enqueue_at.active_job", adapter: queue_adapter, job: self, args: serialized_args, timestamp: timestamp + queue_adapter.enqueue_at self, timestamp, *serialized_args end end end diff --git a/lib/active_job/log_subscriber.rb b/lib/active_job/log_subscriber.rb deleted file mode 100644 index 472c9f3081..0000000000 --- a/lib/active_job/log_subscriber.rb +++ /dev/null @@ -1,19 +0,0 @@ -require 'active_support/core_ext/string/filters' - -module ActiveJob - class LogSubscriber < ActiveSupport::LogSubscriber - def enqueue(event) - queue_name = event.payload[:adapter].name.demodulize.remove('Adapter') - job_name = event.payload[:job].name - args = event.payload[:args].any? ? ": #{event.payload[:args].inspect}" : "" - - info "Enqueued #{job_name} to #{queue_name}" + args - end - - def logger - ActiveJob::Base.logger - end - end -end - -ActiveJob::LogSubscriber.attach_to :active_job diff --git a/lib/active_job/logging.rb b/lib/active_job/logging.rb index 0e994a8f54..dc432679f8 100644 --- a/lib/active_job/logging.rb +++ b/lib/active_job/logging.rb @@ -1,7 +1,40 @@ -require 'active_job/log_subscriber' +require 'active_support/core_ext/string/filters' module ActiveJob module Logging mattr_accessor(:logger) { ActiveSupport::Logger.new(STDOUT) } + + class LogSubscriber < ActiveSupport::LogSubscriber + def enqueue(event) + info "Enqueued #{event.payload[:job].name} to #{queue_name(event)}" + args_info(event) + end + + def enqueue_at(event) + info "Enqueued #{event.payload[:job].name} to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) + end + + def perform(event) + info "Performed #{event.payload[:job].name} to #{queue_name(event)}" + args_info(event) + end + + private + def queue_name(event) + event.payload[:adapter].name.demodulize.remove('Adapter') + end + + def args_info(event) + event.payload[:args].any? ? ": #{event.payload[:args].inspect}" : "" + end + + def enqueued_at(event) + Time.at(event.payload[:timestamp]).utc + end + + def logger + ActiveJob::Base.logger + end + end end end + +ActiveJob::Logging::LogSubscriber.attach_to :active_job diff --git a/lib/active_job/parameters.rb b/lib/active_job/parameters.rb index 5f814f846d..955fd887d7 100644 --- a/lib/active_job/parameters.rb +++ b/lib/active_job/parameters.rb @@ -3,12 +3,17 @@ require 'active_support/core_ext/object/try' module ActiveJob class Parameters - TYPE_WHITELIST = [NilClass, Fixnum, Float, String, TrueClass, FalseClass, Hash, Array, Bignum] + TYPE_WHITELIST = [ NilClass, Fixnum, Float, String, TrueClass, FalseClass, Hash, Array, Bignum ] def self.serialize(params) params.collect do |param| - raise "Unsupported parameter type: #{param.class.name}" unless param.respond_to?(:global_id) || TYPE_WHITELIST.include?(param.class) - param.try(:global_id) || param + if param.respond_to?(:global_id) + param.global_id + elsif TYPE_WHITELIST.include?(param.class) + param + else + raise "Unsupported parameter type: #{param.class.name}" + end end end diff --git a/lib/active_job/performing.rb b/lib/active_job/performing.rb new file mode 100644 index 0000000000..6c304a4bed --- /dev/null +++ b/lib/active_job/performing.rb @@ -0,0 +1,14 @@ +require 'active_job/parameters' + +module ActiveJob + module Performing + def perform_with_deserialization(*serialized_args) + ActiveSupport::Notifications.instrument "perform.active_job", adapter: self.class.queue_adapter, job: self.class, args: serialized_args + perform *Parameters.deserialize(serialized_args) + end + + def perform(*) + raise NotImplementedError + end + end +end diff --git a/lib/active_job/queue_adapters/backburner_adapter.rb b/lib/active_job/queue_adapters/backburner_adapter.rb index 0ac745c7f2..5230acc625 100644 --- a/lib/active_job/queue_adapters/backburner_adapter.rb +++ b/lib/active_job/queue_adapters/backburner_adapter.rb @@ -4,15 +4,19 @@ module ActiveJob module QueueAdapters class BackburnerAdapter class << self - def queue(job, *args) + def enqueue(job, *args) Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name end + + def enqueue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper class << self def perform(job_name, *args) - job_name.constantize.new.perform *Parameters.deserialize(args) + job_name.constantize.new.perform_with_deserialization *args end end end diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb index 14072e2801..5a9c4c708d 100644 --- a/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -4,14 +4,18 @@ module ActiveJob module QueueAdapters class DelayedJobAdapter class << self - def queue(job, *args) + def enqueue(job, *args) JobWrapper.new.delay(queue: job.queue_name).perform(job, *args) end + + def enqueue_at(job, timestamp, *args) + JobWrapper.new.delay(queue: job.queue_name, run_at: timestamp).perform(job, *args) + end end - + class JobWrapper def perform(job, *args) - job.new.perform *Parameters.deserialize(args) + job.new.perform_with_deserialization *args end end end diff --git a/lib/active_job/queue_adapters/inline_adapter.rb b/lib/active_job/queue_adapters/inline_adapter.rb index cffa55af82..d826ce51b4 100644 --- a/lib/active_job/queue_adapters/inline_adapter.rb +++ b/lib/active_job/queue_adapters/inline_adapter.rb @@ -2,10 +2,22 @@ module ActiveJob module QueueAdapters class InlineAdapter class << self - def queue(job, *args) - job.new.perform *Parameters.deserialize(args) + def enqueue(job, *args) + job.new.perform_with_deserialization *args + end + + def enqueue_at(job, timestamp, *args) + Thread.new do + begin + interval = Time.now.to_f - timestamp + sleep(interval) if interval > 0 + job.new.perform_with_deserialization *args + rescue => e + ActiveJob::Base.logger.info "Error performing #{job}: #{e.message}" + end + end end end end end -end
\ No newline at end of file +end diff --git a/lib/active_job/queue_adapters/que_adapter.rb b/lib/active_job/queue_adapters/que_adapter.rb index 6750882b91..9dd57d65f3 100644 --- a/lib/active_job/queue_adapters/que_adapter.rb +++ b/lib/active_job/queue_adapters/que_adapter.rb @@ -4,14 +4,18 @@ module ActiveJob module QueueAdapters class QueAdapter class << self - def queue(job, *args) + def enqueue(job, *args) JobWrapper.enqueue job, *args, queue: job.queue_name end + + def enqueue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper < Que::Job def run(job, *args) - job.new.perform *Parameters.deserialize(args) + job.new.perform_with_deserialization *args end end end diff --git a/lib/active_job/queue_adapters/queue_classic_adapter.rb b/lib/active_job/queue_adapters/queue_classic_adapter.rb index b82edae977..eacc6b5548 100644 --- a/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -4,14 +4,18 @@ module ActiveJob module QueueAdapters class QueueClassicAdapter class << self - def queue(job, *args) + def enqueue(job, *args) QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job, *args) end + + def enqueue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper def self.perform(job, *args) - job.new.perform *Parameters.deserialize(args) + job.new.perform_with_deserialization *args end end end diff --git a/lib/active_job/queue_adapters/resque_adapter.rb b/lib/active_job/queue_adapters/resque_adapter.rb index 6686f10593..3b87f25b80 100644 --- a/lib/active_job/queue_adapters/resque_adapter.rb +++ b/lib/active_job/queue_adapters/resque_adapter.rb @@ -1,20 +1,25 @@ require 'resque' require 'active_support/core_ext/enumerable' require 'active_support/core_ext/array/access' +require 'resque_scheduler' module ActiveJob module QueueAdapters class ResqueAdapter class << self - def queue(job, *args) + def enqueue(job, *args) Resque.enqueue JobWrapper.new(job), job, *args end + + def enqueue_at(job, timestamp, *args) + Resque.enqueue_at timestamp, JobWrapper.new(job), job, *args + end end class JobWrapper class << self def perform(job_name, *args) - job_name.constantize.new.perform *Parameters.deserialize(args) + job_name.constantize.new.perform_with_deserialization *args end end diff --git a/lib/active_job/queue_adapters/sidekiq_adapter.rb b/lib/active_job/queue_adapters/sidekiq_adapter.rb index 43bd69790c..74fbe632d6 100644 --- a/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -4,20 +4,29 @@ module ActiveJob module QueueAdapters class SidekiqAdapter class << self - def queue(job, *args) + def enqueue(job, *args) Sidekiq::Client.push \ 'class' => JobWrapper, 'queue' => job.queue_name, 'args' => [ job, *args ], 'retry' => true end + + def enqueue_at(job, timestamp, *args) + Sidekiq::Client.push \ + 'class' => JobWrapper, + 'queue' => job.queue_name, + 'args' => [ job, *args ], + 'retry' => true, + 'at' => timestamp + end end class JobWrapper include Sidekiq::Worker def perform(job_name, *args) - job_name.constantize.new.perform *Parameters.deserialize(args) + job_name.constantize.new.perform_with_deserialization *args end end end diff --git a/lib/active_job/queue_adapters/sneakers_adapter.rb b/lib/active_job/queue_adapters/sneakers_adapter.rb index 7be6b2a085..6bb575e907 100644 --- a/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -7,19 +7,23 @@ module ActiveJob @mutex = Mutex.new class << self - def queue(job, *args) + def enqueue(job, *args) @mutex.synchronize do JobWrapper.from_queue job.queue_name JobWrapper.enqueue [ job, *args ] end end + + def enqueue_at(job, timestamp, *args) + raise NotImplementedError + end end class JobWrapper include Sneakers::Worker def work(job, *args) - job.new.perform *Parameters.deserialize(args) + job.new.perform_with_deserialization *args end end end diff --git a/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/lib/active_job/queue_adapters/sucker_punch_adapter.rb index 6ca6726456..30718fc05f 100644 --- a/lib/active_job/queue_adapters/sucker_punch_adapter.rb +++ b/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -4,8 +4,12 @@ module ActiveJob module QueueAdapters class SuckerPunchAdapter class << self - def queue(job, *args) - JobWrapper.new.async.perform(job, *args) + def enqueue(job, *args) + JobWrapper.new.async.perform job, *args + end + + def enqueue_at(job, timestamp, *args) + raise NotImplementedError end end @@ -13,7 +17,7 @@ module ActiveJob include SuckerPunch::Job def perform(job, *args) - job.new.perform *Parameters.deserialize(args) + job.new.perform_with_deserialization *args end end end |