From c813a30c5a3031108bdd41b57571803c13f95569 Mon Sep 17 00:00:00 2001 From: Mike Perham <mperham@gmail.com> Date: Mon, 19 May 2014 21:08:18 -0700 Subject: Reimplement Sidekiq worker This better integrates various Sidekiq features into AJ jobs. Things like the JID will be set as expected and the user can use `sidekiq_options` in AJ::Base subclasses as usual to configure various features. --- lib/active_job/queue_adapters/sidekiq_adapter.rb | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'lib/active_job') diff --git a/lib/active_job/queue_adapters/sidekiq_adapter.rb b/lib/active_job/queue_adapters/sidekiq_adapter.rb index c8fac32963..087e833d24 100644 --- a/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -5,7 +5,8 @@ module ActiveJob class SidekiqAdapter class << self def queue(job, *args) - JobWrapper.client_push class: JobWrapper, queue: job.queue_name, args: [ job, *args ] + item = { 'class' => JobWrapper, 'queue' => job.queue_name, 'args' => [job, *args] } + Sidekiq::Client.push(job.get_sidekiq_options.merge(item)) end end @@ -13,9 +14,15 @@ module ActiveJob include Sidekiq::Worker def perform(job_name, *args) - job_name.constantize.new.perform *Parameters.deserialize(args) + instance = job_name.constantize.new + instance.jid = self.jid + instance.perform *Parameters.deserialize(args) end end end end end + +class ActiveJob::Base + include Sidekiq::Worker +end -- cgit v1.2.3 From 243afc061cf527bebf6c76956c53c5cb637876c6 Mon Sep 17 00:00:00 2001 From: Dimitar Kostov <mitko.kostov@gmail.com> Date: Tue, 20 May 2014 14:03:46 +0300 Subject: Adds backburner adapter --- lib/active_job/queue_adapters/backburner_adapter.rb | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 lib/active_job/queue_adapters/backburner_adapter.rb (limited to 'lib/active_job') diff --git a/lib/active_job/queue_adapters/backburner_adapter.rb b/lib/active_job/queue_adapters/backburner_adapter.rb new file mode 100644 index 0000000000..0ac745c7f2 --- /dev/null +++ b/lib/active_job/queue_adapters/backburner_adapter.rb @@ -0,0 +1,21 @@ +require 'backburner' + +module ActiveJob + module QueueAdapters + class BackburnerAdapter + class << self + def queue(job, *args) + Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name + end + end + + class JobWrapper + class << self + def perform(job_name, *args) + job_name.constantize.new.perform *Parameters.deserialize(args) + end + end + end + end + end +end -- cgit v1.2.3 From 42f5ba37a4bf3cd9369643e479b91a5f97bce779 Mon Sep 17 00:00:00 2001 From: Mike Perham <mperham@gmail.com> Date: Tue, 20 May 2014 06:57:29 -0700 Subject: Remove all Sidekiq-specific stuff from job, enable retries by default --- lib/active_job/queue_adapters/sidekiq_adapter.rb | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) (limited to 'lib/active_job') diff --git a/lib/active_job/queue_adapters/sidekiq_adapter.rb b/lib/active_job/queue_adapters/sidekiq_adapter.rb index 087e833d24..0cf9e47287 100644 --- a/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -5,8 +5,11 @@ module ActiveJob class SidekiqAdapter class << self def queue(job, *args) - item = { 'class' => JobWrapper, 'queue' => job.queue_name, 'args' => [job, *args] } - Sidekiq::Client.push(job.get_sidekiq_options.merge(item)) + item = { 'class' => JobWrapper, + 'queue' => job.queue_name, + 'args' => [job, *args], + 'retry' => true } + Sidekiq::Client.push(item) end end @@ -15,14 +18,9 @@ module ActiveJob def perform(job_name, *args) instance = job_name.constantize.new - instance.jid = self.jid instance.perform *Parameters.deserialize(args) end end end end end - -class ActiveJob::Base - include Sidekiq::Worker -end -- cgit v1.2.3 From cbfc8b367ed04fdbb748d0712c33b0b62d3de331 Mon Sep 17 00:00:00 2001 From: Douwe Maan <douwe@selenight.nl> Date: Tue, 20 May 2014 11:32:29 +0200 Subject: Have Sneakers adapter take queue_name into account. --- lib/active_job/queue_adapters/sneakers_adapter.rb | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'lib/active_job') diff --git a/lib/active_job/queue_adapters/sneakers_adapter.rb b/lib/active_job/queue_adapters/sneakers_adapter.rb index c6dbfa75bf..7be6b2a085 100644 --- a/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -1,19 +1,23 @@ require 'sneakers' +require 'thread' module ActiveJob module QueueAdapters class SneakersAdapter + @mutex = Mutex.new + class << self def queue(job, *args) - JobWrapper.enqueue([job, *args]) + @mutex.synchronize do + JobWrapper.from_queue job.queue_name + JobWrapper.enqueue [ job, *args ] + end end end class JobWrapper include Sneakers::Worker - self.from_queue("queue", {}) - def work(job, *args) job.new.perform *Parameters.deserialize(args) end -- cgit v1.2.3 From 1d519aae29228186d3e82906d5d6e8dda2e91bd3 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson <david@loudthinking.com> Date: Tue, 20 May 2014 17:36:17 +0200 Subject: Reformat the logging line and ensure we are logging the serialized args --- lib/active_job/enqueuing.rb | 5 +++-- lib/active_job/log_subscriber.rb | 11 ++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) (limited to 'lib/active_job') diff --git a/lib/active_job/enqueuing.rb b/lib/active_job/enqueuing.rb index 46f703481a..6fb6f15ce2 100644 --- a/lib/active_job/enqueuing.rb +++ b/lib/active_job/enqueuing.rb @@ -11,8 +11,9 @@ module ActiveJob # The return value is adapter-specific and may change in a future # ActiveJob release. def enqueue(*args) - ActiveSupport::Notifications.instrument "enqueue.active_job", adapter: queue_adapter, job: self, params: args - queue_adapter.queue self, *Parameters.serialize(args) + serialized_args = Parameters.serialize(args) + ActiveSupport::Notifications.instrument "enqueue.active_job", adapter: queue_adapter, job: self, args: serialized_args + queue_adapter.queue self, *serialized_args end end end diff --git a/lib/active_job/log_subscriber.rb b/lib/active_job/log_subscriber.rb index 31c61a6068..472c9f3081 100644 --- a/lib/active_job/log_subscriber.rb +++ b/lib/active_job/log_subscriber.rb @@ -1,12 +1,13 @@ +require 'active_support/core_ext/string/filters' + module ActiveJob class LogSubscriber < ActiveSupport::LogSubscriber def enqueue(event) - payload = event.payload - params = payload[:params] - adapter = payload[:adapter] - job = payload[:job] + queue_name = event.payload[:adapter].name.demodulize.remove('Adapter') + job_name = event.payload[:job].name + args = event.payload[:args].any? ? ": #{event.payload[:args].inspect}" : "" - info "ActiveJob enqueued to #{adapter.name.demodulize} job #{job.name}: #{params.inspect}" + info "Enqueued #{job_name} to #{queue_name}" + args end def logger -- cgit v1.2.3 From b13a3cb29b7b8823a085977dc6de801fb65acf71 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson <david@loudthinking.com> Date: Tue, 20 May 2014 17:37:10 +0200 Subject: Styling --- lib/active_job/queue_adapters/sidekiq_adapter.rb | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'lib/active_job') diff --git a/lib/active_job/queue_adapters/sidekiq_adapter.rb b/lib/active_job/queue_adapters/sidekiq_adapter.rb index 0cf9e47287..43bd69790c 100644 --- a/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -5,11 +5,11 @@ module ActiveJob class SidekiqAdapter class << self def queue(job, *args) - item = { 'class' => JobWrapper, - 'queue' => job.queue_name, - 'args' => [job, *args], - 'retry' => true } - Sidekiq::Client.push(item) + Sidekiq::Client.push \ + 'class' => JobWrapper, + 'queue' => job.queue_name, + 'args' => [ job, *args ], + 'retry' => true end end @@ -17,8 +17,7 @@ module ActiveJob include Sidekiq::Worker def perform(job_name, *args) - instance = job_name.constantize.new - instance.perform *Parameters.deserialize(args) + job_name.constantize.new.perform *Parameters.deserialize(args) end end end -- cgit v1.2.3