aboutsummaryrefslogtreecommitdiffstats
path: root/activejob/lib/active_job
diff options
context:
space:
mode:
authorCristian Bica <cristian.bica@gmail.com>2014-08-25 17:34:50 +0300
committerCristian Bica <cristian.bica@gmail.com>2014-09-03 23:01:46 +0300
commit1e237b4e44b7de564c7d6b331dd2f2243c4113fd (patch)
treec272b813a4968815026d86f6b47ab9839ce3ab03 /activejob/lib/active_job
parent5db4e7f0ec2957f8641d5af884bd39e31d795597 (diff)
downloadrails-1e237b4e44b7de564c7d6b331dd2f2243c4113fd.tar.gz
rails-1e237b4e44b7de564c7d6b331dd2f2243c4113fd.tar.bz2
rails-1e237b4e44b7de564c7d6b331dd2f2243c4113fd.zip
Active Job refactoring
Diffstat (limited to 'activejob/lib/active_job')
-rw-r--r--activejob/lib/active_job/base.rb4
-rw-r--r--activejob/lib/active_job/configured_job.rb18
-rw-r--r--activejob/lib/active_job/core.rb89
-rw-r--r--activejob/lib/active_job/enqueuing.rb98
-rw-r--r--activejob/lib/active_job/execution.rb32
-rw-r--r--activejob/lib/active_job/identifier.rb15
-rw-r--r--activejob/lib/active_job/logging.rb28
-rw-r--r--activejob/lib/active_job/queue_adapters/backburner_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/inline_adapter.rb4
-rw-r--r--activejob/lib/active_job/queue_adapters/qu_adapter.rb9
-rw-r--r--activejob/lib/active_job/queue_adapters/que_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/resque_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/sneakers_adapter.rb10
-rw-r--r--activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb10
-rw-r--r--activejob/lib/active_job/queue_adapters/test_adapter.rb16
-rw-r--r--activejob/lib/active_job/queue_name.rb23
19 files changed, 276 insertions, 152 deletions
diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb
index 1b54786303..a3bec1f827 100644
--- a/activejob/lib/active_job/base.rb
+++ b/activejob/lib/active_job/base.rb
@@ -1,19 +1,19 @@
+require 'active_job/core'
require 'active_job/queue_adapter'
require 'active_job/queue_name'
require 'active_job/enqueuing'
require 'active_job/execution'
require 'active_job/callbacks'
-require 'active_job/identifier'
require 'active_job/logging'
module ActiveJob
class Base
+ include Core
include QueueAdapter
include QueueName
include Enqueuing
include Execution
include Callbacks
- include Identifier
include Logging
ActiveSupport.run_load_hooks(:active_job, self)
diff --git a/activejob/lib/active_job/configured_job.rb b/activejob/lib/active_job/configured_job.rb
new file mode 100644
index 0000000000..61efc9b09e
--- /dev/null
+++ b/activejob/lib/active_job/configured_job.rb
@@ -0,0 +1,18 @@
+module ActiveJob
+ class ConfiguredJob #:nodoc:
+ def initialize(job_class, options={})
+ @options = options
+ @options[:in] = @options.delete(:wait) if @options[:wait]
+ @options[:at] = @options.delete(:wait_until) if @options[:wait_until]
+ @job_class = job_class
+ end
+
+ def perform_now(*args)
+ @job_class.new(*args).perform_now
+ end
+
+ def perform_later(*args)
+ @job_class.new(*args).enqueue @options
+ end
+ end
+end
diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb
new file mode 100644
index 0000000000..b6dd03a0bc
--- /dev/null
+++ b/activejob/lib/active_job/core.rb
@@ -0,0 +1,89 @@
+module ActiveJob
+ module Core
+ extend ActiveSupport::Concern
+
+ included do
+ # Job arguments
+ attr_accessor :arguments
+ attr_writer :serialized_arguments
+
+ # Timestamp when the job should be performed
+ attr_accessor :scheduled_at
+
+ # Job Identifier
+ attr_accessor :job_id
+
+ # Queue on which the job should be run on.
+ attr_writer :queue_name
+ end
+
+ module ClassMethods
+ # Creates a new job instance from a hash created with +serialize+
+ def deserialize(job_data)
+ job = job_data['job_class'].constantize.new
+ job.job_id = job_data['job_id']
+ job.queue_name = job_data['queue_name']
+ job.serialized_arguments = job_data['arguments']
+ job
+ end
+
+ # Creates a job preconfigured with the given options. You can call
+ # perform_later with the job arguments to enqueue the job with the
+ # preconfigured options
+ #
+ # ==== Options
+ # * <tt>:wait</tt> - Enqueues the job with the specified delay
+ # * <tt>:wait_until</tt> - Enqueues the job at the time specified
+ # * <tt>:queue</tt> - Enqueues the job on the specified queue
+ #
+ # ==== Examples
+ #
+ # VideoJob.set(queue: :some_queue).perform_later(Video.last)
+ # VideoJob.set(wait: 5.minutes).perform_later(Video.last)
+ # VideoJob.set(wait_until: Time.tomorroe).perform_later(Video.last)
+ # VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
+ # VideoJob.set(queue: :some_queue, wait_until: Time.tomorroe).perform_later(Video.last)
+ def set(options={})
+ ConfiguredJob.new(self, options)
+ end
+ end
+
+ # Creates a new job instance. Takes as arguments the arguments that
+ # will be passed to the perform method.
+ def initialize(*arguments)
+ @arguments = arguments
+ @job_id = SecureRandom.uuid
+ @queue_name = self.class.queue_name
+ end
+
+ # Returns a hash with the job data that can safely be passed to the
+ # queueing adapter.
+ def serialize
+ {
+ 'job_class' => self.class.name,
+ 'job_id' => job_id,
+ 'queue_name' => queue_name,
+ 'arguments' => serialize_arguments(arguments)
+ }
+ end
+
+ private
+ def deserialize_arguments_if_needed
+ if defined?(@serialized_arguments) && @serialized_arguments.present?
+ @arguments = deserialize_arguments(@serialized_arguments)
+ @serialized_arguments = nil
+ end
+ end
+
+ def serialize_arguments(serialized_args)
+ Arguments.serialize(serialized_args)
+ end
+
+ def deserialize_arguments(serialized_args)
+ Arguments.deserialize(serialized_args)
+ end
+ end
+end
+
+
+
diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb
index 3d00d51867..32453930cc 100644
--- a/activejob/lib/active_job/enqueuing.rb
+++ b/activejob/lib/active_job/enqueuing.rb
@@ -12,60 +12,64 @@ module ActiveJob
#
# Returns an instance of the job class queued with args available in
# Job#arguments.
- def enqueue(*args)
- new(args).tap do |job|
- job.run_callbacks :enqueue do
- queue_adapter.enqueue self, job.job_id, *Arguments.serialize(args)
- end
- end
- end
-
- # Enqueue a job to be performed at +interval+ from now.
- #
- # enqueue_in(1.week, "mike")
- #
- # Returns an instance of the job class queued with args available in
- # Job#arguments and the timestamp in Job#enqueue_at.
- def enqueue_in(interval, *args)
- enqueue_at interval.seconds.from_now, *args
+ def perform_later(*args)
+ job_or_instantiate(*args).enqueue
end
- # Enqueue a job to be performed at an explicit point in time.
- #
- # enqueue_at(Date.tomorrow.midnight, "mike")
- #
- # Returns an instance of the job class queued with args available in
- # Job#arguments and the timestamp in Job#enqueue_at.
- def enqueue_at(timestamp, *args)
- new(args).tap do |job|
- job.enqueued_at = timestamp
-
- job.run_callbacks :enqueue do
- queue_adapter.enqueue_at self, timestamp.to_f, job.job_id, *Arguments.serialize(args)
- end
+ protected
+ def job_or_instantiate(*args)
+ args.first.is_a?(self) ? args.first : new(*args)
end
- end
- end
-
- included do
- attr_accessor :arguments
- attr_accessor :enqueued_at
- end
-
- def initialize(arguments = nil)
- @arguments = arguments
end
- def retry_now
- self.class.enqueue(*arguments)
+ # Reschedule the job to be re-executed. This is usefull in combination
+ # with the +rescue_from+ option. When you rescue an exception from your job
+ # you can ask Active Job to retry performing your job.
+ #
+ # ==== Options
+ # * <tt>:in</tt> - Enqueues the job with the specified delay
+ # * <tt>:at</tt> - Enqueues the job at the time specified
+ # * <tt>:queue</tt> - Enqueues the job on the specified queue
+ #
+ # ==== Examples
+ #
+ # class SiteScrapperJob < ActiveJob::Base
+ # rescue_from(ErrorLoadingSite) do
+ # retry_job queue: :low_priority
+ # end
+ # def perform(*args)
+ # # raise ErrorLoadingSite if cannot scrape
+ # end
+ # end
+ def retry_job(options={})
+ enqueue options
end
- def retry_in(interval)
- self.class.enqueue_in interval, *arguments
- end
-
- def retry_at(timestamp)
- self.class.enqueue_at timestamp, *arguments
+ # Equeue the job to be performed by the queue adapter.
+ #
+ # ==== Options
+ # * <tt>:in</tt> - Enqueues the job with the specified delay
+ # * <tt>:at</tt> - Enqueues the job at the time specified
+ # * <tt>:queue</tt> - Enqueues the job on the specified queue
+ #
+ # ==== Examples
+ #
+ # my_job_instance.enqueue
+ # my_job_instance.enqueue in: 5.minutes
+ # my_job_instance.enqueue queue: :important
+ # my_job_instance.enqueue at: Date.tomorrow.midnight
+ def enqueue(options={})
+ self.scheduled_at = options[:in].seconds.from_now.to_f if options[:in]
+ self.scheduled_at = options[:at].to_f if options[:at]
+ self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
+ run_callbacks :enqueue do
+ if self.scheduled_at
+ self.class.queue_adapter.enqueue_at self, self.scheduled_at
+ else
+ self.class.queue_adapter.enqueue self
+ end
+ end
+ self
end
end
end
diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb
index 0e7b5bdd72..d6d67c46e3 100644
--- a/activejob/lib/active_job/execution.rb
+++ b/activejob/lib/active_job/execution.rb
@@ -4,15 +4,29 @@ require 'active_job/arguments'
module ActiveJob
module Execution
extend ActiveSupport::Concern
+ include ActiveSupport::Rescuable
- included do
- include ActiveSupport::Rescuable
- end
+ module ClassMethods
+ # Performs the job immediately.
+ #
+ # MyJob.perform_now("mike")
+ #
+ def perform_now(*args)
+ job_or_instantiate(*args).perform_now
+ end
- def execute(job_id, *serialized_args)
- self.job_id = job_id
- self.arguments = deserialize_arguments(serialized_args)
+ def execute(job_data) #:nodoc:
+ job = deserialize(job_data)
+ job.perform_now
+ end
+ end
+ # Performs the job immediately. The job is not sent to the queueing adapter
+ # and will block the execution until it's finished.
+ #
+ # MyJob.new(*args).perform_now
+ def perform_now
+ deserialize_arguments_if_needed
run_callbacks :perform do
perform(*arguments)
end
@@ -23,11 +37,5 @@ module ActiveJob
def perform(*)
fail NotImplementedError
end
-
- private
- def deserialize_arguments(serialized_args)
- Arguments.deserialize(serialized_args)
- end
-
end
end
diff --git a/activejob/lib/active_job/identifier.rb b/activejob/lib/active_job/identifier.rb
deleted file mode 100644
index c7f522087d..0000000000
--- a/activejob/lib/active_job/identifier.rb
+++ /dev/null
@@ -1,15 +0,0 @@
-require 'active_job/arguments'
-
-module ActiveJob
- module Identifier
- extend ActiveSupport::Concern
-
- included do
- attr_writer :job_id
- end
-
- def job_id
- @job_id ||= SecureRandom.uuid
- end
- end
-end
diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb
index ae098a80f3..d2030773bb 100644
--- a/activejob/lib/active_job/logging.rb
+++ b/activejob/lib/active_job/logging.rb
@@ -17,7 +17,7 @@ module ActiveJob
around_perform do |job, block, _|
tag_logger(job.class.name, job.job_id) do
- payload = {adapter: job.class.queue_adapter, job: job.class, args: job.arguments}
+ payload = {adapter: job.class.queue_adapter, job: job}
ActiveSupport::Notifications.instrument("perform_start.active_job", payload.dup)
ActiveSupport::Notifications.instrument("perform.active_job", payload) do
block.call
@@ -26,12 +26,12 @@ module ActiveJob
end
before_enqueue do |job|
- if job.enqueued_at
+ if job.scheduled_at
ActiveSupport::Notifications.instrument "enqueue_at.active_job",
- adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments, timestamp: job.enqueued_at
+ adapter: job.class.queue_adapter, job: job
else
ActiveSupport::Notifications.instrument "enqueue.active_job",
- adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments
+ adapter: job.class.queue_adapter, job: job
end
end
end
@@ -52,19 +52,23 @@ module ActiveJob
class LogSubscriber < ActiveSupport::LogSubscriber
def enqueue(event)
- info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)}" + args_info(event) }
+ job = event.payload[:job]
+ info { "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job) }
end
def enqueue_at(event)
- info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) }
+ job = event.payload[:job]
+ info { "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job) }
end
def perform_start(event)
- info { "Performing #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event) }
+ job = event.payload[:job]
+ info { "Performing #{job.class.name} from #{queue_name(event)}" + args_info(job) }
end
def perform(event)
- info { "Performed #{event.payload[:job].name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" }
+ job = event.payload[:job]
+ info { "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" }
end
private
@@ -72,12 +76,12 @@ module ActiveJob
event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})"
end
- def args_info(event)
- event.payload[:args].any? ? " with arguments: #{event.payload[:args].map(&:inspect).join(", ")}" : ""
+ def args_info(job)
+ job.arguments.any? ? " with arguments: #{job.arguments.map(&:inspect).join(", ")}" : ""
end
- def enqueued_at(event)
- Time.at(event.payload[:timestamp]).utc
+ def scheduled_at(event)
+ Time.at(event.payload[:job].scheduled_at).utc
end
def logger
diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
index 8ebee36e45..e1b00f1de7 100644
--- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
@@ -4,20 +4,20 @@ module ActiveJob
module QueueAdapters
class BackburnerAdapter
class << self
- def enqueue(job, *args)
- Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name
+ def enqueue(job)
+ Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
delay = timestamp - Time.current.to_f
- Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name, delay: delay
+ Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay
end
end
class JobWrapper
class << self
- def perform(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
index a00569833a..658799edfc 100644
--- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
@@ -4,18 +4,18 @@ module ActiveJob
module QueueAdapters
class DelayedJobAdapter
class << self
- def enqueue(job, *args)
- JobWrapper.new.delay(queue: job.queue_name).perform(job, *args)
+ def enqueue(job)
+ JobWrapper.new.delay(queue: job.queue_name).perform(job.serialize)
end
- def enqueue_at(job, timestamp, *args)
- JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job, *args)
+ def enqueue_at(job, timestamp)
+ JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job.serialize)
end
end
class JobWrapper
- def perform(job, *args)
- job.new.execute(*args)
+ def perform(job_data)
+ Base.execute(job_data)
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
index 5805340fb0..fdefa38d5e 100644
--- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
@@ -2,8 +2,8 @@ module ActiveJob
module QueueAdapters
class InlineAdapter
class << self
- def enqueue(job, *args)
- job.new.execute(*args)
+ def enqueue(job)
+ Base.execute(job.serialize)
end
def enqueue_at(*)
diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb
index 5cb741c094..f681fd7e8a 100644
--- a/activejob/lib/active_job/queue_adapters/qu_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/qu_adapter.rb
@@ -5,7 +5,7 @@ module ActiveJob
class QuAdapter
class << self
def enqueue(job, *args)
- Qu::Payload.new(klass: JobWrapper, args: [job.name, *args]).tap do |payload|
+ Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload|
payload.instance_variable_set(:@queue, job.queue_name)
end.push
end
@@ -16,13 +16,12 @@ module ActiveJob
end
class JobWrapper < Qu::Job
- def initialize(job_name, *args)
- @job = job_name.constantize
- @args = args
+ def initialize(job_data)
+ @job_data = job_data
end
def perform
- @job.new.execute(*@args)
+ Base.execute @job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb
index 9c84c74f83..51891ab07b 100644
--- a/activejob/lib/active_job/queue_adapters/que_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb
@@ -4,18 +4,18 @@ module ActiveJob
module QueueAdapters
class QueAdapter
class << self
- def enqueue(job, *args)
- JobWrapper.enqueue job.name, *args, queue: job.queue_name
+ def enqueue(job)
+ JobWrapper.enqueue job.serialize, queue: job.queue_name
end
- def enqueue_at(job, timestamp, *args)
- JobWrapper.enqueue job.name, *args, queue: job.queue_name, run_at: Time.at(timestamp)
+ def enqueue_at(job, timestamp)
+ JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp)
end
end
class JobWrapper < Que::Job
- def run(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def run(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
index 914390a958..ddcc868317 100644
--- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
@@ -4,18 +4,18 @@ module ActiveJob
module QueueAdapters
class QueueClassicAdapter
class << self
- def enqueue(job, *args)
- build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.name, *args)
+ def enqueue(job)
+ build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize)
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
queue = build_queue(job.queue_name)
unless queue.respond_to?(:enqueue_at)
raise NotImplementedError, 'To be able to schedule jobs with Queue Classic ' \
'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. '
'You can implement this yourself or you can use the queue_classic-later gem.'
end
- queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.name, *args)
+ queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize)
end
# Builds a <tt>QC::Queue</tt> object to schedule jobs on.
@@ -30,8 +30,8 @@ module ActiveJob
class JobWrapper
class << self
- def perform(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb
index da8212fc9b..affa3bdfbc 100644
--- a/activejob/lib/active_job/queue_adapters/resque_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb
@@ -16,23 +16,23 @@ module ActiveJob
module QueueAdapters
class ResqueAdapter
class << self
- def enqueue(job, *args)
- Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args
+ def enqueue(job)
+ Resque.enqueue_to job.queue_name, JobWrapper, job.serialize
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
unless Resque.respond_to?(:enqueue_at_with_queue)
raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \
"resque-scheduler gem. Please add it to your Gemfile and run bundle install"
end
- Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args
+ Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize
end
end
class JobWrapper
class << self
- def perform(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
index 3e20bec44c..79926a1e24 100644
--- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
@@ -4,20 +4,20 @@ module ActiveJob
module QueueAdapters
class SidekiqAdapter
class << self
- def enqueue(job, *args)
+ def enqueue(job)
#Sidekiq::Client does not support symbols as keys
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
- 'args' => [ job, *args ],
+ 'args' => [ job.serialize ],
'retry' => true
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
- 'args' => [ job, *args ],
+ 'args' => [ job.serialize ],
'retry' => true,
'at' => timestamp
end
@@ -26,8 +26,8 @@ module ActiveJob
class JobWrapper
include Sidekiq::Worker
- def perform(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
index 48b3df6a46..60633639f4 100644
--- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
@@ -7,14 +7,14 @@ module ActiveJob
@monitor = Monitor.new
class << self
- def enqueue(job, *args)
+ def enqueue(job)
@monitor.synchronize do
JobWrapper.from_queue job.queue_name
- JobWrapper.enqueue ActiveSupport::JSON.encode([ job.name, *args ])
+ JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize)
end
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
raise NotImplementedError
end
end
@@ -24,8 +24,8 @@ module ActiveJob
from_queue 'active_jobs_default'
def work(msg)
- job_name, *args = ActiveSupport::JSON.decode(msg)
- job_name.constantize.new.execute(*args)
+ job_data = ActiveSupport::JSON.decode(msg)
+ Base.execute job_data
ack!
end
end
diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
index 16f05744f3..b19a38093f 100644
--- a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
@@ -4,11 +4,11 @@ module ActiveJob
module QueueAdapters
class SuckerPunchAdapter
class << self
- def enqueue(job, *args)
- JobWrapper.new.async.perform job, *args
+ def enqueue(job)
+ JobWrapper.new.async.perform job.serialize
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
raise NotImplementedError
end
end
@@ -16,8 +16,8 @@ module ActiveJob
class JobWrapper
include SuckerPunch::Job
- def perform(job, *args)
- job.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb
index 185d6fc7e6..b9997efddf 100644
--- a/activejob/lib/active_job/queue_adapters/test_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb
@@ -45,21 +45,21 @@ module ActiveJob
@performed_jobs = val
end
- def enqueue(job, *args)
+ def enqueue(job)
if perform_enqueued_jobs?
- performed_jobs << {job: job, args: args, queue: job.queue_name}
- job.new.execute(*args)
+ performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name}
+ job.perform_now
else
- enqueued_jobs << {job: job, args: args, queue: job.queue_name}
+ enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name}
end
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
if perform_enqueued_at_jobs?
- performed_jobs << {job: job, args: args, queue: job.queue_name, run_at: timestamp}
- job.new.execute(*args)
+ performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp}
+ job.perform_now
else
- enqueued_jobs << {job: job, args: args, queue: job.queue_name, run_at: timestamp}
+ enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp}
end
end
diff --git a/activejob/lib/active_job/queue_name.rb b/activejob/lib/active_job/queue_name.rb
index 9698835b6e..45acb71605 100644
--- a/activejob/lib/active_job/queue_name.rb
+++ b/activejob/lib/active_job/queue_name.rb
@@ -6,16 +6,33 @@ module ActiveJob
mattr_accessor(:queue_name_prefix)
mattr_accessor(:default_queue_name) { "default" }
- def queue_as(part_name)
+ def queue_as(part_name=nil, &block)
+ if block_given?
+ self.queue_name = block
+ else
+ self.queue_name = queue_name_from_part(part_name)
+ end
+ end
+
+ def queue_name_from_part(part_name) #:nodoc:
queue_name = part_name.to_s.presence || default_queue_name
name_parts = [queue_name_prefix.presence, queue_name]
- self.queue_name = name_parts.compact.join('_')
+ name_parts.compact.join('_')
end
end
included do
- class_attribute :queue_name
+ class_attribute :queue_name, instance_accessor: false
self.queue_name = default_queue_name
end
+
+ # Returns the name of the queue the job will be run on
+ def queue_name
+ if @queue_name.is_a?(Proc)
+ @queue_name = self.class.queue_name_from_part(instance_exec(&@queue_name))
+ end
+ @queue_name
+ end
+
end
end