From 243d74eb30464dc95cb07c0bd14cc086f9cd7022 Mon Sep 17 00:00:00 2001 From: Cristian Bica Date: Sat, 31 May 2014 02:19:30 +0300 Subject: Persist job_id --- lib/active_job/enqueuing.rb | 18 +++++++++--------- lib/active_job/execution.rb | 5 +++-- lib/active_job/logging.rb | 10 +++++----- lib/active_job/queue_adapters/resque_adapter.rb | 12 ++---------- 4 files changed, 19 insertions(+), 26 deletions(-) (limited to 'lib/active_job') diff --git a/lib/active_job/enqueuing.rb b/lib/active_job/enqueuing.rb index 0f094fb624..e3ac11ba97 100644 --- a/lib/active_job/enqueuing.rb +++ b/lib/active_job/enqueuing.rb @@ -3,19 +3,19 @@ require 'active_job/arguments' module ActiveJob module Enqueuing extend ActiveSupport::Concern - + module ClassMethods # Push a job onto the queue. The arguments must be legal JSON types # (string, int, float, nil, true, false, hash or array) or # ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects # are not supported. # - # Returns an instance of the job class queued with args available in + # Returns an instance of the job class queued with args available in # Job#arguments. def enqueue(*args) new(args).tap do |job| job.run_callbacks :enqueue do - queue_adapter.enqueue self, *Arguments.serialize(args) + queue_adapter.enqueue self, job.job_id, *Arguments.serialize(args) end end end @@ -24,7 +24,7 @@ module ActiveJob # # enqueue_in(1.week, "mike") # - # Returns an instance of the job class queued with args available in + # Returns an instance of the job class queued with args available in # Job#arguments and the timestamp in Job#enqueue_at. def enqueue_in(interval, *args) enqueue_at interval.seconds.from_now, *args @@ -34,19 +34,19 @@ module ActiveJob # # enqueue_at(Date.tomorrow.midnight, "mike") # - # Returns an instance of the job class queued with args available in + # Returns an instance of the job class queued with args available in # Job#arguments and the timestamp in Job#enqueue_at. def enqueue_at(timestamp, *args) new(args).tap do |job| job.enqueued_at = timestamp job.run_callbacks :enqueue do - queue_adapter.enqueue_at self, timestamp.to_f, *Arguments.serialize(args) + queue_adapter.enqueue_at self, timestamp.to_f, job.job_id, *Arguments.serialize(args) end end end end - + included do attr_accessor :arguments attr_accessor :enqueued_at @@ -55,11 +55,11 @@ module ActiveJob def initialize(arguments = nil) @arguments = arguments end - + def retry_now self.class.enqueue *arguments end - + def retry_in(interval) self.class.enqueue_in interval, *arguments end diff --git a/lib/active_job/execution.rb b/lib/active_job/execution.rb index d21fd32292..78ada3d908 100644 --- a/lib/active_job/execution.rb +++ b/lib/active_job/execution.rb @@ -4,12 +4,13 @@ require 'active_job/arguments' module ActiveJob module Execution extend ActiveSupport::Concern - + included do include ActiveSupport::Rescuable end - def execute(*serialized_args) + def execute(job_id, *serialized_args) + self.job_id = job_id self.arguments = Arguments.deserialize(serialized_args) run_callbacks :perform do diff --git a/lib/active_job/logging.rb b/lib/active_job/logging.rb index ea0ba2dddc..d913aee03d 100644 --- a/lib/active_job/logging.rb +++ b/lib/active_job/logging.rb @@ -26,10 +26,10 @@ module ActiveJob before_enqueue do |job| if job.enqueued_at ActiveSupport::Notifications.instrument "enqueue_at.active_job", - adapter: job.class.queue_adapter, job: job.class, args: job.arguments, timestamp: job.enqueued_at + adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments, timestamp: job.enqueued_at else ActiveSupport::Notifications.instrument "enqueue.active_job", - adapter: job.class.queue_adapter, job: job.class, args: job.arguments + adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments end end end @@ -50,11 +50,11 @@ module ActiveJob class LogSubscriber < ActiveSupport::LogSubscriber def enqueue(event) - info "Enqueued #{event.payload[:job].name} to #{queue_name(event)}" + args_info(event) + info "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)}" + args_info(event) end def enqueue_at(event) - info "Enqueued #{event.payload[:job].name} to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) + info "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) end def perform_start(event) @@ -67,7 +67,7 @@ module ActiveJob private def queue_name(event) - event.payload[:adapter].name.demodulize.remove('Adapter') + event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})" end def args_info(event) diff --git a/lib/active_job/queue_adapters/resque_adapter.rb b/lib/active_job/queue_adapters/resque_adapter.rb index f7ed4669bd..84904f84f2 100644 --- a/lib/active_job/queue_adapters/resque_adapter.rb +++ b/lib/active_job/queue_adapters/resque_adapter.rb @@ -8,11 +8,11 @@ module ActiveJob class ResqueAdapter class << self def enqueue(job, *args) - Resque.enqueue JobWrapper.new(job), job, *args + Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args end def enqueue_at(job, timestamp, *args) - Resque.enqueue_at timestamp, JobWrapper.new(job), job, *args + Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args end end @@ -22,14 +22,6 @@ module ActiveJob job_name.constantize.new.execute *args end end - - def initialize(job) - @queue = job.queue_name - end - - def to_s - self.class.name - end end end end -- cgit v1.2.3