From 243d74eb30464dc95cb07c0bd14cc086f9cd7022 Mon Sep 17 00:00:00 2001 From: Cristian Bica Date: Sat, 31 May 2014 02:19:30 +0300 Subject: Persist job_id --- lib/active_job/enqueuing.rb | 18 +++++++++--------- lib/active_job/execution.rb | 5 +++-- lib/active_job/logging.rb | 10 +++++----- lib/active_job/queue_adapters/resque_adapter.rb | 12 ++---------- lib/activejob.rb | 1 + test/cases/callbacks_test.rb | 2 +- test/cases/logging_test.rb | 10 +++++----- test/cases/rescue_test.rb | 8 +++----- 8 files changed, 29 insertions(+), 37 deletions(-) create mode 100644 lib/activejob.rb diff --git a/lib/active_job/enqueuing.rb b/lib/active_job/enqueuing.rb index 0f094fb624..e3ac11ba97 100644 --- a/lib/active_job/enqueuing.rb +++ b/lib/active_job/enqueuing.rb @@ -3,19 +3,19 @@ require 'active_job/arguments' module ActiveJob module Enqueuing extend ActiveSupport::Concern - + module ClassMethods # Push a job onto the queue. The arguments must be legal JSON types # (string, int, float, nil, true, false, hash or array) or # ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects # are not supported. # - # Returns an instance of the job class queued with args available in + # Returns an instance of the job class queued with args available in # Job#arguments. def enqueue(*args) new(args).tap do |job| job.run_callbacks :enqueue do - queue_adapter.enqueue self, *Arguments.serialize(args) + queue_adapter.enqueue self, job.job_id, *Arguments.serialize(args) end end end @@ -24,7 +24,7 @@ module ActiveJob # # enqueue_in(1.week, "mike") # - # Returns an instance of the job class queued with args available in + # Returns an instance of the job class queued with args available in # Job#arguments and the timestamp in Job#enqueue_at. def enqueue_in(interval, *args) enqueue_at interval.seconds.from_now, *args @@ -34,19 +34,19 @@ module ActiveJob # # enqueue_at(Date.tomorrow.midnight, "mike") # - # Returns an instance of the job class queued with args available in + # Returns an instance of the job class queued with args available in # Job#arguments and the timestamp in Job#enqueue_at. def enqueue_at(timestamp, *args) new(args).tap do |job| job.enqueued_at = timestamp job.run_callbacks :enqueue do - queue_adapter.enqueue_at self, timestamp.to_f, *Arguments.serialize(args) + queue_adapter.enqueue_at self, timestamp.to_f, job.job_id, *Arguments.serialize(args) end end end end - + included do attr_accessor :arguments attr_accessor :enqueued_at @@ -55,11 +55,11 @@ module ActiveJob def initialize(arguments = nil) @arguments = arguments end - + def retry_now self.class.enqueue *arguments end - + def retry_in(interval) self.class.enqueue_in interval, *arguments end diff --git a/lib/active_job/execution.rb b/lib/active_job/execution.rb index d21fd32292..78ada3d908 100644 --- a/lib/active_job/execution.rb +++ b/lib/active_job/execution.rb @@ -4,12 +4,13 @@ require 'active_job/arguments' module ActiveJob module Execution extend ActiveSupport::Concern - + included do include ActiveSupport::Rescuable end - def execute(*serialized_args) + def execute(job_id, *serialized_args) + self.job_id = job_id self.arguments = Arguments.deserialize(serialized_args) run_callbacks :perform do diff --git a/lib/active_job/logging.rb b/lib/active_job/logging.rb index ea0ba2dddc..d913aee03d 100644 --- a/lib/active_job/logging.rb +++ b/lib/active_job/logging.rb @@ -26,10 +26,10 @@ module ActiveJob before_enqueue do |job| if job.enqueued_at ActiveSupport::Notifications.instrument "enqueue_at.active_job", - adapter: job.class.queue_adapter, job: job.class, args: job.arguments, timestamp: job.enqueued_at + adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments, timestamp: job.enqueued_at else ActiveSupport::Notifications.instrument "enqueue.active_job", - adapter: job.class.queue_adapter, job: job.class, args: job.arguments + adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments end end end @@ -50,11 +50,11 @@ module ActiveJob class LogSubscriber < ActiveSupport::LogSubscriber def enqueue(event) - info "Enqueued #{event.payload[:job].name} to #{queue_name(event)}" + args_info(event) + info "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)}" + args_info(event) end def enqueue_at(event) - info "Enqueued #{event.payload[:job].name} to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) + info "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) end def perform_start(event) @@ -67,7 +67,7 @@ module ActiveJob private def queue_name(event) - event.payload[:adapter].name.demodulize.remove('Adapter') + event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})" end def args_info(event) diff --git a/lib/active_job/queue_adapters/resque_adapter.rb b/lib/active_job/queue_adapters/resque_adapter.rb index f7ed4669bd..84904f84f2 100644 --- a/lib/active_job/queue_adapters/resque_adapter.rb +++ b/lib/active_job/queue_adapters/resque_adapter.rb @@ -8,11 +8,11 @@ module ActiveJob class ResqueAdapter class << self def enqueue(job, *args) - Resque.enqueue JobWrapper.new(job), job, *args + Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args end def enqueue_at(job, timestamp, *args) - Resque.enqueue_at timestamp, JobWrapper.new(job), job, *args + Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args end end @@ -22,14 +22,6 @@ module ActiveJob job_name.constantize.new.execute *args end end - - def initialize(job) - @queue = job.queue_name - end - - def to_s - self.class.name - end end end end diff --git a/lib/activejob.rb b/lib/activejob.rb new file mode 100644 index 0000000000..fea38731af --- /dev/null +++ b/lib/activejob.rb @@ -0,0 +1 @@ +require 'active_job' diff --git a/test/cases/callbacks_test.rb b/test/cases/callbacks_test.rb index 01a9b9d26b..9a0657ee89 100644 --- a/test/cases/callbacks_test.rb +++ b/test/cases/callbacks_test.rb @@ -5,7 +5,7 @@ require 'active_support/core_ext/object/inclusion' class CallbacksTest < ActiveSupport::TestCase test 'perform callbacks' do - performed_callback_job = CallbackJob.new.tap { |j| j.execute } + performed_callback_job = CallbackJob.new.tap { |j| j.execute("A-JOB-ID") } assert "CallbackJob ran before_perform".in? performed_callback_job.history assert "CallbackJob ran after_perform".in? performed_callback_job.history assert "CallbackJob ran around_perform_start".in? performed_callback_job.history diff --git a/test/cases/logging_test.rb b/test/cases/logging_test.rb index 194944db11..537702edd4 100644 --- a/test/cases/logging_test.rb +++ b/test/cases/logging_test.rb @@ -46,7 +46,7 @@ class AdapterTest < ActiveSupport::TestCase def test_enqueue_job_logging HelloJob.enqueue "Cristian" - assert_match(/Enqueued HelloJob to .*?:.*Cristian/, @logger.messages) + assert_match(/Enqueued HelloJob \(Job ID: .*?\) to .*?:.*Cristian/, @logger.messages) end def test_perform_job_logging @@ -69,9 +69,9 @@ class AdapterTest < ActiveSupport::TestCase def test_perform_nested_jobs_logging NestedJob.enqueue assert_match(/\[LoggingJob\] \[.*?\]/, @logger.messages) - assert_match(/\[ActiveJob\] Enqueued NestedJob to/, @logger.messages) + assert_match(/\[ActiveJob\] Enqueued NestedJob \(Job ID: .*\) to/, @logger.messages) assert_match(/\[ActiveJob\] \[NestedJob\] \[NESTED-JOB-ID\] Performing NestedJob from/, @logger.messages) - assert_match(/\[ActiveJob\] \[NestedJob\] \[NESTED-JOB-ID\] Enqueued LoggingJob to .* with arguments: "NestedJob"/, @logger.messages) + assert_match(/\[ActiveJob\] \[NestedJob\] \[NESTED-JOB-ID\] Enqueued LoggingJob \(Job ID: .*?\) to .* with arguments: "NestedJob"/, @logger.messages) assert_match(/\[ActiveJob\].*\[LoggingJob\] \[LOGGING-JOB-ID\] Performing LoggingJob from .* with arguments: "NestedJob"/, @logger.messages) assert_match(/\[ActiveJob\].*\[LoggingJob\] \[LOGGING-JOB-ID\] Dummy, here is it: NestedJob/, @logger.messages) assert_match(/\[ActiveJob\].*\[LoggingJob\] \[LOGGING-JOB-ID\] Performed LoggingJob from .* in/, @logger.messages) @@ -80,14 +80,14 @@ class AdapterTest < ActiveSupport::TestCase def test_enqueue_at_job_logging HelloJob.enqueue_at 1, "Cristian" - assert_match(/Enqueued HelloJob to .*? at.*Cristian/, @logger.messages) + assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages) rescue NotImplementedError skip end def test_enqueue_in_job_logging HelloJob.enqueue_in 2, "Cristian" - assert_match(/Enqueued HelloJob to .*? at.*Cristian/, @logger.messages) + assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages) rescue NotImplementedError skip end diff --git a/test/cases/rescue_test.rb b/test/cases/rescue_test.rb index aea8b036d4..0e995405f7 100644 --- a/test/cases/rescue_test.rb +++ b/test/cases/rescue_test.rb @@ -7,17 +7,15 @@ class RescueTest < ActiveSupport::TestCase setup do $BUFFER = [] end - + test 'rescue perform exception with retry' do - job = RescueJob.new - job.execute("david") + RescueJob.enqueue("david") assert_equal [ "rescued from ArgumentError", "performed beautifully" ], $BUFFER end test 'let through unhandled perform exception' do - job = RescueJob.new assert_raises(RescueJob::OtherError) do - job.execute("other") + RescueJob.enqueue("other") end end end -- cgit v1.2.3 From 68f25a40f528967142d1a37762d96e0020897005 Mon Sep 17 00:00:00 2001 From: Cristian Bica Date: Thu, 12 Jun 2014 14:30:53 +0300 Subject: Fixed failing test --- test/cases/rescue_test.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/cases/rescue_test.rb b/test/cases/rescue_test.rb index 0e995405f7..3d4831bc62 100644 --- a/test/cases/rescue_test.rb +++ b/test/cases/rescue_test.rb @@ -9,13 +9,15 @@ class RescueTest < ActiveSupport::TestCase end test 'rescue perform exception with retry' do - RescueJob.enqueue("david") + job = RescueJob.new + job.execute(SecureRandom.uuid, "david") assert_equal [ "rescued from ArgumentError", "performed beautifully" ], $BUFFER end test 'let through unhandled perform exception' do + job = RescueJob.new assert_raises(RescueJob::OtherError) do - RescueJob.enqueue("other") + job.execute(SecureRandom.uuid, "other") end end end -- cgit v1.2.3