aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorCristian Bica <cristian.bica@gmail.com>2014-05-31 02:19:30 +0300
committerCristian Bica <cristian.bica@gmail.com>2014-06-12 14:01:40 +0300
commit243d74eb30464dc95cb07c0bd14cc086f9cd7022 (patch)
tree34301629fe446bd394b5b0e32cf66f949afe0d53 /lib
parent4ac8dc21440ecdea9c0452a0c12e8bcc487bc776 (diff)
downloadrails-243d74eb30464dc95cb07c0bd14cc086f9cd7022.tar.gz
rails-243d74eb30464dc95cb07c0bd14cc086f9cd7022.tar.bz2
rails-243d74eb30464dc95cb07c0bd14cc086f9cd7022.zip
Persist job_id
Diffstat (limited to 'lib')
-rw-r--r--lib/active_job/enqueuing.rb18
-rw-r--r--lib/active_job/execution.rb5
-rw-r--r--lib/active_job/logging.rb10
-rw-r--r--lib/active_job/queue_adapters/resque_adapter.rb12
-rw-r--r--lib/activejob.rb1
5 files changed, 20 insertions, 26 deletions
diff --git a/lib/active_job/enqueuing.rb b/lib/active_job/enqueuing.rb
index 0f094fb624..e3ac11ba97 100644
--- a/lib/active_job/enqueuing.rb
+++ b/lib/active_job/enqueuing.rb
@@ -3,19 +3,19 @@ require 'active_job/arguments'
module ActiveJob
module Enqueuing
extend ActiveSupport::Concern
-
+
module ClassMethods
# Push a job onto the queue. The arguments must be legal JSON types
# (string, int, float, nil, true, false, hash or array) or
# ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects
# are not supported.
#
- # Returns an instance of the job class queued with args available in
+ # Returns an instance of the job class queued with args available in
# Job#arguments.
def enqueue(*args)
new(args).tap do |job|
job.run_callbacks :enqueue do
- queue_adapter.enqueue self, *Arguments.serialize(args)
+ queue_adapter.enqueue self, job.job_id, *Arguments.serialize(args)
end
end
end
@@ -24,7 +24,7 @@ module ActiveJob
#
# enqueue_in(1.week, "mike")
#
- # Returns an instance of the job class queued with args available in
+ # Returns an instance of the job class queued with args available in
# Job#arguments and the timestamp in Job#enqueue_at.
def enqueue_in(interval, *args)
enqueue_at interval.seconds.from_now, *args
@@ -34,19 +34,19 @@ module ActiveJob
#
# enqueue_at(Date.tomorrow.midnight, "mike")
#
- # Returns an instance of the job class queued with args available in
+ # Returns an instance of the job class queued with args available in
# Job#arguments and the timestamp in Job#enqueue_at.
def enqueue_at(timestamp, *args)
new(args).tap do |job|
job.enqueued_at = timestamp
job.run_callbacks :enqueue do
- queue_adapter.enqueue_at self, timestamp.to_f, *Arguments.serialize(args)
+ queue_adapter.enqueue_at self, timestamp.to_f, job.job_id, *Arguments.serialize(args)
end
end
end
end
-
+
included do
attr_accessor :arguments
attr_accessor :enqueued_at
@@ -55,11 +55,11 @@ module ActiveJob
def initialize(arguments = nil)
@arguments = arguments
end
-
+
def retry_now
self.class.enqueue *arguments
end
-
+
def retry_in(interval)
self.class.enqueue_in interval, *arguments
end
diff --git a/lib/active_job/execution.rb b/lib/active_job/execution.rb
index d21fd32292..78ada3d908 100644
--- a/lib/active_job/execution.rb
+++ b/lib/active_job/execution.rb
@@ -4,12 +4,13 @@ require 'active_job/arguments'
module ActiveJob
module Execution
extend ActiveSupport::Concern
-
+
included do
include ActiveSupport::Rescuable
end
- def execute(*serialized_args)
+ def execute(job_id, *serialized_args)
+ self.job_id = job_id
self.arguments = Arguments.deserialize(serialized_args)
run_callbacks :perform do
diff --git a/lib/active_job/logging.rb b/lib/active_job/logging.rb
index ea0ba2dddc..d913aee03d 100644
--- a/lib/active_job/logging.rb
+++ b/lib/active_job/logging.rb
@@ -26,10 +26,10 @@ module ActiveJob
before_enqueue do |job|
if job.enqueued_at
ActiveSupport::Notifications.instrument "enqueue_at.active_job",
- adapter: job.class.queue_adapter, job: job.class, args: job.arguments, timestamp: job.enqueued_at
+ adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments, timestamp: job.enqueued_at
else
ActiveSupport::Notifications.instrument "enqueue.active_job",
- adapter: job.class.queue_adapter, job: job.class, args: job.arguments
+ adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments
end
end
end
@@ -50,11 +50,11 @@ module ActiveJob
class LogSubscriber < ActiveSupport::LogSubscriber
def enqueue(event)
- info "Enqueued #{event.payload[:job].name} to #{queue_name(event)}" + args_info(event)
+ info "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)}" + args_info(event)
end
def enqueue_at(event)
- info "Enqueued #{event.payload[:job].name} to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event)
+ info "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event)
end
def perform_start(event)
@@ -67,7 +67,7 @@ module ActiveJob
private
def queue_name(event)
- event.payload[:adapter].name.demodulize.remove('Adapter')
+ event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})"
end
def args_info(event)
diff --git a/lib/active_job/queue_adapters/resque_adapter.rb b/lib/active_job/queue_adapters/resque_adapter.rb
index f7ed4669bd..84904f84f2 100644
--- a/lib/active_job/queue_adapters/resque_adapter.rb
+++ b/lib/active_job/queue_adapters/resque_adapter.rb
@@ -8,11 +8,11 @@ module ActiveJob
class ResqueAdapter
class << self
def enqueue(job, *args)
- Resque.enqueue JobWrapper.new(job), job, *args
+ Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args
end
def enqueue_at(job, timestamp, *args)
- Resque.enqueue_at timestamp, JobWrapper.new(job), job, *args
+ Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args
end
end
@@ -22,14 +22,6 @@ module ActiveJob
job_name.constantize.new.execute *args
end
end
-
- def initialize(job)
- @queue = job.queue_name
- end
-
- def to_s
- self.class.name
- end
end
end
end
diff --git a/lib/activejob.rb b/lib/activejob.rb
new file mode 100644
index 0000000000..fea38731af
--- /dev/null
+++ b/lib/activejob.rb
@@ -0,0 +1 @@
+require 'active_job'