diff options
Diffstat (limited to 'lib/active_job')
-rw-r--r-- | lib/active_job/base.rb | 4 | ||||
-rw-r--r-- | lib/active_job/enqueuing.rb | 1 | ||||
-rw-r--r-- | lib/active_job/log_subscriber.rb | 18 | ||||
-rw-r--r-- | lib/active_job/logging.rb | 7 | ||||
-rw-r--r-- | lib/active_job/queue_adapter.rb | 15 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/que_adapter.rb | 19 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/queue_classic_adapter.rb | 3 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/sneakers_adapter.rb | 23 | ||||
-rw-r--r-- | lib/active_job/queue_adapters/sucker_punch_adapter.rb | 4 |
9 files changed, 81 insertions, 13 deletions
diff --git a/lib/active_job/base.rb b/lib/active_job/base.rb index 77b929d4af..3d16f38275 100644 --- a/lib/active_job/base.rb +++ b/lib/active_job/base.rb @@ -1,11 +1,13 @@ require 'active_job/queue_adapter' require 'active_job/queue_name' require 'active_job/enqueuing' +require 'active_job/logging' module ActiveJob class Base extend QueueAdapter extend QueueName extend Enqueuing + extend Logging end -end
\ No newline at end of file +end diff --git a/lib/active_job/enqueuing.rb b/lib/active_job/enqueuing.rb index a5a50d69db..b2d142ee96 100644 --- a/lib/active_job/enqueuing.rb +++ b/lib/active_job/enqueuing.rb @@ -11,6 +11,7 @@ module ActiveJob # The return value is adapter-specific and may change in a future # ActiveJob release. def enqueue(*args) + ActiveSupport::Notifications.instrument "enqueue.active_job", adapter: queue_adapter, job: self, params: args queue_adapter.queue self, *Parameters.serialize(args) end diff --git a/lib/active_job/log_subscriber.rb b/lib/active_job/log_subscriber.rb new file mode 100644 index 0000000000..31c61a6068 --- /dev/null +++ b/lib/active_job/log_subscriber.rb @@ -0,0 +1,18 @@ +module ActiveJob + class LogSubscriber < ActiveSupport::LogSubscriber + def enqueue(event) + payload = event.payload + params = payload[:params] + adapter = payload[:adapter] + job = payload[:job] + + info "ActiveJob enqueued to #{adapter.name.demodulize} job #{job.name}: #{params.inspect}" + end + + def logger + ActiveJob::Base.logger + end + end +end + +ActiveJob::LogSubscriber.attach_to :active_job diff --git a/lib/active_job/logging.rb b/lib/active_job/logging.rb new file mode 100644 index 0000000000..0e994a8f54 --- /dev/null +++ b/lib/active_job/logging.rb @@ -0,0 +1,7 @@ +require 'active_job/log_subscriber' + +module ActiveJob + module Logging + mattr_accessor(:logger) { ActiveSupport::Logger.new(STDOUT) } + end +end diff --git a/lib/active_job/queue_adapter.rb b/lib/active_job/queue_adapter.rb index 2033f6fe56..8f2f8b86ea 100644 --- a/lib/active_job/queue_adapter.rb +++ b/lib/active_job/queue_adapter.rb @@ -6,14 +6,13 @@ module ActiveJob mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter } def queue_adapter=(name_or_adapter) - case name_or_adapter - when Symbol, String - adapter = load_adapter(name_or_adapter) - else - adapter = name_or_adapter - end - - @@queue_adapter = adapter + @@queue_adapter = \ + case name_or_adapter + when Symbol, String + load_adapter(name_or_adapter) + when Class + name_or_adapter + end end private diff --git a/lib/active_job/queue_adapters/que_adapter.rb b/lib/active_job/queue_adapters/que_adapter.rb new file mode 100644 index 0000000000..6750882b91 --- /dev/null +++ b/lib/active_job/queue_adapters/que_adapter.rb @@ -0,0 +1,19 @@ +require 'que' + +module ActiveJob + module QueueAdapters + class QueAdapter + class << self + def queue(job, *args) + JobWrapper.enqueue job, *args, queue: job.queue_name + end + end + + class JobWrapper < Que::Job + def run(job, *args) + job.new.perform *Parameters.deserialize(args) + end + end + end + end +end diff --git a/lib/active_job/queue_adapters/queue_classic_adapter.rb b/lib/active_job/queue_adapters/queue_classic_adapter.rb index 38c04ca5c9..d0e2e1aa22 100644 --- a/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -5,8 +5,7 @@ module ActiveJob class QueueClassicAdapter class << self def queue(job, *args) - qc_queue = QC::Queue.new(job.queue_name) - qc_queue.enqueue("ActiveJob::QueueAdapters::QueueClassicAdapter::JobWrapper.perform", job, *args) + QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job, *args) end def queue_at(job, timestamp, *args) diff --git a/lib/active_job/queue_adapters/sneakers_adapter.rb b/lib/active_job/queue_adapters/sneakers_adapter.rb new file mode 100644 index 0000000000..c6dbfa75bf --- /dev/null +++ b/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -0,0 +1,23 @@ +require 'sneakers' + +module ActiveJob + module QueueAdapters + class SneakersAdapter + class << self + def queue(job, *args) + JobWrapper.enqueue([job, *args]) + end + end + + class JobWrapper + include Sneakers::Worker + + self.from_queue("queue", {}) + + def work(job, *args) + job.new.perform *Parameters.deserialize(args) + end + end + end + end +end diff --git a/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/lib/active_job/queue_adapters/sucker_punch_adapter.rb index 63019e37c0..e483c0844b 100644 --- a/lib/active_job/queue_adapters/sucker_punch_adapter.rb +++ b/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -22,8 +22,8 @@ module ActiveJob class JobWrapper include SuckerPunch::Job - def perform(job_name, *args) - job_name.new.perform *Parameters.deserialize(args) + def perform(job, *args) + job.new.perform *Parameters.deserialize(args) end def later(sec, job_name, *args) |