diff options
Diffstat (limited to 'activejob/lib/active_job/queue_adapters')
5 files changed, 34 insertions, 7 deletions
diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb new file mode 100644 index 0000000000..3fc27f56e7 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb @@ -0,0 +1,23 @@ +require 'active_job/async_job' + +module ActiveJob + module QueueAdapters + # == Active Job Async adapter + # + # When enqueueing jobs with the Async adapter the job will be executed + # asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html]. + # + # To use +AsyncJob+ set the queue_adapter config to +:async+. + # + # Rails.application.config.active_job.queue_adapter = :async + class AsyncAdapter + def enqueue(job) #:nodoc: + ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name) + end + + def enqueue_at(job, timestamp) #:nodoc: + ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name) + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb index ac83da2b9c..0a785fad3b 100644 --- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -14,13 +14,13 @@ module ActiveJob # Rails.application.config.active_job.queue_adapter = :delayed_job class DelayedJobAdapter def enqueue(job) #:nodoc: - delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name) + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority) job.provider_job_id = delayed_job.id delayed_job end def enqueue_at(job, timestamp) #:nodoc: - delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp)) + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp)) job.provider_job_id = delayed_job.id delayed_job end diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb index 90947aa98d..ab13689747 100644 --- a/activejob/lib/active_job/queue_adapters/que_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb @@ -16,13 +16,13 @@ module ActiveJob # Rails.application.config.active_job.queue_adapter = :que class QueAdapter def enqueue(job) #:nodoc: - que_job = JobWrapper.enqueue job.serialize + que_job = JobWrapper.enqueue job.serialize, priority: job.priority job.provider_job_id = que_job.attrs["job_id"] que_job end def enqueue_at(job, timestamp) #:nodoc: - que_job = JobWrapper.enqueue job.serialize, run_at: Time.at(timestamp) + que_job = JobWrapper.enqueue job.serialize, priority: job.priority, run_at: Time.at(timestamp) job.provider_job_id = que_job.attrs["job_id"] que_job end diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb index 059754a87f..0ee41407d8 100644 --- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -18,7 +18,9 @@ module ActiveJob # Rails.application.config.active_job.queue_adapter = :queue_classic class QueueClassicAdapter def enqueue(job) #:nodoc: - build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize) + qc_job = build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize) + job.provider_job_id = qc_job["id"] if qc_job.is_a?(Hash) + qc_job end def enqueue_at(job, timestamp) #:nodoc: @@ -28,7 +30,9 @@ module ActiveJob 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \ 'You can implement this yourself or you can use the queue_classic-later gem.' end - queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) + qc_job = queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) + job.provider_job_id = qc_job["id"] if qc_job.is_a?(Hash) + qc_job end # Builds a <tt>QC::Queue</tt> object to schedule jobs on. diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb index f102c6567e..d78bdecdcb 100644 --- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -1,5 +1,5 @@ require 'sneakers' -require 'thread' +require 'monitor' module ActiveJob module QueueAdapters |