aboutsummaryrefslogtreecommitdiffstats
path: root/activejob/lib/active_job/queue_adapters
diff options
context:
space:
mode:
authorCristian Bica <cristian.bica@gmail.com>2014-08-25 17:34:50 +0300
committerCristian Bica <cristian.bica@gmail.com>2014-09-03 23:01:46 +0300
commit1e237b4e44b7de564c7d6b331dd2f2243c4113fd (patch)
treec272b813a4968815026d86f6b47ab9839ce3ab03 /activejob/lib/active_job/queue_adapters
parent5db4e7f0ec2957f8641d5af884bd39e31d795597 (diff)
downloadrails-1e237b4e44b7de564c7d6b331dd2f2243c4113fd.tar.gz
rails-1e237b4e44b7de564c7d6b331dd2f2243c4113fd.tar.bz2
rails-1e237b4e44b7de564c7d6b331dd2f2243c4113fd.zip
Active Job refactoring
Diffstat (limited to 'activejob/lib/active_job/queue_adapters')
-rw-r--r--activejob/lib/active_job/queue_adapters/backburner_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/inline_adapter.rb4
-rw-r--r--activejob/lib/active_job/queue_adapters/qu_adapter.rb9
-rw-r--r--activejob/lib/active_job/queue_adapters/que_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/resque_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/sneakers_adapter.rb10
-rw-r--r--activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb10
-rw-r--r--activejob/lib/active_job/queue_adapters/test_adapter.rb16
11 files changed, 60 insertions, 61 deletions
diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
index 8ebee36e45..e1b00f1de7 100644
--- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
@@ -4,20 +4,20 @@ module ActiveJob
module QueueAdapters
class BackburnerAdapter
class << self
- def enqueue(job, *args)
- Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name
+ def enqueue(job)
+ Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
delay = timestamp - Time.current.to_f
- Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name, delay: delay
+ Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay
end
end
class JobWrapper
class << self
- def perform(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
index a00569833a..658799edfc 100644
--- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
@@ -4,18 +4,18 @@ module ActiveJob
module QueueAdapters
class DelayedJobAdapter
class << self
- def enqueue(job, *args)
- JobWrapper.new.delay(queue: job.queue_name).perform(job, *args)
+ def enqueue(job)
+ JobWrapper.new.delay(queue: job.queue_name).perform(job.serialize)
end
- def enqueue_at(job, timestamp, *args)
- JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job, *args)
+ def enqueue_at(job, timestamp)
+ JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job.serialize)
end
end
class JobWrapper
- def perform(job, *args)
- job.new.execute(*args)
+ def perform(job_data)
+ Base.execute(job_data)
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
index 5805340fb0..fdefa38d5e 100644
--- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
@@ -2,8 +2,8 @@ module ActiveJob
module QueueAdapters
class InlineAdapter
class << self
- def enqueue(job, *args)
- job.new.execute(*args)
+ def enqueue(job)
+ Base.execute(job.serialize)
end
def enqueue_at(*)
diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb
index 5cb741c094..f681fd7e8a 100644
--- a/activejob/lib/active_job/queue_adapters/qu_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/qu_adapter.rb
@@ -5,7 +5,7 @@ module ActiveJob
class QuAdapter
class << self
def enqueue(job, *args)
- Qu::Payload.new(klass: JobWrapper, args: [job.name, *args]).tap do |payload|
+ Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload|
payload.instance_variable_set(:@queue, job.queue_name)
end.push
end
@@ -16,13 +16,12 @@ module ActiveJob
end
class JobWrapper < Qu::Job
- def initialize(job_name, *args)
- @job = job_name.constantize
- @args = args
+ def initialize(job_data)
+ @job_data = job_data
end
def perform
- @job.new.execute(*@args)
+ Base.execute @job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb
index 9c84c74f83..51891ab07b 100644
--- a/activejob/lib/active_job/queue_adapters/que_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb
@@ -4,18 +4,18 @@ module ActiveJob
module QueueAdapters
class QueAdapter
class << self
- def enqueue(job, *args)
- JobWrapper.enqueue job.name, *args, queue: job.queue_name
+ def enqueue(job)
+ JobWrapper.enqueue job.serialize, queue: job.queue_name
end
- def enqueue_at(job, timestamp, *args)
- JobWrapper.enqueue job.name, *args, queue: job.queue_name, run_at: Time.at(timestamp)
+ def enqueue_at(job, timestamp)
+ JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp)
end
end
class JobWrapper < Que::Job
- def run(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def run(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
index 914390a958..ddcc868317 100644
--- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
@@ -4,18 +4,18 @@ module ActiveJob
module QueueAdapters
class QueueClassicAdapter
class << self
- def enqueue(job, *args)
- build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.name, *args)
+ def enqueue(job)
+ build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize)
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
queue = build_queue(job.queue_name)
unless queue.respond_to?(:enqueue_at)
raise NotImplementedError, 'To be able to schedule jobs with Queue Classic ' \
'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. '
'You can implement this yourself or you can use the queue_classic-later gem.'
end
- queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.name, *args)
+ queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize)
end
# Builds a <tt>QC::Queue</tt> object to schedule jobs on.
@@ -30,8 +30,8 @@ module ActiveJob
class JobWrapper
class << self
- def perform(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb
index da8212fc9b..affa3bdfbc 100644
--- a/activejob/lib/active_job/queue_adapters/resque_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb
@@ -16,23 +16,23 @@ module ActiveJob
module QueueAdapters
class ResqueAdapter
class << self
- def enqueue(job, *args)
- Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args
+ def enqueue(job)
+ Resque.enqueue_to job.queue_name, JobWrapper, job.serialize
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
unless Resque.respond_to?(:enqueue_at_with_queue)
raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \
"resque-scheduler gem. Please add it to your Gemfile and run bundle install"
end
- Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args
+ Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize
end
end
class JobWrapper
class << self
- def perform(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
index 3e20bec44c..79926a1e24 100644
--- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
@@ -4,20 +4,20 @@ module ActiveJob
module QueueAdapters
class SidekiqAdapter
class << self
- def enqueue(job, *args)
+ def enqueue(job)
#Sidekiq::Client does not support symbols as keys
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
- 'args' => [ job, *args ],
+ 'args' => [ job.serialize ],
'retry' => true
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
- 'args' => [ job, *args ],
+ 'args' => [ job.serialize ],
'retry' => true,
'at' => timestamp
end
@@ -26,8 +26,8 @@ module ActiveJob
class JobWrapper
include Sidekiq::Worker
- def perform(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
index 48b3df6a46..60633639f4 100644
--- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
@@ -7,14 +7,14 @@ module ActiveJob
@monitor = Monitor.new
class << self
- def enqueue(job, *args)
+ def enqueue(job)
@monitor.synchronize do
JobWrapper.from_queue job.queue_name
- JobWrapper.enqueue ActiveSupport::JSON.encode([ job.name, *args ])
+ JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize)
end
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
raise NotImplementedError
end
end
@@ -24,8 +24,8 @@ module ActiveJob
from_queue 'active_jobs_default'
def work(msg)
- job_name, *args = ActiveSupport::JSON.decode(msg)
- job_name.constantize.new.execute(*args)
+ job_data = ActiveSupport::JSON.decode(msg)
+ Base.execute job_data
ack!
end
end
diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
index 16f05744f3..b19a38093f 100644
--- a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
@@ -4,11 +4,11 @@ module ActiveJob
module QueueAdapters
class SuckerPunchAdapter
class << self
- def enqueue(job, *args)
- JobWrapper.new.async.perform job, *args
+ def enqueue(job)
+ JobWrapper.new.async.perform job.serialize
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
raise NotImplementedError
end
end
@@ -16,8 +16,8 @@ module ActiveJob
class JobWrapper
include SuckerPunch::Job
- def perform(job, *args)
- job.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb
index 185d6fc7e6..b9997efddf 100644
--- a/activejob/lib/active_job/queue_adapters/test_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb
@@ -45,21 +45,21 @@ module ActiveJob
@performed_jobs = val
end
- def enqueue(job, *args)
+ def enqueue(job)
if perform_enqueued_jobs?
- performed_jobs << {job: job, args: args, queue: job.queue_name}
- job.new.execute(*args)
+ performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name}
+ job.perform_now
else
- enqueued_jobs << {job: job, args: args, queue: job.queue_name}
+ enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name}
end
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp)
if perform_enqueued_at_jobs?
- performed_jobs << {job: job, args: args, queue: job.queue_name, run_at: timestamp}
- job.new.execute(*args)
+ performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp}
+ job.perform_now
else
- enqueued_jobs << {job: job, args: args, queue: job.queue_name, run_at: timestamp}
+ enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp}
end
end