diff options
Diffstat (limited to 'activejob/lib/active_job/queue_adapters')
10 files changed, 276 insertions, 0 deletions
diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb new file mode 100644 index 0000000000..6fe2d4eb53 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb @@ -0,0 +1,25 @@ +require 'backburner' + +module ActiveJob + module QueueAdapters + class BackburnerAdapter + class << self + def enqueue(job, *args) + Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name + end + + def enqueue_at(job, timestamp, *args) + raise NotImplementedError + end + end + + class JobWrapper + class << self + def perform(job_name, *args) + job_name.constantize.new.execute(*args) + end + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb new file mode 100644 index 0000000000..a00569833a --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -0,0 +1,23 @@ +require 'delayed_job' + +module ActiveJob + module QueueAdapters + class DelayedJobAdapter + class << self + def enqueue(job, *args) + JobWrapper.new.delay(queue: job.queue_name).perform(job, *args) + end + + def enqueue_at(job, timestamp, *args) + JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job, *args) + end + end + + class JobWrapper + def perform(job, *args) + job.new.execute(*args) + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb new file mode 100644 index 0000000000..5805340fb0 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb @@ -0,0 +1,15 @@ +module ActiveJob + module QueueAdapters + class InlineAdapter + class << self + def enqueue(job, *args) + job.new.execute(*args) + end + + def enqueue_at(*) + raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at https://github.com/rails/activejob") + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb new file mode 100644 index 0000000000..5cb741c094 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/qu_adapter.rb @@ -0,0 +1,30 @@ +require 'qu' + +module ActiveJob + module QueueAdapters + class QuAdapter + class << self + def enqueue(job, *args) + Qu::Payload.new(klass: JobWrapper, args: [job.name, *args]).tap do |payload| + payload.instance_variable_set(:@queue, job.queue_name) + end.push + end + + def enqueue_at(job, timestamp, *args) + raise NotImplementedError + end + end + + class JobWrapper < Qu::Job + def initialize(job_name, *args) + @job = job_name.constantize + @args = args + end + + def perform + @job.new.execute(*@args) + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb new file mode 100644 index 0000000000..0b87deb4e0 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb @@ -0,0 +1,23 @@ +require 'que' + +module ActiveJob + module QueueAdapters + class QueAdapter + class << self + def enqueue(job, *args) + JobWrapper.enqueue job.name, *args, queue: job.queue_name + end + + def enqueue_at(job, timestamp, *args) + raise NotImplementedError + end + end + + class JobWrapper < Que::Job + def run(job_name, *args) + job_name.constantize.new.execute(*args) + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb new file mode 100644 index 0000000000..d74f8cf90e --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -0,0 +1,25 @@ +require 'queue_classic' + +module ActiveJob + module QueueAdapters + class QueueClassicAdapter + class << self + def enqueue(job, *args) + QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.name, *args) + end + + def enqueue_at(job, timestamp, *args) + raise NotImplementedError + end + end + + class JobWrapper + class << self + def perform(job_name, *args) + job_name.constantize.new.execute(*args) + end + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb new file mode 100644 index 0000000000..da8212fc9b --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb @@ -0,0 +1,41 @@ +require 'resque' +require 'active_support/core_ext/enumerable' +require 'active_support/core_ext/array/access' + +begin + require 'resque-scheduler' +rescue LoadError + begin + require 'resque_scheduler' + rescue LoadError + false + end +end + +module ActiveJob + module QueueAdapters + class ResqueAdapter + class << self + def enqueue(job, *args) + Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args + end + + def enqueue_at(job, timestamp, *args) + unless Resque.respond_to?(:enqueue_at_with_queue) + raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \ + "resque-scheduler gem. Please add it to your Gemfile and run bundle install" + end + Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args + end + end + + class JobWrapper + class << self + def perform(job_name, *args) + job_name.constantize.new.execute(*args) + end + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb new file mode 100644 index 0000000000..3e20bec44c --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -0,0 +1,35 @@ +require 'sidekiq' + +module ActiveJob + module QueueAdapters + class SidekiqAdapter + class << self + def enqueue(job, *args) + #Sidekiq::Client does not support symbols as keys + Sidekiq::Client.push \ + 'class' => JobWrapper, + 'queue' => job.queue_name, + 'args' => [ job, *args ], + 'retry' => true + end + + def enqueue_at(job, timestamp, *args) + Sidekiq::Client.push \ + 'class' => JobWrapper, + 'queue' => job.queue_name, + 'args' => [ job, *args ], + 'retry' => true, + 'at' => timestamp + end + end + + class JobWrapper + include Sidekiq::Worker + + def perform(job_name, *args) + job_name.constantize.new.execute(*args) + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb new file mode 100644 index 0000000000..48b3df6a46 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -0,0 +1,34 @@ +require 'sneakers' +require 'thread' + +module ActiveJob + module QueueAdapters + class SneakersAdapter + @monitor = Monitor.new + + class << self + def enqueue(job, *args) + @monitor.synchronize do + JobWrapper.from_queue job.queue_name + JobWrapper.enqueue ActiveSupport::JSON.encode([ job.name, *args ]) + end + end + + def enqueue_at(job, timestamp, *args) + raise NotImplementedError + end + end + + class JobWrapper + include Sneakers::Worker + from_queue 'active_jobs_default' + + def work(msg) + job_name, *args = ActiveSupport::JSON.decode(msg) + job_name.constantize.new.execute(*args) + ack! + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb new file mode 100644 index 0000000000..16f05744f3 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -0,0 +1,25 @@ +require 'sucker_punch' + +module ActiveJob + module QueueAdapters + class SuckerPunchAdapter + class << self + def enqueue(job, *args) + JobWrapper.new.async.perform job, *args + end + + def enqueue_at(job, timestamp, *args) + raise NotImplementedError + end + end + + class JobWrapper + include SuckerPunch::Job + + def perform(job, *args) + job.new.execute(*args) + end + end + end + end +end |