aboutsummaryrefslogtreecommitdiffstats
path: root/activejob
diff options
context:
space:
mode:
Diffstat (limited to 'activejob')
-rw-r--r--activejob/.gitignore1
-rw-r--r--activejob/CHANGELOG.md28
-rw-r--r--activejob/MIT-LICENSE2
-rw-r--r--activejob/README.md32
-rw-r--r--activejob/Rakefile89
-rw-r--r--activejob/activejob.gemspec4
-rw-r--r--activejob/lib/active_job.rb5
-rw-r--r--activejob/lib/active_job/arguments.rb64
-rw-r--r--activejob/lib/active_job/base.rb54
-rw-r--r--activejob/lib/active_job/callbacks.rb18
-rw-r--r--activejob/lib/active_job/configured_job.rb16
-rw-r--r--activejob/lib/active_job/core.rb112
-rw-r--r--activejob/lib/active_job/enqueuing.rb106
-rw-r--r--activejob/lib/active_job/execution.rb33
-rw-r--r--activejob/lib/active_job/gem_version.rb6
-rw-r--r--activejob/lib/active_job/identifier.rb15
-rw-r--r--activejob/lib/active_job/logging.rb45
-rw-r--r--activejob/lib/active_job/queue_adapter.rb42
-rw-r--r--activejob/lib/active_job/queue_adapters.rb36
-rw-r--r--activejob/lib/active_job/queue_adapters/backburner_adapter.rb26
-rw-r--r--activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb30
-rw-r--r--activejob/lib/active_job/queue_adapters/inline_adapter.rb16
-rw-r--r--activejob/lib/active_job/queue_adapters/qu_adapter.rb28
-rw-r--r--activejob/lib/active_job/queue_adapters/que_adapter.rb26
-rw-r--r--activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb43
-rw-r--r--activejob/lib/active_job/queue_adapters/resque_adapter.rb25
-rw-r--r--activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb28
-rw-r--r--activejob/lib/active_job/queue_adapters/sneakers_adapter.rb26
-rw-r--r--activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb27
-rw-r--r--activejob/lib/active_job/queue_adapters/test_adapter.rb51
-rw-r--r--activejob/lib/active_job/queue_name.rb38
-rw-r--r--activejob/lib/active_job/test_case.rb7
-rw-r--r--activejob/lib/active_job/test_helper.rb220
-rw-r--r--activejob/test/adapters/test.rb3
-rw-r--r--activejob/test/cases/adapter_test.rb3
-rw-r--r--activejob/test/cases/argument_serialization_test.rb84
-rw-r--r--activejob/test/cases/callbacks_test.rb5
-rw-r--r--activejob/test/cases/job_serialization_test.rb2
-rw-r--r--activejob/test/cases/logging_test.rb52
-rw-r--r--activejob/test/cases/parameters_test.rb77
-rw-r--r--activejob/test/cases/queue_naming_test.rb82
-rw-r--r--activejob/test/cases/queuing_test.rb14
-rw-r--r--activejob/test/cases/rescue_test.rb17
-rw-r--r--activejob/test/cases/test_case_test.rb14
-rw-r--r--activejob/test/cases/test_helper_test.rb226
-rw-r--r--activejob/test/helper.rb19
-rw-r--r--activejob/test/integration/queuing_test.rb47
-rw-r--r--activejob/test/jobs/callback_job.rb29
-rw-r--r--activejob/test/jobs/nested_job.rb2
-rw-r--r--activejob/test/jobs/rescue_job.rb2
-rw-r--r--activejob/test/support/delayed_job/delayed/backend/test.rb4
-rw-r--r--activejob/test/support/integration/adapters/backburner.rb38
-rw-r--r--activejob/test/support/integration/adapters/delayed_job.rb20
-rw-r--r--activejob/test/support/integration/adapters/inline.rb15
-rw-r--r--activejob/test/support/integration/adapters/qu.rb38
-rw-r--r--activejob/test/support/integration/adapters/que.rb37
-rw-r--r--activejob/test/support/integration/adapters/queue_classic.rb33
-rw-r--r--activejob/test/support/integration/adapters/resque.rb49
-rw-r--r--activejob/test/support/integration/adapters/sidekiq.rb58
-rw-r--r--activejob/test/support/integration/adapters/sneakers.rb90
-rw-r--r--activejob/test/support/integration/adapters/sucker_punch.rb6
-rw-r--r--activejob/test/support/integration/dummy_app_template.rb21
-rw-r--r--activejob/test/support/integration/helper.rb30
-rw-r--r--activejob/test/support/integration/jobs_manager.rb27
-rw-r--r--activejob/test/support/integration/test_case_helpers.rb48
-rw-r--r--activejob/test/support/queue_classic/inline.rb12
66 files changed, 2074 insertions, 429 deletions
diff --git a/activejob/.gitignore b/activejob/.gitignore
new file mode 100644
index 0000000000..b3aaf55871
--- /dev/null
+++ b/activejob/.gitignore
@@ -0,0 +1 @@
+test/dummy
diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md
index b04883413e..afdd42be33 100644
--- a/activejob/CHANGELOG.md
+++ b/activejob/CHANGELOG.md
@@ -1 +1,27 @@
-* Started project. \ No newline at end of file
+* `ActiveJob::Base.deserialize` delegates to the job class
+
+
+ Since `ActiveJob::Base#deserialize` can be overridden by subclasses (like
+ `ActiveJob::Base#serialize`) this allows jobs to attach arbitrary metadata
+ when they get serialized and read it back when they get performed. Example:
+
+ class DeliverWebhookJob < ActiveJob::Base
+ def serialize
+ super.merge('attempt_number' => (@attempt_number || 0) + 1)
+ end
+
+ def deserialize(job_data)
+ super
+ @attempt_number = job_data['attempt_number']
+ end
+
+ rescue_from(TimeoutError) do |exception|
+ raise exception if @attempt_number > 5
+ retry_job(wait: 10)
+ end
+ end
+
+ *Isaac Seymour*
+
+
+Please check [4-2-stable](https://github.com/rails/rails/blob/4-2-stable/activejob/CHANGELOG.md) for previous changes.
diff --git a/activejob/MIT-LICENSE b/activejob/MIT-LICENSE
index 8b1e97b776..0cef8cdda0 100644
--- a/activejob/MIT-LICENSE
+++ b/activejob/MIT-LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2014 David Heinemeier Hansson
+Copyright (c) 2014-2015 David Heinemeier Hansson
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
diff --git a/activejob/README.md b/activejob/README.md
index 1f300fcf62..8c83d3669a 100644
--- a/activejob/README.md
+++ b/activejob/README.md
@@ -5,7 +5,7 @@ of queueing backends. These jobs can be everything from regularly scheduled
clean-ups, to billing charges, to mailings. Anything that can be chopped up into
small units of work and run in parallel, really.
-It also serves as the backend for ActionMailer's #deliver_later functionality
+It also serves as the backend for Action Mailer's #deliver_later functionality
that makes it easy to turn any mailing into a job for running later. That's
one of the most common jobs in a modern web application: Sending emails outside
of the request-response cycle, so the user doesn't have to wait on it.
@@ -24,9 +24,10 @@ Set the queue adapter for Active Job:
``` ruby
ActiveJob::Base.queue_adapter = :inline # default queue adapter
-# Adapters currently supported: :backburner, :delayed_job, :qu, :que, :queue_classic,
-# :resque, :sidekiq, :sneakers, :sucker_punch
```
+Note: To learn how to use your preferred queueing backend see its adapter
+documentation at
+[ActiveJob::QueueAdapters](http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html).
Declare a job like so:
@@ -43,15 +44,15 @@ end
Enqueue a job like so:
```ruby
-MyJob.enqueue record # Enqueue a job to be performed as soon the queueing system is free.
+MyJob.perform_later record # Enqueue a job to be performed as soon the queueing system is free.
```
```ruby
-MyJob.enqueue_at Date.tomorrow.noon, record # Enqueue a job to be performed tomorrow at noon.
+MyJob.set(wait_until: Date.tomorrow.noon).perform_later(record) # Enqueue a job to be performed tomorrow at noon.
```
```ruby
-MyJob.enqueue_in 1.week, record # Enqueue a job to be performed 1 week from now.
+MyJob.set(wait: 1.week).perform_later(record) # Enqueue a job to be performed 1 week from now.
```
That's it!
@@ -88,18 +89,9 @@ by default has been mixed into Active Record classes.
## Supported queueing systems
-We currently have adapters for:
-
-* [Backburner](https://github.com/nesquena/backburner)
-* [Delayed Job](https://github.com/collectiveidea/delayed_job)
-* [Qu](https://github.com/bkeepers/qu)
-* [Que](https://github.com/chanks/que)
-* [QueueClassic](https://github.com/ryandotsmith/queue_classic)
-* [Resque 1.x](https://github.com/resque/resque)
-* [Sidekiq](https://github.com/mperham/sidekiq)
-* [Sneakers](https://github.com/jondot/sneakers)
-* [Sucker Punch](https://github.com/brandonhilkert/sucker_punch)
-
+Active Job has built-in adapters for multiple queueing backends (Sidekiq,
+Resque, Delayed Job and others). To get an up-to-date list of the adapters
+see the API Documentation for [ActiveJob::QueueAdapters](http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html).
## Auxiliary gems
@@ -119,7 +111,7 @@ Source code can be downloaded as part of the Rails project on GitHub
## License
-ActiveJob is released under the MIT license:
+Active Job is released under the MIT license:
* http://www.opensource.org/licenses/MIT
@@ -137,5 +129,3 @@ Bug reports can be filed for the Ruby on Rails project here:
Feature requests should be discussed on the rails-core mailing list here:
* https://groups.google.com/forum/?fromgroups#!forum/rubyonrails-core
-
-
diff --git a/activejob/Rakefile b/activejob/Rakefile
index 484cd1d0b8..1922f256ec 100644
--- a/activejob/Rakefile
+++ b/activejob/Rakefile
@@ -1,73 +1,78 @@
require 'rake/testtask'
require 'rubygems/package_task'
-dir = File.dirname(__FILE__)
-
-def run_without_aborting(*tasks)
- errors = []
-
- tasks.each do |task|
- begin
- Rake::Task[task].invoke
- rescue Exception
- errors << task
- end
- end
-
- abort "Errors running #{errors.join(', ')}" if errors.any?
-end
-
-task default: :test
-
-ACTIVEJOB_ADAPTERS = %w(inline delayed_job qu que queue_classic resque sidekiq sneakers sucker_punch backburner)
+ACTIVEJOB_ADAPTERS = %w(inline delayed_job qu que queue_classic resque sidekiq sneakers sucker_punch backburner test)
ACTIVEJOB_ADAPTERS -= %w(queue_classic) if defined?(JRUBY_VERSION)
-desc 'Run all adapter tests'
-task :test do
- tasks = ACTIVEJOB_ADAPTERS.map{|a| "test_#{a}" }
- run_without_aborting(*tasks)
-end
+task default: :test
+task test: 'test:default'
namespace :test do
+ desc 'Run all adapter tests'
+ task :default do
+ run_without_aborting ACTIVEJOB_ADAPTERS.map { |a| "test:#{a}" }
+ end
+
desc 'Run all adapter tests in isolation'
task :isolated do
- tasks = ACTIVEJOB_ADAPTERS.map{|a| "isolated_test_#{a}" }
- run_without_aborting(*tasks)
+ run_without_aborting ACTIVEJOB_ADAPTERS.map { |a| "test:isolated:#{a}" }
+ end
+
+ desc 'Run integration tests for all adapters'
+ task :integration do
+ run_without_aborting (ACTIVEJOB_ADAPTERS - ['test']).map { |a| "test:integration:#{a}" }
+ end
+
+ task 'env:integration' do
+ ENV['AJ_INTEGRATION_TESTS'] = "1"
end
-end
+ ACTIVEJOB_ADAPTERS.each do |adapter|
+ task("env:#{adapter}") { ENV['AJADAPTER'] = adapter }
-ACTIVEJOB_ADAPTERS.each do |adapter|
- namespace :test do
- Rake::TestTask.new(adapter => "#{adapter}:env") do |t|
- t.description = ""
+ Rake::TestTask.new(adapter => "test:env:#{adapter}") do |t|
+ t.description = "Run adapter tests for #{adapter}"
t.libs << 'test'
t.test_files = FileList['test/cases/**/*_test.rb']
t.verbose = true
+ t.warning = true
+ t.ruby_opts = ["--dev"] if defined?(JRUBY_VERSION)
end
namespace :isolated do
- task adapter => "#{adapter}:env" do
+ task adapter => "test:env:#{adapter}" do
+ dir = File.dirname(__FILE__)
Dir.glob("#{dir}/test/cases/**/*_test.rb").all? do |file|
sh(Gem.ruby, '-w', "-I#{dir}/lib", "-I#{dir}/test", file)
end or raise 'Failures'
end
end
- end
-
- namespace adapter do
- task test: "test_#{adapter}"
- task isolated_test: "isolated_test_#{adapter}"
- task(:env) { ENV['AJADAPTER'] = adapter }
+ namespace :integration do
+ Rake::TestTask.new(adapter => ["test:env:#{adapter}", 'test:env:integration']) do |t|
+ t.description = "Run integration tests for #{adapter}"
+ t.libs << 'test'
+ t.test_files = FileList['test/integration/**/*_test.rb']
+ t.verbose = true
+ t.warning = true
+ t.ruby_opts = ["--dev"] if defined?(JRUBY_VERSION)
+ end
+ end
end
+end
+def run_without_aborting(tasks)
+ errors = []
- desc "Run #{adapter} tests"
- task "test_#{adapter}" => ["#{adapter}:env", "test:#{adapter}"]
+ tasks.each do |task|
+ begin
+ Rake::Task[task].invoke
+ rescue Exception
+ errors << task
+ end
+ end
- desc "Run #{adapter} tests in isolation"
- task "isolated_test_#{adapter}" => ["#{adapter}:env", "test:isolated:#{adapter}"]
+ abort "Errors running #{errors.join(', ')}" if errors.any?
end
diff --git a/activejob/activejob.gemspec b/activejob/activejob.gemspec
index 9944b2a1bd..5404ece804 100644
--- a/activejob/activejob.gemspec
+++ b/activejob/activejob.gemspec
@@ -7,7 +7,7 @@ Gem::Specification.new do |s|
s.summary = 'Job framework with pluggable queues.'
s.description = 'Declare job classes that can be run by a variety of queueing backends.'
- s.required_ruby_version = '>= 1.9.3'
+ s.required_ruby_version = '>= 2.2.0'
s.license = 'MIT'
@@ -19,5 +19,5 @@ Gem::Specification.new do |s|
s.require_path = 'lib'
s.add_dependency 'activesupport', version
- s.add_dependency 'globalid', '>= 0.2.3'
+ s.add_dependency 'globalid', '>= 0.3.0'
end
diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb
index 29123170b8..3d4f63b261 100644
--- a/activejob/lib/active_job.rb
+++ b/activejob/lib/active_job.rb
@@ -1,5 +1,5 @@
#--
-# Copyright (c) 2014 David Heinemeier Hansson
+# Copyright (c) 2014-2015 David Heinemeier Hansson
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
@@ -31,4 +31,7 @@ module ActiveJob
autoload :Base
autoload :QueueAdapters
+ autoload :ConfiguredJob
+ autoload :TestCase
+ autoload :TestHelper
end
diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb
index e54f4afbc7..752be6898e 100644
--- a/activejob/lib/active_job/arguments.rb
+++ b/activejob/lib/active_job/arguments.rb
@@ -1,3 +1,5 @@
+require 'active_support/core_ext/hash/indifferent_access'
+
module ActiveJob
# Raised when an exception is raised during job arguments deserialization.
#
@@ -5,8 +7,8 @@ module ActiveJob
class DeserializationError < StandardError
attr_reader :original_exception
- def initialize(e)
- super ("Error while trying to deserialize arguments: #{e.message}")
+ def initialize(e) #:nodoc:
+ super("Error while trying to deserialize arguments: #{e.message}")
@original_exception = e
set_backtrace e.backtrace
end
@@ -24,25 +26,38 @@ module ActiveJob
extend self
TYPE_WHITELIST = [ NilClass, Fixnum, Float, String, TrueClass, FalseClass, Bignum ]
+ # Serializes a set of arguments. Whitelisted types are returned
+ # as-is. Arrays/Hashes are serialized element by element.
+ # All other types are serialized using GlobalID.
def serialize(arguments)
arguments.map { |argument| serialize_argument(argument) }
end
+ # Deserializes a set of arguments. Whitelisted types are returned
+ # as-is. Arrays/Hashes are deserialized element by element.
+ # All other types are deserialized using GlobalID.
def deserialize(arguments)
arguments.map { |argument| deserialize_argument(argument) }
+ rescue => e
+ raise DeserializationError.new(e)
end
private
+ GLOBALID_KEY = '_aj_globalid'.freeze
+ private_constant :GLOBALID_KEY
+
def serialize_argument(argument)
case argument
- when GlobalID::Identification
- argument.global_id.to_s
when *TYPE_WHITELIST
argument
+ when GlobalID::Identification
+ { GLOBALID_KEY => argument.to_global_id.to_s }
when Array
- serialize(argument)
+ argument.map { |arg| serialize_argument(arg) }
when Hash
- Hash[ argument.map { |key, value| [ serialize_hash_key(key), serialize_argument(value) ] } ]
+ argument.each_with_object({}) do |(key, value), hash|
+ hash[serialize_hash_key(key)] = serialize_argument(value)
+ end
else
raise SerializationError.new("Unsupported argument type: #{argument.class.name}")
end
@@ -50,23 +65,48 @@ module ActiveJob
def deserialize_argument(argument)
case argument
+ when String
+ GlobalID::Locator.locate(argument) || argument
+ when *TYPE_WHITELIST
+ argument
when Array
- deserialize(argument)
+ argument.map { |arg| deserialize_argument(arg) }
when Hash
- Hash[ argument.map { |key, value| [ key, deserialize_argument(value) ] } ].with_indifferent_access
+ if serialized_global_id?(argument)
+ deserialize_global_id argument
+ else
+ deserialize_hash argument
+ end
else
- GlobalID::Locator.locate(argument) || argument
+ raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
end
- rescue => e
- raise DeserializationError.new(e)
end
+ def serialized_global_id?(hash)
+ hash.size == 1 and hash.include?(GLOBALID_KEY)
+ end
+
+ def deserialize_global_id(hash)
+ GlobalID::Locator.locate hash[GLOBALID_KEY]
+ end
+
+ def deserialize_hash(serialized_hash)
+ serialized_hash.each_with_object({}.with_indifferent_access) do |(key, value), hash|
+ hash[key] = deserialize_argument(value)
+ end
+ end
+
+ RESERVED_KEYS = [GLOBALID_KEY, GLOBALID_KEY.to_sym]
+ private_constant :RESERVED_KEYS
+
def serialize_hash_key(key)
case key
+ when *RESERVED_KEYS
+ raise SerializationError.new("Can't serialize a Hash with reserved key #{key.inspect}")
when String, Symbol
key.to_s
else
- raise SerializationError.new("Unsupported hash key type: #{key.class.name}")
+ raise SerializationError.new("Only string and symbol hash keys may be serialized as job arguments, but #{key.inspect} is a #{key.class}")
end
end
end
diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb
index d5ba253016..fd49b3fda5 100644
--- a/activejob/lib/active_job/base.rb
+++ b/activejob/lib/active_job/base.rb
@@ -1,20 +1,64 @@
+require 'active_job/core'
require 'active_job/queue_adapter'
require 'active_job/queue_name'
require 'active_job/enqueuing'
require 'active_job/execution'
require 'active_job/callbacks'
-require 'active_job/identifier'
require 'active_job/logging'
-module ActiveJob
+module ActiveJob #:nodoc:
+ # = Active Job
+ #
+ # Active Job objects can be configured to work with different backend
+ # queuing frameworks. To specify a queue adapter to use:
+ #
+ # ActiveJob::Base.queue_adapter = :inline
+ #
+ # A list of supported adapters can be found in QueueAdapters.
+ #
+ # Active Job objects can be defined by creating a class that inherits
+ # from the ActiveJob::Base class. The only necessary method to
+ # implement is the "perform" method.
+ #
+ # To define an Active Job object:
+ #
+ # class ProcessPhotoJob < ActiveJob::Base
+ # def perform(photo)
+ # photo.watermark!('Rails')
+ # photo.rotate!(90.degrees)
+ # photo.resize_to_fit!(300, 300)
+ # photo.upload!
+ # end
+ # end
+ #
+ # Records that are passed in are serialized/deserialized using Global
+ # ID. More information can be found in Arguments.
+ #
+ # To enqueue a job to be performed as soon the queueing system is free:
+ #
+ # ProcessPhotoJob.perform_later(photo)
+ #
+ # To enqueue a job to be processed at some point in the future:
+ #
+ # ProcessPhotoJob.set(wait_until: Date.tomorrow.noon).perform_later(photo)
+ #
+ # More information can be found in ActiveJob::Core::ClassMethods#set
+ #
+ # A job can also be processed immediately without sending to the queue:
+ #
+ # ProcessPhotoJob.perform_now(photo)
+ #
+ # == Exceptions
+ #
+ # * DeserializationError - Error class for deserialization errors.
+ # * SerializationError - Error class for serialization errors.
class Base
- extend QueueAdapter
-
+ include Core
+ include QueueAdapter
include QueueName
include Enqueuing
include Execution
include Callbacks
- include Identifier
include Logging
ActiveSupport.run_load_hooks(:active_job, self)
diff --git a/activejob/lib/active_job/callbacks.rb b/activejob/lib/active_job/callbacks.rb
index cafa3438c0..2b6149e84e 100644
--- a/activejob/lib/active_job/callbacks.rb
+++ b/activejob/lib/active_job/callbacks.rb
@@ -3,8 +3,8 @@ require 'active_support/callbacks'
module ActiveJob
# = Active Job Callbacks
#
- # Active Job provides hooks during the lifecycle of a job. Callbacks allow you
- # to trigger logic during the lifecycle of a job. Available callbacks are:
+ # Active Job provides hooks during the life cycle of a job. Callbacks allow you
+ # to trigger logic during the life cycle of a job. Available callbacks are:
#
# * <tt>before_enqueue</tt>
# * <tt>around_enqueue</tt>
@@ -22,6 +22,8 @@ module ActiveJob
define_callbacks :enqueue
end
+ # These methods will be included into any Active Job object, adding
+ # callbacks for +perform+ and +enqueue+ methods.
module ClassMethods
# Defines a callback that will get called right before the
# job's perform method is executed.
@@ -36,7 +38,7 @@ module ActiveJob
# def perform(video_id)
# Video.find(video_id).process
# end
- # end
+ # end
#
def before_perform(*filters, &blk)
set_callback(:perform, :before, *filters, &blk)
@@ -55,7 +57,7 @@ module ActiveJob
# def perform(video_id)
# Video.find(video_id).process
# end
- # end
+ # end
#
def after_perform(*filters, &blk)
set_callback(:perform, :after, *filters, &blk)
@@ -75,7 +77,7 @@ module ActiveJob
# def perform(video_id)
# Video.find(video_id).process
# end
- # end
+ # end
#
def around_perform(*filters, &blk)
set_callback(:perform, :around, *filters, &blk)
@@ -94,7 +96,7 @@ module ActiveJob
# def perform(video_id)
# Video.find(video_id).process
# end
- # end
+ # end
#
def before_enqueue(*filters, &blk)
set_callback(:enqueue, :before, *filters, &blk)
@@ -113,7 +115,7 @@ module ActiveJob
# def perform(video_id)
# Video.find(video_id).process
# end
- # end
+ # end
#
def after_enqueue(*filters, &blk)
set_callback(:enqueue, :after, *filters, &blk)
@@ -134,7 +136,7 @@ module ActiveJob
# def perform(video_id)
# Video.find(video_id).process
# end
- # end
+ # end
#
def around_enqueue(*filters, &blk)
set_callback(:enqueue, :around, *filters, &blk)
diff --git a/activejob/lib/active_job/configured_job.rb b/activejob/lib/active_job/configured_job.rb
new file mode 100644
index 0000000000..979280b910
--- /dev/null
+++ b/activejob/lib/active_job/configured_job.rb
@@ -0,0 +1,16 @@
+module ActiveJob
+ class ConfiguredJob #:nodoc:
+ def initialize(job_class, options={})
+ @options = options
+ @job_class = job_class
+ end
+
+ def perform_now(*args)
+ @job_class.new(*args).perform_now
+ end
+
+ def perform_later(*args)
+ @job_class.new(*args).enqueue @options
+ end
+ end
+end
diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb
new file mode 100644
index 0000000000..ddd7d1361c
--- /dev/null
+++ b/activejob/lib/active_job/core.rb
@@ -0,0 +1,112 @@
+module ActiveJob
+ module Core
+ extend ActiveSupport::Concern
+
+ included do
+ # Job arguments
+ attr_accessor :arguments
+ attr_writer :serialized_arguments
+
+ # Timestamp when the job should be performed
+ attr_accessor :scheduled_at
+
+ # Job Identifier
+ attr_accessor :job_id
+
+ # Queue in which the job will reside.
+ attr_writer :queue_name
+ end
+
+ # These methods will be included into any Active Job object, adding
+ # helpers for de/serialization and creation of job instances.
+ module ClassMethods
+ # Creates a new job instance from a hash created with +serialize+
+ def deserialize(job_data)
+ job = job_data['job_class'].constantize.new
+ job.deserialize(job_data)
+ job
+ end
+
+ # Creates a job preconfigured with the given options. You can call
+ # perform_later with the job arguments to enqueue the job with the
+ # preconfigured options
+ #
+ # ==== Options
+ # * <tt>:wait</tt> - Enqueues the job with the specified delay
+ # * <tt>:wait_until</tt> - Enqueues the job at the time specified
+ # * <tt>:queue</tt> - Enqueues the job on the specified queue
+ #
+ # ==== Examples
+ #
+ # VideoJob.set(queue: :some_queue).perform_later(Video.last)
+ # VideoJob.set(wait: 5.minutes).perform_later(Video.last)
+ # VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last)
+ # VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
+ # VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last)
+ def set(options={})
+ ConfiguredJob.new(self, options)
+ end
+ end
+
+ # Creates a new job instance. Takes the arguments that will be
+ # passed to the perform method.
+ def initialize(*arguments)
+ @arguments = arguments
+ @job_id = SecureRandom.uuid
+ @queue_name = self.class.queue_name
+ end
+
+ # Returns a hash with the job data that can safely be passed to the
+ # queueing adapter.
+ def serialize
+ {
+ 'job_class' => self.class.name,
+ 'job_id' => job_id,
+ 'queue_name' => queue_name,
+ 'arguments' => serialize_arguments(arguments)
+ }
+ end
+
+ # Attaches the stored job data to the current instance. Receives a hash
+ # returned from +serialize+
+ #
+ # ==== Examples
+ #
+ # class DeliverWebhookJob < ActiveJob::Base
+ # def serialize
+ # super.merge('attempt_number' => (@attempt_number || 0) + 1)
+ # end
+ #
+ # def deserialize(job_data)
+ # super
+ # @attempt_number = job_data['attempt_number']
+ # end
+ #
+ # rescue_from(TimeoutError) do |exception|
+ # raise exception if @attempt_number > 5
+ # retry_job(wait: 10)
+ # end
+ # end
+ def deserialize(job_data)
+ self.job_id = job_data['job_id']
+ self.queue_name = job_data['queue_name']
+ self.serialized_arguments = job_data['arguments']
+ end
+
+ private
+ def deserialize_arguments_if_needed
+ if defined?(@serialized_arguments) && @serialized_arguments.present?
+ @arguments = deserialize_arguments(@serialized_arguments)
+ @serialized_arguments = nil
+ end
+ end
+
+ def serialize_arguments(serialized_args)
+ Arguments.serialize(serialized_args)
+ end
+
+ def deserialize_arguments(serialized_args)
+ Arguments.deserialize(serialized_args)
+ end
+ end
+end
diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb
index 3d00d51867..430c17e1bf 100644
--- a/activejob/lib/active_job/enqueuing.rb
+++ b/activejob/lib/active_job/enqueuing.rb
@@ -4,68 +4,74 @@ module ActiveJob
module Enqueuing
extend ActiveSupport::Concern
+ # Includes the +perform_later+ method for job initialization.
module ClassMethods
- # Push a job onto the queue. The arguments must be legal JSON types
+ # Push a job onto the queue. The arguments must be legal JSON types
# (string, int, float, nil, true, false, hash or array) or
- # GlobalID::Identification instances. Arbitrary Ruby objects
+ # GlobalID::Identification instances. Arbitrary Ruby objects
# are not supported.
#
- # Returns an instance of the job class queued with args available in
+ # Returns an instance of the job class queued with arguments available in
# Job#arguments.
- def enqueue(*args)
- new(args).tap do |job|
- job.run_callbacks :enqueue do
- queue_adapter.enqueue self, job.job_id, *Arguments.serialize(args)
- end
- end
- end
-
- # Enqueue a job to be performed at +interval+ from now.
- #
- # enqueue_in(1.week, "mike")
- #
- # Returns an instance of the job class queued with args available in
- # Job#arguments and the timestamp in Job#enqueue_at.
- def enqueue_in(interval, *args)
- enqueue_at interval.seconds.from_now, *args
+ def perform_later(*args)
+ job_or_instantiate(*args).enqueue
end
- # Enqueue a job to be performed at an explicit point in time.
- #
- # enqueue_at(Date.tomorrow.midnight, "mike")
- #
- # Returns an instance of the job class queued with args available in
- # Job#arguments and the timestamp in Job#enqueue_at.
- def enqueue_at(timestamp, *args)
- new(args).tap do |job|
- job.enqueued_at = timestamp
-
- job.run_callbacks :enqueue do
- queue_adapter.enqueue_at self, timestamp.to_f, job.job_id, *Arguments.serialize(args)
- end
+ protected
+ def job_or_instantiate(*args)
+ args.first.is_a?(self) ? args.first : new(*args)
end
- end
end
- included do
- attr_accessor :arguments
- attr_accessor :enqueued_at
+ # Reschedules the job to be re-executed. This is useful in combination
+ # with the +rescue_from+ option. When you rescue an exception from your job
+ # you can ask Active Job to retry performing your job.
+ #
+ # ==== Options
+ # * <tt>:wait</tt> - Enqueues the job with the specified delay
+ # * <tt>:wait_until</tt> - Enqueues the job at the time specified
+ # * <tt>:queue</tt> - Enqueues the job on the specified queue
+ #
+ # ==== Examples
+ #
+ # class SiteScrapperJob < ActiveJob::Base
+ # rescue_from(ErrorLoadingSite) do
+ # retry_job queue: :low_priority
+ # end
+ #
+ # def perform(*args)
+ # # raise ErrorLoadingSite if cannot scrape
+ # end
+ # end
+ def retry_job(options={})
+ enqueue options
end
- def initialize(arguments = nil)
- @arguments = arguments
- end
-
- def retry_now
- self.class.enqueue(*arguments)
- end
-
- def retry_in(interval)
- self.class.enqueue_in interval, *arguments
- end
-
- def retry_at(timestamp)
- self.class.enqueue_at timestamp, *arguments
+ # Enqueues the job to be performed by the queue adapter.
+ #
+ # ==== Options
+ # * <tt>:wait</tt> - Enqueues the job with the specified delay
+ # * <tt>:wait_until</tt> - Enqueues the job at the time specified
+ # * <tt>:queue</tt> - Enqueues the job on the specified queue
+ #
+ # ==== Examples
+ #
+ # my_job_instance.enqueue
+ # my_job_instance.enqueue wait: 5.minutes
+ # my_job_instance.enqueue queue: :important
+ # my_job_instance.enqueue wait_until: Date.tomorrow.midnight
+ def enqueue(options={})
+ self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
+ self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
+ self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
+ run_callbacks :enqueue do
+ if self.scheduled_at
+ self.class.queue_adapter.enqueue_at self, self.scheduled_at
+ else
+ self.class.queue_adapter.enqueue self
+ end
+ end
+ self
end
end
end
diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb
index 0e7b5bdd72..79d232da4a 100644
--- a/activejob/lib/active_job/execution.rb
+++ b/activejob/lib/active_job/execution.rb
@@ -4,15 +4,30 @@ require 'active_job/arguments'
module ActiveJob
module Execution
extend ActiveSupport::Concern
+ include ActiveSupport::Rescuable
- included do
- include ActiveSupport::Rescuable
- end
+ # Includes methods for executing and performing jobs instantly.
+ module ClassMethods
+ # Performs the job immediately.
+ #
+ # MyJob.perform_now("mike")
+ #
+ def perform_now(*args)
+ job_or_instantiate(*args).perform_now
+ end
- def execute(job_id, *serialized_args)
- self.job_id = job_id
- self.arguments = deserialize_arguments(serialized_args)
+ def execute(job_data) #:nodoc:
+ job = deserialize(job_data)
+ job.perform_now
+ end
+ end
+ # Performs the job immediately. The job is not sent to the queueing adapter
+ # but directly executed by blocking the execution of others until it's finished.
+ #
+ # MyJob.new(*args).perform_now
+ def perform_now
+ deserialize_arguments_if_needed
run_callbacks :perform do
perform(*arguments)
end
@@ -23,11 +38,5 @@ module ActiveJob
def perform(*)
fail NotImplementedError
end
-
- private
- def deserialize_arguments(serialized_args)
- Arguments.deserialize(serialized_args)
- end
-
end
end
diff --git a/activejob/lib/active_job/gem_version.rb b/activejob/lib/active_job/gem_version.rb
index c166020b28..27a5de93f4 100644
--- a/activejob/lib/active_job/gem_version.rb
+++ b/activejob/lib/active_job/gem_version.rb
@@ -1,12 +1,12 @@
module ActiveJob
- # Returns the version of the currently loaded ActiveJob as a <tt>Gem::Version</tt>
+ # Returns the version of the currently loaded Active Job as a <tt>Gem::Version</tt>
def self.gem_version
Gem::Version.new VERSION::STRING
end
module VERSION
- MAJOR = 4
- MINOR = 2
+ MAJOR = 5
+ MINOR = 0
TINY = 0
PRE = "alpha"
diff --git a/activejob/lib/active_job/identifier.rb b/activejob/lib/active_job/identifier.rb
deleted file mode 100644
index c7f522087d..0000000000
--- a/activejob/lib/active_job/identifier.rb
+++ /dev/null
@@ -1,15 +0,0 @@
-require 'active_job/arguments'
-
-module ActiveJob
- module Identifier
- extend ActiveSupport::Concern
-
- included do
- attr_writer :job_id
- end
-
- def job_id
- @job_id ||= SecureRandom.uuid
- end
- end
-end
diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb
index ae098a80f3..cd29e6908e 100644
--- a/activejob/lib/active_job/logging.rb
+++ b/activejob/lib/active_job/logging.rb
@@ -3,7 +3,7 @@ require 'active_support/tagged_logging'
require 'active_support/logger'
module ActiveJob
- module Logging
+ module Logging #:nodoc:
extend ActiveSupport::Concern
included do
@@ -17,7 +17,7 @@ module ActiveJob
around_perform do |job, block, _|
tag_logger(job.class.name, job.job_id) do
- payload = {adapter: job.class.queue_adapter, job: job.class, args: job.arguments}
+ payload = {adapter: job.class.queue_adapter, job: job}
ActiveSupport::Notifications.instrument("perform_start.active_job", payload.dup)
ActiveSupport::Notifications.instrument("perform.active_job", payload) do
block.call
@@ -26,12 +26,12 @@ module ActiveJob
end
before_enqueue do |job|
- if job.enqueued_at
+ if job.scheduled_at
ActiveSupport::Notifications.instrument "enqueue_at.active_job",
- adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments, timestamp: job.enqueued_at
+ adapter: job.class.queue_adapter, job: job
else
ActiveSupport::Notifications.instrument "enqueue.active_job",
- adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments
+ adapter: job.class.queue_adapter, job: job
end
end
end
@@ -50,21 +50,33 @@ module ActiveJob
logger.formatter.current_tags.include?("ActiveJob")
end
- class LogSubscriber < ActiveSupport::LogSubscriber
+ class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc:
def enqueue(event)
- info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)}" + args_info(event) }
+ info do
+ job = event.payload[:job]
+ "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job)
+ end
end
def enqueue_at(event)
- info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) }
+ info do
+ job = event.payload[:job]
+ "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job)
+ end
end
def perform_start(event)
- info { "Performing #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event) }
+ info do
+ job = event.payload[:job]
+ "Performing #{job.class.name} from #{queue_name(event)}" + args_info(job)
+ end
end
def perform(event)
- info { "Performed #{event.payload[:job].name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" }
+ info do
+ job = event.payload[:job]
+ "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2)}ms"
+ end
end
private
@@ -72,12 +84,17 @@ module ActiveJob
event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})"
end
- def args_info(event)
- event.payload[:args].any? ? " with arguments: #{event.payload[:args].map(&:inspect).join(", ")}" : ""
+ def args_info(job)
+ if job.arguments.any?
+ ' with arguments: ' +
+ job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg.inspect }.join(', ')
+ else
+ ''
+ end
end
- def enqueued_at(event)
- Time.at(event.payload[:timestamp]).utc
+ def scheduled_at(event)
+ Time.at(event.payload[:job].scheduled_at).utc
end
def logger
diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb
index 13c23abce4..d610d30e01 100644
--- a/activejob/lib/active_job/queue_adapter.rb
+++ b/activejob/lib/active_job/queue_adapter.rb
@@ -2,22 +2,34 @@ require 'active_job/queue_adapters/inline_adapter'
require 'active_support/core_ext/string/inflections'
module ActiveJob
- module QueueAdapter
- mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter }
+ # The <tt>ActiveJob::QueueAdapter</tt> module is used to load the
+ # correct adapter. The default queue adapter is the :inline queue.
+ module QueueAdapter #:nodoc:
+ extend ActiveSupport::Concern
- def queue_adapter=(name_or_adapter)
- @@queue_adapter = \
- case name_or_adapter
- when Symbol, String
- load_adapter(name_or_adapter)
- when Class
- name_or_adapter
- end
- end
+ # Includes the setter method for changing the active queue adapter.
+ module ClassMethods
+ mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter }
- private
- def load_adapter(name)
- "ActiveJob::QueueAdapters::#{name.to_s.camelize}Adapter".constantize
+ # Specify the backend queue provider. The default queue adapter
+ # is the :inline queue. See QueueAdapters for more
+ # information.
+ def queue_adapter=(name_or_adapter)
+ @@queue_adapter = \
+ case name_or_adapter
+ when :test
+ ActiveJob::QueueAdapters::TestAdapter.new
+ when Symbol, String
+ load_adapter(name_or_adapter)
+ else
+ name_or_adapter if name_or_adapter.respond_to?(:enqueue)
+ end
end
+
+ private
+ def load_adapter(name)
+ "ActiveJob::QueueAdapters::#{name.to_s.camelize}Adapter".constantize
+ end
+ end
end
-end \ No newline at end of file
+end
diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb
index 007068ff0a..4b91c93dbe 100644
--- a/activejob/lib/active_job/queue_adapters.rb
+++ b/activejob/lib/active_job/queue_adapters.rb
@@ -1,4 +1,39 @@
module ActiveJob
+ # == Active Job adapters
+ #
+ # Active Job has adapters for the following queueing backends:
+ #
+ # * {Backburner}[https://github.com/nesquena/backburner]
+ # * {Delayed Job}[https://github.com/collectiveidea/delayed_job]
+ # * {Qu}[https://github.com/bkeepers/qu]
+ # * {Que}[https://github.com/chanks/que]
+ # * {queue_classic}[https://github.com/QueueClassic/queue_classic]
+ # * {Resque 1.x}[https://github.com/resque/resque/tree/1-x-stable]
+ # * {Sidekiq}[http://sidekiq.org]
+ # * {Sneakers}[https://github.com/jondot/sneakers]
+ # * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch]
+ #
+ # === Backends Features
+ #
+ # | | Async | Queues | Delayed | Priorities | Timeout | Retries |
+ # |-------------------|-------|--------|-----------|------------|---------|---------|
+ # | Backburner | Yes | Yes | Yes | Yes | Job | Global |
+ # | Delayed Job | Yes | Yes | Yes | Job | Global | Global |
+ # | Qu | Yes | Yes | No | No | No | Global |
+ # | Que | Yes | Yes | Yes | Job | No | Job |
+ # | queue_classic | Yes | Yes | No* | No | No | No |
+ # | Resque | Yes | Yes | Yes (Gem) | Queue | Global | Yes |
+ # | Sidekiq | Yes | Yes | Yes | Queue | No | Job |
+ # | Sneakers | Yes | Yes | No | Queue | Queue | No |
+ # | Sucker Punch | Yes | Yes | No | No | No | No |
+ # | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A |
+ # | Active Job | Yes | Yes | Yes | No | No | No |
+ #
+ # NOTE:
+ # queue_classic does not support Job scheduling. However you can implement this
+ # yourself or you can use the queue_classic-later gem. See the documentation for
+ # ActiveJob::QueueAdapters::QueueClassicAdapter.
+ #
module QueueAdapters
extend ActiveSupport::Autoload
@@ -12,5 +47,6 @@ module ActiveJob
autoload :SidekiqAdapter
autoload :SneakersAdapter
autoload :SuckerPunchAdapter
+ autoload :TestAdapter
end
end
diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
index 8d34155645..2453d065de 100644
--- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
@@ -2,22 +2,32 @@ require 'backburner'
module ActiveJob
module QueueAdapters
+ # == Backburner adapter for Active Job
+ #
+ # Backburner is a beanstalkd-powered job queue that can handle a very
+ # high volume of jobs. You create background jobs and place them on
+ # multiple work queues to be processed later. Read more about
+ # Backburner {here}[https://github.com/nesquena/backburner].
+ #
+ # To use Backburner set the queue_adapter config to +:backburner+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :backburner
class BackburnerAdapter
class << self
- def enqueue(job, *args)
- Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name
+ def enqueue(job) #:nodoc:
+ Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name
end
- def enqueue_at(job, timestamp, *args)
- delay = Time.current.to_f - timestamp
- Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name, delay: delay
+ def enqueue_at(job, timestamp) #:nodoc:
+ delay = timestamp - Time.current.to_f
+ Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay
end
end
- class JobWrapper
+ class JobWrapper #:nodoc:
class << self
- def perform(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
index a00569833a..69d9e70de3 100644
--- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
@@ -2,20 +2,36 @@ require 'delayed_job'
module ActiveJob
module QueueAdapters
+ # == Delayed Job adapter for Active Job
+ #
+ # Delayed::Job (or DJ) encapsulates the common pattern of asynchronously
+ # executing longer tasks in the background. Although DJ can have many
+ # storage backends, one of the most used is based on Active Record.
+ # Read more about Delayed Job {here}[https://github.com/collectiveidea/delayed_job].
+ #
+ # To use Delayed Job, set the queue_adapter config to +:delayed_job+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :delayed_job
class DelayedJobAdapter
class << self
- def enqueue(job, *args)
- JobWrapper.new.delay(queue: job.queue_name).perform(job, *args)
+ def enqueue(job) #:nodoc:
+ Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
end
- def enqueue_at(job, timestamp, *args)
- JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job, *args)
+ def enqueue_at(job, timestamp) #:nodoc:
+ Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
end
end
- class JobWrapper
- def perform(job, *args)
- job.new.execute(*args)
+ class JobWrapper #:nodoc:
+ attr_accessor :job_data
+
+ def initialize(job_data)
+ @job_data = job_data
+ end
+
+ def perform
+ Base.execute(job_data)
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
index 5805340fb0..e25d88e723 100644
--- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
@@ -1,13 +1,21 @@
module ActiveJob
module QueueAdapters
+ # == Active Job Inline adapter
+ #
+ # When enqueueing jobs with the Inline adapter the job will be executed
+ # immediately.
+ #
+ # To use the Inline set the queue_adapter config to +:inline+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :inline
class InlineAdapter
class << self
- def enqueue(job, *args)
- job.new.execute(*args)
+ def enqueue(job) #:nodoc:
+ Base.execute(job.serialize)
end
- def enqueue_at(*)
- raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at https://github.com/rails/activejob")
+ def enqueue_at(*) #:nodoc:
+ raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html")
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb
index 5cb741c094..30aa5a4670 100644
--- a/activejob/lib/active_job/queue_adapters/qu_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/qu_adapter.rb
@@ -2,27 +2,39 @@ require 'qu'
module ActiveJob
module QueueAdapters
+ # == Qu adapter for Active Job
+ #
+ # Qu is a Ruby library for queuing and processing background jobs. It is
+ # heavily inspired by delayed_job and Resque. Qu was created to overcome
+ # some shortcomings in the existing queuing libraries.
+ # The advantages of Qu are: Multiple backends (redis, mongo), jobs are
+ # requeued when worker is killed, resque-like API.
+ #
+ # Read more about Qu {here}[https://github.com/bkeepers/qu].
+ #
+ # To use Qu set the queue_adapter config to +:qu+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :qu
class QuAdapter
class << self
- def enqueue(job, *args)
- Qu::Payload.new(klass: JobWrapper, args: [job.name, *args]).tap do |payload|
+ def enqueue(job, *args) #:nodoc:
+ Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload|
payload.instance_variable_set(:@queue, job.queue_name)
end.push
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp, *args) #:nodoc:
raise NotImplementedError
end
end
- class JobWrapper < Qu::Job
- def initialize(job_name, *args)
- @job = job_name.constantize
- @args = args
+ class JobWrapper < Qu::Job #:nodoc:
+ def initialize(job_data)
+ @job_data = job_data
end
def perform
- @job.new.execute(*@args)
+ Base.execute @job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb
index 9c84c74f83..e501fe0368 100644
--- a/activejob/lib/active_job/queue_adapters/que_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb
@@ -2,20 +2,32 @@ require 'que'
module ActiveJob
module QueueAdapters
+ # == Que adapter for Active Job
+ #
+ # Que is a high-performance alternative to DelayedJob or QueueClassic that
+ # improves the reliability of your application by protecting your jobs with
+ # the same ACID guarantees as the rest of your data. Que is a queue for
+ # Ruby and PostgreSQL that manages jobs using advisory locks.
+ #
+ # Read more about Que {here}[https://github.com/chanks/que].
+ #
+ # To use Que set the queue_adapter config to +:que+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :que
class QueAdapter
class << self
- def enqueue(job, *args)
- JobWrapper.enqueue job.name, *args, queue: job.queue_name
+ def enqueue(job) #:nodoc:
+ JobWrapper.enqueue job.serialize, queue: job.queue_name
end
- def enqueue_at(job, timestamp, *args)
- JobWrapper.enqueue job.name, *args, queue: job.queue_name, run_at: Time.at(timestamp)
+ def enqueue_at(job, timestamp) #:nodoc:
+ JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp)
end
end
- class JobWrapper < Que::Job
- def run(job_name, *args)
- job_name.constantize.new.execute(*args)
+ class JobWrapper < Que::Job #:nodoc:
+ def run(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
index d74f8cf90e..34c11a68b2 100644
--- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
@@ -2,21 +2,50 @@ require 'queue_classic'
module ActiveJob
module QueueAdapters
+ # == queue_classic adapter for Active Job
+ #
+ # queue_classic provides a simple interface to a PostgreSQL-backed message
+ # queue. queue_classic specializes in concurrent locking and minimizing
+ # database load while providing a simple, intuitive developer experience.
+ # queue_classic assumes that you are already using PostgreSQL in your
+ # production environment and that adding another dependency (e.g. redis,
+ # beanstalkd, 0mq) is undesirable.
+ #
+ # Read more about queue_classic {here}[https://github.com/QueueClassic/queue_classic].
+ #
+ # To use queue_classic set the queue_adapter config to +:queue_classic+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :queue_classic
class QueueClassicAdapter
class << self
- def enqueue(job, *args)
- QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.name, *args)
+ def enqueue(job) #:nodoc:
+ build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize)
end
- def enqueue_at(job, timestamp, *args)
- raise NotImplementedError
+ def enqueue_at(job, timestamp) #:nodoc:
+ queue = build_queue(job.queue_name)
+ unless queue.respond_to?(:enqueue_at)
+ raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \
+ 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \
+ 'You can implement this yourself or you can use the queue_classic-later gem.'
+ end
+ queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize)
+ end
+
+ # Builds a <tt>QC::Queue</tt> object to schedule jobs on.
+ #
+ # If you have a custom <tt>QC::Queue</tt> subclass you'll need to subclass
+ # <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the
+ # <tt>build_queue</tt> method.
+ def build_queue(queue_name)
+ QC::Queue.new(queue_name)
end
end
- class JobWrapper
+ class JobWrapper #:nodoc:
class << self
- def perform(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb
index da8212fc9b..88c6b48fef 100644
--- a/activejob/lib/active_job/queue_adapters/resque_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb
@@ -14,25 +14,36 @@ end
module ActiveJob
module QueueAdapters
+ # == Resque adapter for Active Job
+ #
+ # Resque (pronounced like "rescue") is a Redis-backed library for creating
+ # background jobs, placing those jobs on multiple queues, and processing
+ # them later.
+ #
+ # Read more about Resque {here}[https://github.com/resque/resque].
+ #
+ # To use Resque set the queue_adapter config to +:resque+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :resque
class ResqueAdapter
class << self
- def enqueue(job, *args)
- Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args
+ def enqueue(job) #:nodoc:
+ Resque.enqueue_to job.queue_name, JobWrapper, job.serialize
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp) #:nodoc:
unless Resque.respond_to?(:enqueue_at_with_queue)
raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \
"resque-scheduler gem. Please add it to your Gemfile and run bundle install"
end
- Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args
+ Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize
end
end
- class JobWrapper
+ class JobWrapper #:nodoc:
class << self
- def perform(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
index 3e20bec44c..21005fc728 100644
--- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
@@ -2,32 +2,42 @@ require 'sidekiq'
module ActiveJob
module QueueAdapters
+ # == Sidekiq adapter for Active Job
+ #
+ # Simple, efficient background processing for Ruby. Sidekiq uses threads to
+ # handle many jobs at the same time in the same process. It does not
+ # require Rails but will integrate tightly with it to make background
+ # processing dead simple.
+ #
+ # Read more about Sidekiq {here}[http://sidekiq.org].
+ #
+ # To use Sidekiq set the queue_adapter config to +:sidekiq+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :sidekiq
class SidekiqAdapter
class << self
- def enqueue(job, *args)
+ def enqueue(job) #:nodoc:
#Sidekiq::Client does not support symbols as keys
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
- 'args' => [ job, *args ],
- 'retry' => true
+ 'args' => [ job.serialize ]
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp) #:nodoc:
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
- 'args' => [ job, *args ],
- 'retry' => true,
+ 'args' => [ job.serialize ],
'at' => timestamp
end
end
- class JobWrapper
+ class JobWrapper #:nodoc:
include Sidekiq::Worker
- def perform(job_name, *args)
- job_name.constantize.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
index 48b3df6a46..6d60a2f303 100644
--- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
@@ -3,29 +3,41 @@ require 'thread'
module ActiveJob
module QueueAdapters
+ # == Sneakers adapter for Active Job
+ #
+ # A high-performance RabbitMQ background processing framework for Ruby.
+ # Sneakers is being used in production for both I/O and CPU intensive
+ # workloads, and have achieved the goals of high-performance and
+ # 0-maintenance, as designed.
+ #
+ # Read more about Sneakers {here}[https://github.com/jondot/sneakers].
+ #
+ # To use Sneakers set the queue_adapter config to +:sneakers+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :sneakers
class SneakersAdapter
@monitor = Monitor.new
class << self
- def enqueue(job, *args)
+ def enqueue(job) #:nodoc:
@monitor.synchronize do
JobWrapper.from_queue job.queue_name
- JobWrapper.enqueue ActiveSupport::JSON.encode([ job.name, *args ])
+ JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize)
end
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp) #:nodoc:
raise NotImplementedError
end
end
- class JobWrapper
+ class JobWrapper #:nodoc:
include Sneakers::Worker
- from_queue 'active_jobs_default'
+ from_queue 'default'
def work(msg)
- job_name, *args = ActiveSupport::JSON.decode(msg)
- job_name.constantize.new.execute(*args)
+ job_data = ActiveSupport::JSON.decode(msg)
+ Base.execute job_data
ack!
end
end
diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
index 16f05744f3..be9e7fd03a 100644
--- a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
@@ -2,22 +2,37 @@ require 'sucker_punch'
module ActiveJob
module QueueAdapters
+ # == Sucker Punch adapter for Active Job
+ #
+ # Sucker Punch is a single-process Ruby asynchronous processing library.
+ # It's girl_friday and DSL sugar on top of Celluloid. With Celluloid's
+ # actor pattern, we can do asynchronous processing within a single process.
+ # This reduces costs of hosting on a service like Heroku along with the
+ # memory footprint of having to maintain additional jobs if hosting on
+ # a dedicated server. All queues can run within a single Rails/Sinatra
+ # process.
+ #
+ # Read more about Sucker Punch {here}[https://github.com/brandonhilkert/sucker_punch].
+ #
+ # To use Sucker Punch set the queue_adapter config to +:sucker_punch+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :sucker_punch
class SuckerPunchAdapter
class << self
- def enqueue(job, *args)
- JobWrapper.new.async.perform job, *args
+ def enqueue(job) #:nodoc:
+ JobWrapper.new.async.perform job.serialize
end
- def enqueue_at(job, timestamp, *args)
+ def enqueue_at(job, timestamp) #:nodoc:
raise NotImplementedError
end
end
- class JobWrapper
+ class JobWrapper #:nodoc:
include SuckerPunch::Job
- def perform(job, *args)
- job.new.execute(*args)
+ def perform(job_data)
+ Base.execute job_data
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb
new file mode 100644
index 0000000000..ea9df9a063
--- /dev/null
+++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb
@@ -0,0 +1,51 @@
+module ActiveJob
+ module QueueAdapters
+ # == Test adapter for Active Job
+ #
+ # The test adapter should be used only in testing. Along with
+ # <tt>ActiveJob::TestCase</tt> and <tt>ActiveJob::TestHelper</tt>
+ # it makes a great tool to test your Rails application.
+ #
+ # To use the test adapter set queue_adapter config to +:test+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :test
+ class TestAdapter
+ delegate :name, to: :class
+ attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs)
+ attr_writer(:enqueued_jobs, :performed_jobs)
+
+ def initialize
+ self.perform_enqueued_jobs = false
+ self.perform_enqueued_at_jobs = false
+ end
+
+ # Provides a store of all the enqueued jobs with the TestAdapter so you can check them.
+ def enqueued_jobs
+ @enqueued_jobs ||= []
+ end
+
+ # Provides a store of all the performed jobs with the TestAdapter so you can check them.
+ def performed_jobs
+ @performed_jobs ||= []
+ end
+
+ def enqueue(job) #:nodoc:
+ if perform_enqueued_jobs
+ performed_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name}
+ Base.execute job.serialize
+ else
+ enqueued_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name}
+ end
+ end
+
+ def enqueue_at(job, timestamp) #:nodoc:
+ if perform_enqueued_at_jobs
+ performed_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name, at: timestamp}
+ Base.execute job.serialize
+ else
+ enqueued_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name, at: timestamp}
+ end
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/queue_name.rb b/activejob/lib/active_job/queue_name.rb
index 9698835b6e..9ae0345120 100644
--- a/activejob/lib/active_job/queue_name.rb
+++ b/activejob/lib/active_job/queue_name.rb
@@ -2,20 +2,50 @@ module ActiveJob
module QueueName
extend ActiveSupport::Concern
+ # Includes the ability to override the default queue name and prefix.
module ClassMethods
mattr_accessor(:queue_name_prefix)
mattr_accessor(:default_queue_name) { "default" }
- def queue_as(part_name)
- queue_name = part_name.to_s.presence || default_queue_name
+ # Specifies the name of the queue to process the job on.
+ #
+ # class PublishToFeedJob < ActiveJob::Base
+ # queue_as :feeds
+ #
+ # def perform(post)
+ # post.to_feed!
+ # end
+ # end
+ def queue_as(part_name=nil, &block)
+ if block_given?
+ self.queue_name = block
+ else
+ self.queue_name = queue_name_from_part(part_name)
+ end
+ end
+
+ def queue_name_from_part(part_name) #:nodoc:
+ queue_name = part_name || default_queue_name
name_parts = [queue_name_prefix.presence, queue_name]
- self.queue_name = name_parts.compact.join('_')
+ name_parts.compact.join(queue_name_delimiter)
end
end
included do
- class_attribute :queue_name
+ class_attribute :queue_name, instance_accessor: false
+ class_attribute :queue_name_delimiter, instance_accessor: false
+
self.queue_name = default_queue_name
+ self.queue_name_delimiter = '_' # set default delimiter to '_'
end
+
+ # Returns the name of the queue the job will be run on
+ def queue_name
+ if @queue_name.is_a?(Proc)
+ @queue_name = self.class.queue_name_from_part(instance_exec(&@queue_name))
+ end
+ @queue_name
+ end
+
end
end
diff --git a/activejob/lib/active_job/test_case.rb b/activejob/lib/active_job/test_case.rb
new file mode 100644
index 0000000000..d894a7b5cd
--- /dev/null
+++ b/activejob/lib/active_job/test_case.rb
@@ -0,0 +1,7 @@
+require 'active_support/test_case'
+
+module ActiveJob
+ class TestCase < ActiveSupport::TestCase
+ include ActiveJob::TestHelper
+ end
+end
diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb
new file mode 100644
index 0000000000..2efcea7f2e
--- /dev/null
+++ b/activejob/lib/active_job/test_helper.rb
@@ -0,0 +1,220 @@
+require 'active_support/core_ext/hash/keys'
+
+module ActiveJob
+ # Provides helper methods for testing Active Job
+ module TestHelper
+ extend ActiveSupport::Concern
+
+ included do
+ def before_setup
+ @old_queue_adapter = queue_adapter
+ ActiveJob::Base.queue_adapter = :test
+ clear_enqueued_jobs
+ clear_performed_jobs
+ super
+ end
+
+ def after_teardown
+ super
+ ActiveJob::Base.queue_adapter = @old_queue_adapter
+ end
+
+ # Asserts that the number of enqueued jobs matches the given number.
+ #
+ # def test_jobs
+ # assert_enqueued_jobs 0
+ # HelloJob.perform_later('david')
+ # assert_enqueued_jobs 1
+ # HelloJob.perform_later('abdelkader')
+ # assert_enqueued_jobs 2
+ # end
+ #
+ # If a block is passed, that block should cause the specified number of
+ # jobs to be enqueued.
+ #
+ # def test_jobs_again
+ # assert_enqueued_jobs 1 do
+ # HelloJob.perform_later('cristian')
+ # end
+ #
+ # assert_enqueued_jobs 2 do
+ # HelloJob.perform_later('aaron')
+ # HelloJob.perform_later('rafael')
+ # end
+ # end
+ def assert_enqueued_jobs(number)
+ if block_given?
+ original_count = enqueued_jobs.size
+ yield
+ new_count = enqueued_jobs.size
+ assert_equal original_count + number, new_count,
+ "#{number} jobs expected, but #{new_count - original_count} were enqueued"
+ else
+ enqueued_jobs_size = enqueued_jobs.size
+ assert_equal number, enqueued_jobs_size, "#{number} jobs expected, but #{enqueued_jobs_size} were enqueued"
+ end
+ end
+
+ # Asserts that no jobs have been enqueued.
+ #
+ # def test_jobs
+ # assert_no_enqueued_jobs
+ # HelloJob.perform_later('jeremy')
+ # assert_enqueued_jobs 1
+ # end
+ #
+ # If a block is passed, that block should not cause any job to be enqueued.
+ #
+ # def test_jobs_again
+ # assert_no_enqueued_jobs do
+ # # No job should be enqueued from this block
+ # end
+ # end
+ #
+ # Note: This assertion is simply a shortcut for:
+ #
+ # assert_enqueued_jobs 0, &block
+ def assert_no_enqueued_jobs(&block)
+ assert_enqueued_jobs 0, &block
+ end
+
+ # Asserts that the number of performed jobs matches the given number.
+ # If no block is passed, <tt>perform_enqueued_jobs</tt>
+ # must be called around the job call.
+ #
+ # def test_jobs
+ # assert_performed_jobs 0
+ #
+ # perform_enqueued_jobs do
+ # HelloJob.perform_later('xavier')
+ # end
+ # assert_performed_jobs 1
+ #
+ # perform_enqueued_jobs do
+ # HelloJob.perform_later('yves')
+ # assert_performed_jobs 2
+ # end
+ # end
+ #
+ # If a block is passed, that block should cause the specified number of
+ # jobs to be performed.
+ #
+ # def test_jobs_again
+ # assert_performed_jobs 1 do
+ # HelloJob.perform_later('robin')
+ # end
+ #
+ # assert_performed_jobs 2 do
+ # HelloJob.perform_later('carlos')
+ # HelloJob.perform_later('sean')
+ # end
+ # end
+ def assert_performed_jobs(number)
+ if block_given?
+ original_count = performed_jobs.size
+ perform_enqueued_jobs { yield }
+ new_count = performed_jobs.size
+ assert_equal original_count + number, new_count,
+ "#{number} jobs expected, but #{new_count - original_count} were performed"
+ else
+ performed_jobs_size = performed_jobs.size
+ assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed"
+ end
+ end
+
+ # Asserts that no jobs have been performed.
+ #
+ # def test_jobs
+ # assert_no_performed_jobs
+ #
+ # perform_enqueued_jobs do
+ # HelloJob.perform_later('matthew')
+ # assert_performed_jobs 1
+ # end
+ # end
+ #
+ # If a block is passed, that block should not cause any job to be performed.
+ #
+ # def test_jobs_again
+ # assert_no_performed_jobs do
+ # # No job should be performed from this block
+ # end
+ # end
+ #
+ # Note: This assertion is simply a shortcut for:
+ #
+ # assert_performed_jobs 0, &block
+ def assert_no_performed_jobs(&block)
+ assert_performed_jobs 0, &block
+ end
+
+ # Asserts that the job passed in the block has been enqueued with the given arguments.
+ #
+ # def test_assert_enqueued_with
+ # assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low') do
+ # MyJob.perform_later(1,2,3)
+ # end
+ # end
+ def assert_enqueued_with(args = {}, &_block)
+ original_enqueued_jobs = enqueued_jobs.dup
+ clear_enqueued_jobs
+ args.assert_valid_keys(:job, :args, :at, :queue)
+ yield
+ matching_job = enqueued_jobs.any? do |job|
+ args.all? { |key, value| value == job[key] }
+ end
+ assert matching_job, "No enqueued job found with #{args}"
+ ensure
+ queue_adapter.enqueued_jobs = original_enqueued_jobs + enqueued_jobs
+ end
+
+ # Asserts that the job passed in the block has been performed with the given arguments.
+ #
+ # def test_assert_performed_with
+ # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') do
+ # MyJob.perform_later(1,2,3)
+ # end
+ # end
+ def assert_performed_with(args = {}, &_block)
+ original_performed_jobs = performed_jobs.dup
+ clear_performed_jobs
+ args.assert_valid_keys(:job, :args, :at, :queue)
+ perform_enqueued_jobs { yield }
+ matching_job = performed_jobs.any? do |job|
+ args.all? { |key, value| value == job[key] }
+ end
+ assert matching_job, "No performed job found with #{args}"
+ ensure
+ queue_adapter.performed_jobs = original_performed_jobs + performed_jobs
+ end
+
+ def perform_enqueued_jobs
+ @old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs
+ @old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs
+ queue_adapter.perform_enqueued_jobs = true
+ queue_adapter.perform_enqueued_at_jobs = true
+ yield
+ ensure
+ queue_adapter.perform_enqueued_jobs = @old_perform_enqueued_jobs
+ queue_adapter.perform_enqueued_at_jobs = @old_perform_enqueued_at_jobs
+ end
+
+ def queue_adapter
+ ActiveJob::Base.queue_adapter
+ end
+
+ delegate :enqueued_jobs, :enqueued_jobs=,
+ :performed_jobs, :performed_jobs=,
+ to: :queue_adapter
+
+ private
+ def clear_enqueued_jobs
+ enqueued_jobs.clear
+ end
+
+ def clear_performed_jobs
+ performed_jobs.clear
+ end
+ end
+ end
+end
diff --git a/activejob/test/adapters/test.rb b/activejob/test/adapters/test.rb
new file mode 100644
index 0000000000..7180b38a57
--- /dev/null
+++ b/activejob/test/adapters/test.rb
@@ -0,0 +1,3 @@
+ActiveJob::Base.queue_adapter = :test
+ActiveJob::Base.queue_adapter.perform_enqueued_jobs = true
+ActiveJob::Base.queue_adapter.perform_enqueued_at_jobs = true
diff --git a/activejob/test/cases/adapter_test.rb b/activejob/test/cases/adapter_test.rb
index 4fc235ae40..6570c55a83 100644
--- a/activejob/test/cases/adapter_test.rb
+++ b/activejob/test/cases/adapter_test.rb
@@ -2,7 +2,6 @@ require 'helper'
class AdapterTest < ActiveSupport::TestCase
test "should load #{ENV['AJADAPTER']} adapter" do
- ActiveJob::Base.queue_adapter = ENV['AJADAPTER'].to_sym
- assert_equal ActiveJob::Base.queue_adapter, "active_job/queue_adapters/#{ENV['AJADAPTER']}_adapter".classify.constantize
+ assert_equal "active_job/queue_adapters/#{ENV['AJADAPTER']}_adapter".classify, ActiveJob::Base.queue_adapter.name
end
end
diff --git a/activejob/test/cases/argument_serialization_test.rb b/activejob/test/cases/argument_serialization_test.rb
new file mode 100644
index 0000000000..dbe36fc572
--- /dev/null
+++ b/activejob/test/cases/argument_serialization_test.rb
@@ -0,0 +1,84 @@
+require 'helper'
+require 'active_job/arguments'
+require 'models/person'
+require 'active_support/core_ext/hash/indifferent_access'
+
+class ArgumentSerializationTest < ActiveSupport::TestCase
+ setup do
+ @person = Person.find('5')
+ end
+
+ [ nil, 1, 1.0, 1_000_000_000_000_000_000_000,
+ 'a', true, false,
+ [ 1, 'a' ],
+ { 'a' => 1 }
+ ].each do |arg|
+ test "serializes #{arg.class} verbatim" do
+ assert_arguments_unchanged arg
+ end
+ end
+
+ [ :a, Object.new, self, Person.find('5').to_gid ].each do |arg|
+ test "does not serialize #{arg.class}" do
+ assert_raises ActiveJob::SerializationError do
+ ActiveJob::Arguments.serialize [ arg ]
+ end
+
+ assert_raises ActiveJob::DeserializationError do
+ ActiveJob::Arguments.deserialize [ arg ]
+ end
+ end
+ end
+
+ test 'should convert records to Global IDs' do
+ assert_arguments_roundtrip [@person], ['_aj_globalid' => @person.to_gid.to_s]
+ end
+
+ test 'should dive deep into arrays and hashes' do
+ assert_arguments_roundtrip [3, [@person]], [3, ['_aj_globalid' => @person.to_gid.to_s]]
+ assert_arguments_roundtrip [{ 'a' => @person }], [{ 'a' => { '_aj_globalid' => @person.to_gid.to_s }}.with_indifferent_access]
+ end
+
+ test 'should stringify symbol hash keys' do
+ assert_equal [ 'a' => 1 ], ActiveJob::Arguments.serialize([ a: 1 ])
+ end
+
+ test 'should disallow non-string/symbol hash keys' do
+ assert_raises ActiveJob::SerializationError do
+ ActiveJob::Arguments.serialize [ { 1 => 2 } ]
+ end
+
+ assert_raises ActiveJob::SerializationError do
+ ActiveJob::Arguments.serialize [ { :a => [{ 2 => 3 }] } ]
+ end
+
+ assert_raises ActiveJob::SerializationError do
+ ActiveJob::Arguments.serialize [ '_aj_globalid' => 1 ]
+ end
+
+ assert_raises ActiveJob::SerializationError do
+ ActiveJob::Arguments.serialize [ :_aj_globalid => 1 ]
+ end
+ end
+
+ test 'should not allow non-primitive objects' do
+ assert_raises ActiveJob::SerializationError do
+ ActiveJob::Arguments.serialize [Object.new]
+ end
+
+ assert_raises ActiveJob::SerializationError do
+ ActiveJob::Arguments.serialize [1, [Object.new]]
+ end
+ end
+
+ private
+ def assert_arguments_unchanged(*args)
+ assert_arguments_roundtrip args, args
+ end
+
+ def assert_arguments_roundtrip(args, expected_serialized_args)
+ serialized = ActiveJob::Arguments.serialize(args)
+ assert_equal expected_serialized_args, serialized
+ assert_equal args, ActiveJob::Arguments.deserialize(serialized)
+ end
+end
diff --git a/activejob/test/cases/callbacks_test.rb b/activejob/test/cases/callbacks_test.rb
index 9a0657ee89..9af2380767 100644
--- a/activejob/test/cases/callbacks_test.rb
+++ b/activejob/test/cases/callbacks_test.rb
@@ -5,7 +5,8 @@ require 'active_support/core_ext/object/inclusion'
class CallbacksTest < ActiveSupport::TestCase
test 'perform callbacks' do
- performed_callback_job = CallbackJob.new.tap { |j| j.execute("A-JOB-ID") }
+ performed_callback_job = CallbackJob.new("A-JOB-ID")
+ performed_callback_job.perform_now
assert "CallbackJob ran before_perform".in? performed_callback_job.history
assert "CallbackJob ran after_perform".in? performed_callback_job.history
assert "CallbackJob ran around_perform_start".in? performed_callback_job.history
@@ -13,7 +14,7 @@ class CallbacksTest < ActiveSupport::TestCase
end
test 'enqueue callbacks' do
- enqueued_callback_job = CallbackJob.enqueue
+ enqueued_callback_job = CallbackJob.perform_later
assert "CallbackJob ran before_enqueue".in? enqueued_callback_job.history
assert "CallbackJob ran after_enqueue".in? enqueued_callback_job.history
assert "CallbackJob ran around_enqueue_start".in? enqueued_callback_job.history
diff --git a/activejob/test/cases/job_serialization_test.rb b/activejob/test/cases/job_serialization_test.rb
index fc1b77744c..db22783030 100644
--- a/activejob/test/cases/job_serialization_test.rb
+++ b/activejob/test/cases/job_serialization_test.rb
@@ -9,7 +9,7 @@ class JobSerializationTest < ActiveSupport::TestCase
end
test 'serialize job with gid' do
- GidJob.enqueue @person
+ GidJob.perform_later @person
assert_equal "Person with ID: 5", JobBuffer.last_value
end
end
diff --git a/activejob/test/cases/logging_test.rb b/activejob/test/cases/logging_test.rb
index 888c183a0b..64aae00441 100644
--- a/activejob/test/cases/logging_test.rb
+++ b/activejob/test/cases/logging_test.rb
@@ -4,6 +4,7 @@ require 'active_support/core_ext/numeric/time'
require 'jobs/hello_job'
require 'jobs/logging_job'
require 'jobs/nested_job'
+require 'models/person'
class AdapterTest < ActiveSupport::TestCase
include ActiveSupport::LogSubscriber::TestHelper
@@ -33,7 +34,7 @@ class AdapterTest < ActiveSupport::TestCase
def teardown
super
ActiveJob::Logging::LogSubscriber.log_subscribers.pop
- ActiveJob::Base.logger = @old_logger
+ set_logger @old_logger
end
def set_logger(logger)
@@ -42,34 +43,51 @@ class AdapterTest < ActiveSupport::TestCase
def test_uses_active_job_as_tag
- HelloJob.enqueue "Cristian"
+ HelloJob.perform_later "Cristian"
assert_match(/\[ActiveJob\]/, @logger.messages)
end
+ def test_uses_job_name_as_tag
+ LoggingJob.perform_later "Dummy"
+ assert_match(/\[LoggingJob\]/, @logger.messages)
+ end
+
+ def test_uses_job_id_as_tag
+ LoggingJob.perform_later "Dummy"
+ assert_match(/\[LOGGING-JOB-ID\]/, @logger.messages)
+ end
+
+ def test_logs_correct_queue_name
+ original_queue_name = LoggingJob.queue_name
+ LoggingJob.queue_as :php_jobs
+ LoggingJob.perform_later("Dummy")
+ assert_match(/to .*?\(php_jobs\).*/, @logger.messages)
+ ensure
+ LoggingJob.queue_name = original_queue_name
+ end
+
+ def test_globalid_parameter_logging
+ person = Person.new(123)
+ LoggingJob.perform_later person
+ assert_match(%r{Enqueued.*gid://aj/Person/123}, @logger.messages)
+ assert_match(%r{Dummy, here is it: #<Person:.*>}, @logger.messages)
+ assert_match(%r{Performing.*gid://aj/Person/123}, @logger.messages)
+ end
+
def test_enqueue_job_logging
- HelloJob.enqueue "Cristian"
+ HelloJob.perform_later "Cristian"
assert_match(/Enqueued HelloJob \(Job ID: .*?\) to .*?:.*Cristian/, @logger.messages)
end
def test_perform_job_logging
- LoggingJob.enqueue "Dummy"
+ LoggingJob.perform_later "Dummy"
assert_match(/Performing LoggingJob from .*? with arguments:.*Dummy/, @logger.messages)
assert_match(/Dummy, here is it: Dummy/, @logger.messages)
assert_match(/Performed LoggingJob from .*? in .*ms/, @logger.messages)
end
- def test_perform_uses_job_name_job_logging
- LoggingJob.enqueue "Dummy"
- assert_match(/\[LoggingJob\]/, @logger.messages)
- end
-
- def test_perform_uses_job_id_job_logging
- LoggingJob.enqueue "Dummy"
- assert_match(/\[LOGGING-JOB-ID\]/, @logger.messages)
- end
-
def test_perform_nested_jobs_logging
- NestedJob.enqueue
+ NestedJob.perform_later
assert_match(/\[LoggingJob\] \[.*?\]/, @logger.messages)
assert_match(/\[ActiveJob\] Enqueued NestedJob \(Job ID: .*\) to/, @logger.messages)
assert_match(/\[ActiveJob\] \[NestedJob\] \[NESTED-JOB-ID\] Performing NestedJob from/, @logger.messages)
@@ -81,14 +99,14 @@ class AdapterTest < ActiveSupport::TestCase
end
def test_enqueue_at_job_logging
- HelloJob.enqueue_at 1, "Cristian"
+ HelloJob.set(wait_until: 24.hours.from_now).perform_later "Cristian"
assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages)
rescue NotImplementedError
skip
end
def test_enqueue_in_job_logging
- HelloJob.enqueue_in 2, "Cristian"
+ HelloJob.set(wait: 2.seconds).perform_later "Cristian"
assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages)
rescue NotImplementedError
skip
diff --git a/activejob/test/cases/parameters_test.rb b/activejob/test/cases/parameters_test.rb
deleted file mode 100644
index 78853c51e1..0000000000
--- a/activejob/test/cases/parameters_test.rb
+++ /dev/null
@@ -1,77 +0,0 @@
-require 'helper'
-require 'active_job/arguments'
-require 'models/person'
-require 'active_support/core_ext/hash/indifferent_access'
-
-class ParameterSerializationTest < ActiveSupport::TestCase
- test 'should make no change to regular values' do
- assert_equal [ 1, "something" ], ActiveJob::Arguments.serialize([ 1, "something" ])
- end
-
- test 'should not allow complex objects' do
- assert_equal [ nil ], ActiveJob::Arguments.serialize([ nil ])
- assert_equal [ 1 ], ActiveJob::Arguments.serialize([ 1 ])
- assert_equal [ 1.0 ], ActiveJob::Arguments.serialize([ 1.0 ])
- assert_equal [ 'a' ], ActiveJob::Arguments.serialize([ 'a' ])
- assert_equal [ true ], ActiveJob::Arguments.serialize([ true ])
- assert_equal [ false ], ActiveJob::Arguments.serialize([ false ])
- assert_equal [ { "a" => 1, "b" => 2 } ], ActiveJob::Arguments.serialize([ { a: 1, "b" => 2 } ])
- assert_equal [ [ 1 ] ], ActiveJob::Arguments.serialize([ [ 1 ] ])
- assert_equal [ 1_000_000_000_000_000_000_000 ], ActiveJob::Arguments.serialize([ 1_000_000_000_000_000_000_000 ])
-
- err = assert_raises ActiveJob::SerializationError do
- ActiveJob::Arguments.serialize([ 1, self ])
- end
- assert_equal "Unsupported argument type: #{self.class.name}", err.message
- end
-
- test 'should dive deep into arrays or hashes' do
- assert_equal [ { "a" => Person.find(5).gid.to_s }.with_indifferent_access ], ActiveJob::Arguments.serialize([ { a: Person.find(5) } ])
- assert_equal [ [ Person.find(5).gid.to_s ] ], ActiveJob::Arguments.serialize([ [ Person.find(5) ] ])
- end
-
- test 'should dive deep into arrays or hashes and raise exception on complex objects' do
- err = assert_raises ActiveJob::SerializationError do
- ActiveJob::Arguments.serialize([ 1, [self] ])
- end
- assert_equal "Unsupported argument type: #{self.class.name}", err.message
- end
-
- test 'shoud dive deep into hashes and allow raise exception on not string/symbol keys' do
- err = assert_raises ActiveJob::SerializationError do
- ActiveJob::Arguments.serialize([ [ { 1 => 2 } ] ])
- end
- assert_equal "Unsupported hash key type: Fixnum", err.message
- end
-
- test 'should serialize records with global id' do
- assert_equal [ Person.find(5).gid.to_s ], ActiveJob::Arguments.serialize([ Person.find(5) ])
- end
-
- test 'should serialize values and records together' do
- assert_equal [ 3, Person.find(5).gid.to_s ], ActiveJob::Arguments.serialize([ 3, Person.find(5) ])
- end
-end
-
-class ParameterDeserializationTest < ActiveSupport::TestCase
- test 'should make no change to regular values' do
- assert_equal [ 1, "something" ], ActiveJob::Arguments.deserialize([ 1, "something" ])
- end
-
- test 'should deserialize records with global id' do
- assert_equal [ Person.find(5) ], ActiveJob::Arguments.deserialize([ Person.find(5).gid ])
- end
-
- test 'should serialize values and records together' do
- assert_equal [ 3, Person.find(5) ], ActiveJob::Arguments.deserialize([ 3, Person.find(5).gid ])
- end
-
- test 'should dive deep when deserialising arrays' do
- assert_equal [ [ 3, Person.find(5) ] ], ActiveJob::Arguments.deserialize([ [ 3, Person.find(5).gid ] ])
- end
-
- test 'should dive deep when deserialising hashes' do
- assert_equal [ { "5" => Person.find(5) } ], ActiveJob::Arguments.deserialize([ { "5" => Person.find(5).gid } ])
- end
-
-end
diff --git a/activejob/test/cases/queue_naming_test.rb b/activejob/test/cases/queue_naming_test.rb
index fdfd1afceb..898016a704 100644
--- a/activejob/test/cases/queue_naming_test.rb
+++ b/activejob/test/cases/queue_naming_test.rb
@@ -8,24 +8,67 @@ class QueueNamingTest < ActiveSupport::TestCase
assert_equal "default", HelloJob.queue_name
end
- test 'name appended in job' do
+ test 'uses given queue name job' do
+ original_queue_name = HelloJob.queue_name
+
begin
HelloJob.queue_as :greetings
- LoggingJob.queue_as :bookkeeping
+ assert_equal "greetings", HelloJob.new.queue_name
+ ensure
+ HelloJob.queue_name = original_queue_name
+ end
+ end
+
+ test 'allows a blank queue name' do
+ original_queue_name = HelloJob.queue_name
+
+ begin
+ HelloJob.queue_as ""
+ assert_equal "", HelloJob.new.queue_name
+ ensure
+ HelloJob.queue_name = original_queue_name
+ end
+ end
+
+ test 'does not use a nil queue name' do
+ original_queue_name = HelloJob.queue_name
+
+ begin
+ HelloJob.queue_as nil
+ assert_equal "default", HelloJob.new.queue_name
+ ensure
+ HelloJob.queue_name = original_queue_name
+ end
+ end
+
+ test 'evals block given to queue_as to determine queue' do
+ original_queue_name = HelloJob.queue_name
- assert_equal "default", NestedJob.queue_name
- assert_equal "greetings", HelloJob.queue_name
- assert_equal "bookkeeping", LoggingJob.queue_name
+ begin
+ HelloJob.queue_as { :another }
+ assert_equal "another", HelloJob.new.queue_name
ensure
- HelloJob.queue_name = LoggingJob.queue_name = ActiveJob::Base.default_queue_name
+ HelloJob.queue_name = original_queue_name
end
end
- test 'should prefix the queue name' do
+ test 'can use arguments to determine queue_name in queue_as block' do
+ original_queue_name = HelloJob.queue_name
+
begin
- original_queue_name_prefix = ActiveJob::Base.queue_name_prefix
- original_queue_name = HelloJob.queue_name
+ HelloJob.queue_as { self.arguments.first=='1' ? :one : :two }
+ assert_equal "one", HelloJob.new('1').queue_name
+ assert_equal "two", HelloJob.new('3').queue_name
+ ensure
+ HelloJob.queue_name = original_queue_name
+ end
+ end
+
+ test 'queue_name_prefix prepended to the queue name with default delimiter' do
+ original_queue_name_prefix = ActiveJob::Base.queue_name_prefix
+ original_queue_name = HelloJob.queue_name
+ begin
ActiveJob::Base.queue_name_prefix = 'aj'
HelloJob.queue_as :low
assert_equal 'aj_low', HelloJob.queue_name
@@ -35,4 +78,25 @@ class QueueNamingTest < ActiveSupport::TestCase
end
end
+ test 'queue_name_prefix prepended to the queue name with custom delimiter' do
+ original_queue_name_prefix = ActiveJob::Base.queue_name_prefix
+ original_queue_name_delimiter = ActiveJob::Base.queue_name_delimiter
+ original_queue_name = HelloJob.queue_name
+
+ begin
+ ActiveJob::Base.queue_name_delimiter = '.'
+ ActiveJob::Base.queue_name_prefix = 'aj'
+ HelloJob.queue_as :low
+ assert_equal 'aj.low', HelloJob.queue_name
+ ensure
+ ActiveJob::Base.queue_name_prefix = original_queue_name_prefix
+ ActiveJob::Base.queue_name_delimiter = original_queue_name_delimiter
+ HelloJob.queue_name = original_queue_name
+ end
+ end
+
+ test 'uses queue passed to #set' do
+ job = HelloJob.set(queue: :some_queue).perform_later
+ assert_equal "some_queue", job.queue_name
+ end
end
diff --git a/activejob/test/cases/queuing_test.rb b/activejob/test/cases/queuing_test.rb
index f020316d7e..0eeabbf693 100644
--- a/activejob/test/cases/queuing_test.rb
+++ b/activejob/test/cases/queuing_test.rb
@@ -9,18 +9,18 @@ class QueuingTest < ActiveSupport::TestCase
end
test 'run queued job' do
- HelloJob.enqueue
+ HelloJob.perform_later
assert_equal "David says hello", JobBuffer.last_value
end
test 'run queued job with arguments' do
- HelloJob.enqueue "Jamie"
+ HelloJob.perform_later "Jamie"
assert_equal "Jamie says hello", JobBuffer.last_value
end
test 'run queued job later' do
begin
- result = HelloJob.enqueue_at 1.second.ago, "Jamie"
+ result = HelloJob.set(wait_until: 1.second.ago).perform_later "Jamie"
assert result
rescue NotImplementedError
skip
@@ -28,15 +28,15 @@ class QueuingTest < ActiveSupport::TestCase
end
test 'job returned by enqueue has the arguments available' do
- job = HelloJob.enqueue "Jamie"
+ job = HelloJob.perform_later "Jamie"
assert_equal [ "Jamie" ], job.arguments
end
- test 'job returned by enqueue_at has the timestamp available' do
+ test 'job returned by perform_at has the timestamp available' do
begin
- job = HelloJob.enqueue_at Time.utc(2014, 1, 1)
- assert_equal Time.utc(2014, 1, 1), job.enqueued_at
+ job = HelloJob.set(wait_until: Time.utc(2014, 1, 1)).perform_later
+ assert_equal Time.utc(2014, 1, 1).to_f, job.scheduled_at
rescue NotImplementedError
skip
end
diff --git a/activejob/test/cases/rescue_test.rb b/activejob/test/cases/rescue_test.rb
index d9ea3c91d7..58c9ca8992 100644
--- a/activejob/test/cases/rescue_test.rb
+++ b/activejob/test/cases/rescue_test.rb
@@ -2,30 +2,33 @@ require 'helper'
require 'jobs/rescue_job'
require 'models/person'
-require 'active_support/core_ext/object/inclusion'
-
class RescueTest < ActiveSupport::TestCase
setup do
JobBuffer.clear
end
test 'rescue perform exception with retry' do
- job = RescueJob.new
- job.execute(SecureRandom.uuid, "david")
+ job = RescueJob.new("david")
+ job.perform_now
assert_equal [ "rescued from ArgumentError", "performed beautifully" ], JobBuffer.values
end
test 'let through unhandled perform exception' do
- job = RescueJob.new
+ job = RescueJob.new("other")
assert_raises(RescueJob::OtherError) do
- job.execute(SecureRandom.uuid, "other")
+ job.perform_now
end
end
test 'rescue from deserialization errors' do
- RescueJob.enqueue Person.new(404)
+ RescueJob.perform_later Person.new(404)
assert_includes JobBuffer.values, 'rescued from DeserializationError'
assert_includes JobBuffer.values, 'DeserializationError original exception was Person::RecordNotFound'
assert_not_includes JobBuffer.values, 'performed beautifully'
end
+
+ test "should not wrap DeserializationError in DeserializationError" do
+ RescueJob.perform_later [Person.new(404)]
+ assert_includes JobBuffer.values, 'DeserializationError original exception was Person::RecordNotFound'
+ end
end
diff --git a/activejob/test/cases/test_case_test.rb b/activejob/test/cases/test_case_test.rb
new file mode 100644
index 0000000000..1d0fdbd22d
--- /dev/null
+++ b/activejob/test/cases/test_case_test.rb
@@ -0,0 +1,14 @@
+require 'helper'
+require 'jobs/hello_job'
+require 'jobs/logging_job'
+require 'jobs/nested_job'
+
+class ActiveJobTestCaseTest < ActiveJob::TestCase
+ def test_include_helper
+ assert_includes self.class.ancestors, ActiveJob::TestHelper
+ end
+
+ def test_set_test_adapter
+ assert_instance_of ActiveJob::QueueAdapters::TestAdapter, self.queue_adapter
+ end
+end
diff --git a/activejob/test/cases/test_helper_test.rb b/activejob/test/cases/test_helper_test.rb
new file mode 100644
index 0000000000..784ede3674
--- /dev/null
+++ b/activejob/test/cases/test_helper_test.rb
@@ -0,0 +1,226 @@
+require 'helper'
+require 'active_support/core_ext/time'
+require 'active_support/core_ext/date'
+require 'jobs/hello_job'
+require 'jobs/logging_job'
+require 'jobs/nested_job'
+
+class EnqueuedJobsTest < ActiveJob::TestCase
+ def test_assert_enqueued_jobs
+ assert_nothing_raised do
+ assert_enqueued_jobs 1 do
+ HelloJob.perform_later('david')
+ end
+ end
+ end
+
+ def test_repeated_enqueued_jobs_calls
+ assert_nothing_raised do
+ assert_enqueued_jobs 1 do
+ HelloJob.perform_later('abdelkader')
+ end
+ end
+
+ assert_nothing_raised do
+ assert_enqueued_jobs 2 do
+ HelloJob.perform_later('sean')
+ HelloJob.perform_later('yves')
+ end
+ end
+ end
+
+ def test_assert_enqueued_jobs_with_no_block
+ assert_nothing_raised do
+ HelloJob.perform_later('rafael')
+ assert_enqueued_jobs 1
+ end
+
+ assert_nothing_raised do
+ HelloJob.perform_later('aaron')
+ HelloJob.perform_later('matthew')
+ assert_enqueued_jobs 3
+ end
+ end
+
+ def test_assert_no_enqueued_jobs_with_no_block
+ assert_nothing_raised do
+ assert_no_enqueued_jobs
+ end
+ end
+
+ def test_assert_no_enqueued_jobs
+ assert_nothing_raised do
+ assert_no_enqueued_jobs do
+ HelloJob.perform_now
+ end
+ end
+ end
+
+ def test_assert_enqueued_jobs_too_few_sent
+ error = assert_raise ActiveSupport::TestCase::Assertion do
+ assert_enqueued_jobs 2 do
+ HelloJob.perform_later('xavier')
+ end
+ end
+
+ assert_match(/2 .* but 1/, error.message)
+ end
+
+ def test_assert_enqueued_jobs_too_many_sent
+ error = assert_raise ActiveSupport::TestCase::Assertion do
+ assert_enqueued_jobs 1 do
+ HelloJob.perform_later('cristian')
+ HelloJob.perform_later('guillermo')
+ end
+ end
+
+ assert_match(/1 .* but 2/, error.message)
+ end
+
+ def test_assert_no_enqueued_jobs_failure
+ error = assert_raise ActiveSupport::TestCase::Assertion do
+ assert_no_enqueued_jobs do
+ HelloJob.perform_later('jeremy')
+ end
+ end
+
+ assert_match(/0 .* but 1/, error.message)
+ end
+
+ def test_assert_enqueued_job
+ assert_enqueued_with(job: LoggingJob, queue: 'default') do
+ LoggingJob.set(wait_until: Date.tomorrow.noon).perform_later
+ end
+ end
+
+ def test_assert_enqueued_job_failure
+ assert_raise ActiveSupport::TestCase::Assertion do
+ assert_enqueued_with(job: LoggingJob, queue: 'default') do
+ NestedJob.perform_later
+ end
+ end
+
+ error = assert_raise ActiveSupport::TestCase::Assertion do
+ assert_enqueued_with(job: NestedJob, queue: 'low') do
+ NestedJob.perform_later
+ end
+ end
+
+ assert_equal 'No enqueued job found with {:job=>NestedJob, :queue=>"low"}', error.message
+ end
+
+ def test_assert_enqueued_job_args
+ assert_raise ArgumentError do
+ assert_enqueued_with(class: LoggingJob) do
+ NestedJob.set(wait_until: Date.tomorrow.noon).perform_later
+ end
+ end
+ end
+end
+
+class PerformedJobsTest < ActiveJob::TestCase
+ def test_assert_performed_jobs
+ assert_nothing_raised do
+ assert_performed_jobs 1 do
+ HelloJob.perform_later('david')
+ end
+ end
+ end
+
+ def test_repeated_performed_jobs_calls
+ assert_nothing_raised do
+ assert_performed_jobs 1 do
+ HelloJob.perform_later('abdelkader')
+ end
+ end
+
+ assert_nothing_raised do
+ assert_performed_jobs 2 do
+ HelloJob.perform_later('sean')
+ HelloJob.perform_later('yves')
+ end
+ end
+ end
+
+ def test_assert_performed_jobs_with_no_block
+ assert_nothing_raised do
+ perform_enqueued_jobs do
+ HelloJob.perform_later('rafael')
+ end
+ assert_performed_jobs 1
+ end
+
+ assert_nothing_raised do
+ perform_enqueued_jobs do
+ HelloJob.perform_later('aaron')
+ HelloJob.perform_later('matthew')
+ assert_performed_jobs 3
+ end
+ end
+ end
+
+ def test_assert_no_performed_jobs_with_no_block
+ assert_nothing_raised do
+ assert_no_performed_jobs
+ end
+ end
+
+ def test_assert_no_performed_jobs
+ assert_nothing_raised do
+ assert_no_performed_jobs do
+ # empty block won't perform jobs
+ end
+ end
+ end
+
+ def test_assert_performed_jobs_too_few_sent
+ error = assert_raise ActiveSupport::TestCase::Assertion do
+ assert_performed_jobs 2 do
+ HelloJob.perform_later('xavier')
+ end
+ end
+
+ assert_match(/2 .* but 1/, error.message)
+ end
+
+ def test_assert_performed_jobs_too_many_sent
+ error = assert_raise ActiveSupport::TestCase::Assertion do
+ assert_performed_jobs 1 do
+ HelloJob.perform_later('cristian')
+ HelloJob.perform_later('guillermo')
+ end
+ end
+
+ assert_match(/1 .* but 2/, error.message)
+ end
+
+ def test_assert_no_performed_jobs_failure
+ error = assert_raise ActiveSupport::TestCase::Assertion do
+ assert_no_performed_jobs do
+ HelloJob.perform_later('jeremy')
+ end
+ end
+
+ assert_match(/0 .* but 1/, error.message)
+ end
+
+ def test_assert_performed_job
+ assert_performed_with(job: NestedJob, queue: 'default') do
+ NestedJob.perform_later
+ end
+ end
+
+ def test_assert_performed_job_failure
+ assert_raise ActiveSupport::TestCase::Assertion do
+ assert_performed_with(job: LoggingJob, at: Date.tomorrow.noon, queue: 'default') do
+ NestedJob.set(wait_until: Date.tomorrow.noon).perform_later
+ end
+ end
+
+ assert_raise ActiveSupport::TestCase::Assertion do
+ assert_performed_with(job: NestedJob, at: Date.tomorrow.noon, queue: 'low') do
+ NestedJob.set(queue: 'low', wait_until: Date.tomorrow.noon).perform_later
+ end
+ end
+ end
+end
diff --git a/activejob/test/helper.rb b/activejob/test/helper.rb
index a56b35da12..ec0c8a8ede 100644
--- a/activejob/test/helper.rb
+++ b/activejob/test/helper.rb
@@ -1,6 +1,7 @@
require File.expand_path('../../../load_paths', __FILE__)
require 'active_job'
+require 'support/job_buffer'
GlobalID.app = 'aj'
@@ -10,21 +11,19 @@ def sidekiq?
@adapter == 'sidekiq'
end
-def rubinius?
- RUBY_ENGINE == 'rbx'
-end
-
def ruby_193?
RUBY_VERSION == '1.9.3' && RUBY_ENGINE != 'java'
end
-#Sidekiq don't work with MRI 1.9.3
-#Travis uses rbx 2.6 which don't support unicode characters in methods.
-#Remove the check when Travis change to rbx 2.7+
-exit if sidekiq? && (ruby_193? || rubinius?)
+# Sidekiq doesn't work with MRI 1.9.3
+exit if sidekiq? && ruby_193?
-require "adapters/#{@adapter}"
+if ENV['AJ_INTEGRATION_TESTS']
+ require 'support/integration/helper'
+else
+ require "adapters/#{@adapter}"
+end
require 'active_support/testing/autorun'
-ActiveJob::Base.logger.level = Logger::DEBUG
+ActiveSupport::TestCase.test_order = :random
diff --git a/activejob/test/integration/queuing_test.rb b/activejob/test/integration/queuing_test.rb
new file mode 100644
index 0000000000..38874b51a8
--- /dev/null
+++ b/activejob/test/integration/queuing_test.rb
@@ -0,0 +1,47 @@
+require 'helper'
+require 'jobs/logging_job'
+require 'active_support/core_ext/numeric/time'
+
+class QueuingTest < ActiveSupport::TestCase
+ test 'should run jobs enqueued on a listening queue' do
+ TestJob.perform_later @id
+ wait_for_jobs_to_finish_for(5.seconds)
+ assert job_executed
+ end
+
+ test 'should not run jobs queued on a non-listening queue' do
+ skip if adapter_is?(:inline) || adapter_is?(:sucker_punch)
+ old_queue = TestJob.queue_name
+
+ begin
+ TestJob.queue_as :some_other_queue
+ TestJob.perform_later @id
+ wait_for_jobs_to_finish_for(2.seconds)
+ assert_not job_executed
+ ensure
+ TestJob.queue_name = old_queue
+ end
+ end
+
+ test 'should not run job enqueued in the future' do
+ begin
+ TestJob.set(wait: 10.minutes).perform_later @id
+ wait_for_jobs_to_finish_for(5.seconds)
+ assert_not job_executed
+ rescue NotImplementedError
+ skip
+ end
+ end
+
+ test 'should run job enqueued in the future at the specified time' do
+ begin
+ TestJob.set(wait: 3.seconds).perform_later @id
+ wait_for_jobs_to_finish_for(2.seconds)
+ assert_not job_executed
+ wait_for_jobs_to_finish_for(10.seconds)
+ assert job_executed
+ rescue NotImplementedError
+ skip
+ end
+ end
+end
diff --git a/activejob/test/jobs/callback_job.rb b/activejob/test/jobs/callback_job.rb
index 056dd073e8..891ed9464e 100644
--- a/activejob/test/jobs/callback_job.rb
+++ b/activejob/test/jobs/callback_job.rb
@@ -1,12 +1,21 @@
class CallbackJob < ActiveJob::Base
before_perform ->(job) { job.history << "CallbackJob ran before_perform" }
- after_perform ->(job) { job.history << "CallbackJob ran after_perform" }
+ after_perform ->(job) { job.history << "CallbackJob ran after_perform" }
before_enqueue ->(job) { job.history << "CallbackJob ran before_enqueue" }
- after_enqueue ->(job) { job.history << "CallbackJob ran after_enqueue" }
+ after_enqueue ->(job) { job.history << "CallbackJob ran after_enqueue" }
- around_perform :around_perform
- around_enqueue :around_enqueue
+ around_perform do |job, block|
+ job.history << "CallbackJob ran around_perform_start"
+ block.call
+ job.history << "CallbackJob ran around_perform_stop"
+ end
+
+ around_enqueue do |job, block|
+ job.history << "CallbackJob ran around_enqueue_start"
+ block.call
+ job.history << "CallbackJob ran around_enqueue_stop"
+ end
def perform(person = "david")
@@ -17,16 +26,4 @@ class CallbackJob < ActiveJob::Base
@history ||= []
end
- # FIXME: Not sure why these can't be declared inline like before/after
- def around_perform
- history << "CallbackJob ran around_perform_start"
- yield
- history << "CallbackJob ran around_perform_stop"
- end
-
- def around_enqueue
- history << "CallbackJob ran around_enqueue_start"
- yield
- history << "CallbackJob ran around_enqueue_stop"
- end
end
diff --git a/activejob/test/jobs/nested_job.rb b/activejob/test/jobs/nested_job.rb
index fd66f68991..8c4ec549a6 100644
--- a/activejob/test/jobs/nested_job.rb
+++ b/activejob/test/jobs/nested_job.rb
@@ -1,6 +1,6 @@
class NestedJob < ActiveJob::Base
def perform
- LoggingJob.enqueue "NestedJob"
+ LoggingJob.perform_later "NestedJob"
end
def job_id
diff --git a/activejob/test/jobs/rescue_job.rb b/activejob/test/jobs/rescue_job.rb
index 6b6e74e9d0..f1b9c9349e 100644
--- a/activejob/test/jobs/rescue_job.rb
+++ b/activejob/test/jobs/rescue_job.rb
@@ -6,7 +6,7 @@ class RescueJob < ActiveJob::Base
rescue_from(ArgumentError) do
JobBuffer.add('rescued from ArgumentError')
arguments[0] = "DIFFERENT!"
- retry_now
+ retry_job
end
rescue_from(ActiveJob::DeserializationError) do |e|
diff --git a/activejob/test/support/delayed_job/delayed/backend/test.rb b/activejob/test/support/delayed_job/delayed/backend/test.rb
index b50ed36fc2..f80ec3a5a6 100644
--- a/activejob/test/support/delayed_job/delayed/backend/test.rb
+++ b/activejob/test/support/delayed_job/delayed/backend/test.rb
@@ -43,9 +43,7 @@ module Delayed
end
def self.create(attrs = {})
- new(attrs).tap do |o|
- o.save
- end
+ new(attrs).tap(&:save)
end
def self.create!(*args); create(*args); end
diff --git a/activejob/test/support/integration/adapters/backburner.rb b/activejob/test/support/integration/adapters/backburner.rb
new file mode 100644
index 0000000000..2e82562948
--- /dev/null
+++ b/activejob/test/support/integration/adapters/backburner.rb
@@ -0,0 +1,38 @@
+module BackburnerJobsManager
+ def setup
+ ActiveJob::Base.queue_adapter = :backburner
+ Backburner.configure do |config|
+ config.logger = Rails.logger
+ end
+ unless can_run?
+ puts "Cannot run integration tests for backburner. To be able to run integration tests for backburner you need to install and start beanstalkd.\n"
+ exit
+ end
+ end
+
+ def clear_jobs
+ tube.clear
+ end
+
+ def start_workers
+ @thread = Thread.new { Backburner.work "integration-tests" } # backburner dasherizes the queue name
+ end
+
+ def stop_workers
+ @thread.kill
+ end
+
+ def tube
+ @tube ||= Beaneater::Tube.new(Backburner::Worker.connection, "backburner.worker.queue.integration-tests") # backburner dasherizes the queue name
+ end
+
+ def can_run?
+ begin
+ Backburner::Worker.connection.send :connect!
+ rescue
+ return false
+ end
+ true
+ end
+end
+
diff --git a/activejob/test/support/integration/adapters/delayed_job.rb b/activejob/test/support/integration/adapters/delayed_job.rb
new file mode 100644
index 0000000000..0b591964bc
--- /dev/null
+++ b/activejob/test/support/integration/adapters/delayed_job.rb
@@ -0,0 +1,20 @@
+require 'delayed_job'
+require 'delayed_job_active_record'
+
+module DelayedJobJobsManager
+ def setup
+ ActiveJob::Base.queue_adapter = :delayed_job
+ end
+ def clear_jobs
+ Delayed::Job.delete_all
+ end
+
+ def start_workers
+ @worker = Delayed::Worker.new(quiet: true, sleep_delay: 0.5, queues: %w(integration_tests))
+ @thread = Thread.new { @worker.start }
+ end
+
+ def stop_workers
+ @worker.stop
+ end
+end
diff --git a/activejob/test/support/integration/adapters/inline.rb b/activejob/test/support/integration/adapters/inline.rb
new file mode 100644
index 0000000000..83c38f706f
--- /dev/null
+++ b/activejob/test/support/integration/adapters/inline.rb
@@ -0,0 +1,15 @@
+module InlineJobsManager
+ def setup
+ ActiveJob::Base.queue_adapter = :inline
+ end
+
+ def clear_jobs
+ end
+
+ def start_workers
+ end
+
+ def stop_workers
+ end
+end
+
diff --git a/activejob/test/support/integration/adapters/qu.rb b/activejob/test/support/integration/adapters/qu.rb
new file mode 100644
index 0000000000..3a5b66a057
--- /dev/null
+++ b/activejob/test/support/integration/adapters/qu.rb
@@ -0,0 +1,38 @@
+module QuJobsManager
+ def setup
+ require 'qu-rails'
+ require 'qu-redis'
+ ActiveJob::Base.queue_adapter = :qu
+ ENV['REDISTOGO_URL'] = "tcp://127.0.0.1:6379/12"
+ backend = Qu::Backend::Redis.new
+ backend.namespace = "active_jobs_int_test"
+ Qu.backend = backend
+ Qu.logger = Rails.logger
+ Qu.interval = 0.5
+ unless can_run?
+ puts "Cannot run integration tests for qu. To be able to run integration tests for qu you need to install and start redis.\n"
+ exit
+ end
+ end
+
+ def clear_jobs
+ Qu.clear "integration_tests"
+ end
+
+ def start_workers
+ @thread = Thread.new { Qu::Worker.new("integration_tests").start }
+ end
+
+ def stop_workers
+ @thread.kill
+ end
+
+ def can_run?
+ begin
+ Qu.backend.connection.client.connect
+ rescue
+ return false
+ end
+ true
+ end
+end
diff --git a/activejob/test/support/integration/adapters/que.rb b/activejob/test/support/integration/adapters/que.rb
new file mode 100644
index 0000000000..ba7657a42a
--- /dev/null
+++ b/activejob/test/support/integration/adapters/que.rb
@@ -0,0 +1,37 @@
+module QueJobsManager
+ def setup
+ require 'sequel'
+ ActiveJob::Base.queue_adapter = :que
+ que_url = ENV['QUE_DATABASE_URL'] || 'postgres:///active_jobs_que_int_test'
+ uri = URI.parse(que_url)
+ user = uri.user||ENV['USER']
+ pass = uri.password
+ db = uri.path[1..-1]
+ %x{#{"PGPASSWORD=\"#{pass}\"" if pass} psql -c 'drop database if exists "#{db}"' -U #{user} -t template1}
+ %x{#{"PGPASSWORD=\"#{pass}\"" if pass} psql -c 'create database "#{db}"' -U #{user} -t template1}
+ Que.connection = Sequel.connect(que_url)
+ Que.migrate!
+ Que.mode = :off
+ Que.worker_count = 1
+ rescue Sequel::DatabaseConnectionError
+ puts "Cannot run integration tests for que. To be able to run integration tests for que you need to install and start postgresql.\n"
+ exit
+ end
+
+ def clear_jobs
+ Que.clear!
+ end
+
+ def start_workers
+ @thread = Thread.new do
+ loop do
+ Que::Job.work("integration_tests")
+ sleep 0.5
+ end
+ end
+ end
+
+ def stop_workers
+ @thread.kill
+ end
+end
diff --git a/activejob/test/support/integration/adapters/queue_classic.rb b/activejob/test/support/integration/adapters/queue_classic.rb
new file mode 100644
index 0000000000..038473ccdc
--- /dev/null
+++ b/activejob/test/support/integration/adapters/queue_classic.rb
@@ -0,0 +1,33 @@
+module QueueClassicJobsManager
+ def setup
+ ENV['QC_DATABASE_URL'] ||= 'postgres:///active_jobs_qc_int_test'
+ ENV['QC_LISTEN_TIME'] = "0.5"
+ uri = URI.parse(ENV['QC_DATABASE_URL'])
+ user = uri.user||ENV['USER']
+ pass = uri.password
+ db = uri.path[1..-1]
+ %x{#{"PGPASSWORD=\"#{pass}\"" if pass} psql -c 'drop database if exists "#{db}"' -U #{user} -t template1}
+ %x{#{"PGPASSWORD=\"#{pass}\"" if pass} psql -c 'create database "#{db}"' -U #{user} -t template1}
+ ActiveJob::Base.queue_adapter = :queue_classic
+ QC::Setup.create
+ rescue PG::ConnectionBad
+ puts "Cannot run integration tests for queue_classic. To be able to run integration tests for queue_classic you need to install and start postgresql.\n"
+ exit
+ end
+
+ def clear_jobs
+ QC::Queue.new("integration_tests").delete_all
+ end
+
+ def start_workers
+ QC::Conn.disconnect
+ @pid = fork do
+ worker = QC::Worker.new(q_name: 'integration_tests')
+ worker.start
+ end
+ end
+
+ def stop_workers
+ Process.kill 'HUP', @pid
+ end
+end
diff --git a/activejob/test/support/integration/adapters/resque.rb b/activejob/test/support/integration/adapters/resque.rb
new file mode 100644
index 0000000000..912f4bc387
--- /dev/null
+++ b/activejob/test/support/integration/adapters/resque.rb
@@ -0,0 +1,49 @@
+module ResqueJobsManager
+ def setup
+ ActiveJob::Base.queue_adapter = :resque
+ Resque.redis = Redis::Namespace.new 'active_jobs_int_test', redis: Redis.connect(url: "redis://127.0.0.1:6379/12", :thread_safe => true)
+ Resque.logger = Rails.logger
+ unless can_run?
+ puts "Cannot run integration tests for resque. To be able to run integration tests for resque you need to install and start redis.\n"
+ exit
+ end
+ end
+
+ def clear_jobs
+ Resque.queues.each { |queue_name| Resque.redis.del "queue:#{queue_name}" }
+ Resque.redis.keys("delayed:*").each { |key| Resque.redis.del "#{key}" }
+ Resque.redis.del "delayed_queue_schedule"
+ end
+
+ def start_workers
+ @resque_thread = Thread.new do
+ w = Resque::Worker.new("integration_tests")
+ w.term_child = true
+ w.work(0.5)
+ end
+ @scheduler_thread = Thread.new do
+ Resque::Scheduler.configure do |c|
+ c.poll_sleep_amount = 0.5
+ c.dynamic = true
+ c.quiet = true
+ c.logfile = nil
+ end
+ Resque::Scheduler.master_lock.release!
+ Resque::Scheduler.run
+ end
+ end
+
+ def stop_workers
+ @resque_thread.kill
+ @scheduler_thread.kill
+ end
+
+ def can_run?
+ begin
+ Resque.redis.client.connect
+ rescue
+ return false
+ end
+ true
+ end
+end
diff --git a/activejob/test/support/integration/adapters/sidekiq.rb b/activejob/test/support/integration/adapters/sidekiq.rb
new file mode 100644
index 0000000000..6ff18fb56a
--- /dev/null
+++ b/activejob/test/support/integration/adapters/sidekiq.rb
@@ -0,0 +1,58 @@
+require 'sidekiq/cli'
+require 'sidekiq/api'
+
+module SidekiqJobsManager
+
+ def setup
+ ActiveJob::Base.queue_adapter = :sidekiq
+ unless can_run?
+ puts "Cannot run integration tests for sidekiq. To be able to run integration tests for sidekiq you need to install and start redis.\n"
+ exit
+ end
+ end
+
+ def clear_jobs
+ Sidekiq::ScheduledSet.new.clear
+ Sidekiq::Queue.new("integration_tests").clear
+ end
+
+ def start_workers
+ fork do
+ sidekiq = Sidekiq::CLI.instance
+ logfile = Rails.root.join("log/sidekiq.log").to_s
+ pidfile = Rails.root.join("tmp/sidekiq.pid").to_s
+ sidekiq.parse([ "--require", Rails.root.to_s,
+ "--queue", "integration_tests",
+ "--logfile", logfile,
+ "--pidfile", pidfile,
+ "--environment", "test",
+ "--concurrency", "1",
+ "--timeout", "1",
+ "--daemon",
+ ])
+ require 'celluloid'
+ require 'sidekiq/scheduled'
+ Sidekiq.poll_interval = 0.5
+ Sidekiq::Scheduled.const_set :INITIAL_WAIT, 1
+ sidekiq.run
+ end
+ sleep 1
+ end
+
+ def stop_workers
+ pidfile = Rails.root.join("tmp/sidekiq.pid").to_s
+ Process.kill 'TERM', File.open(pidfile).read.to_i
+ FileUtils.rm_f pidfile
+ rescue
+ end
+
+ def can_run?
+ begin
+ Sidekiq.redis(&:info)
+ Sidekiq.logger = nil
+ rescue
+ return false
+ end
+ true
+ end
+end
diff --git a/activejob/test/support/integration/adapters/sneakers.rb b/activejob/test/support/integration/adapters/sneakers.rb
new file mode 100644
index 0000000000..875803a2d8
--- /dev/null
+++ b/activejob/test/support/integration/adapters/sneakers.rb
@@ -0,0 +1,90 @@
+require 'sneakers/runner'
+require 'sneakers/publisher'
+require 'timeout'
+
+module Sneakers
+ class Publisher
+ def safe_ensure_connected
+ @mutex.synchronize do
+ ensure_connection! unless connected?
+ end
+ end
+ end
+end
+
+
+module SneakersJobsManager
+ def setup
+ ActiveJob::Base.queue_adapter = :sneakers
+ Sneakers.configure :heartbeat => 2,
+ :amqp => 'amqp://guest:guest@localhost:5672',
+ :vhost => '/',
+ :exchange => 'active_jobs_sneakers_int_test',
+ :exchange_type => :direct,
+ :daemonize => true,
+ :threads => 1,
+ :workers => 1,
+ :pid_path => Rails.root.join("tmp/sneakers.pid").to_s,
+ :log => Rails.root.join("log/sneakers.log").to_s
+ unless can_run?
+ puts "Cannot run integration tests for sneakers. To be able to run integration tests for sneakers you need to install and start rabbitmq.\n"
+ exit
+ end
+ end
+
+ def clear_jobs
+ bunny_queue.purge
+ end
+
+ def start_workers
+ @pid = fork do
+ queues = %w(integration_tests)
+ workers = queues.map do |q|
+ worker_klass = "ActiveJobWorker"+Digest::MD5.hexdigest(q)
+ Sneakers.const_set(worker_klass, Class.new(ActiveJob::QueueAdapters::SneakersAdapter::JobWrapper) do
+ from_queue q
+ end)
+ end
+ Sneakers::Runner.new(workers).run
+ end
+ begin
+ Timeout.timeout(10) do
+ while bunny_queue.status[:consumer_count] == 0
+ sleep 0.5
+ end
+ end
+ rescue Timeout::Error
+ stop_workers
+ raise "Failed to start sneakers worker"
+ end
+ end
+
+ def stop_workers
+ Process.kill 'TERM', @pid
+ Process.kill 'TERM', File.open(Rails.root.join("tmp/sneakers.pid").to_s).read.to_i
+ rescue
+ end
+
+ def can_run?
+ begin
+ bunny_publisher
+ rescue
+ return false
+ end
+ true
+ end
+
+ protected
+ def bunny_publisher
+ @bunny_publisher ||= begin
+ p = ActiveJob::QueueAdapters::SneakersAdapter::JobWrapper.send(:publisher)
+ p.safe_ensure_connected
+ p
+ end
+ end
+
+ def bunny_queue
+ @queue ||= bunny_publisher.exchange.channel.queue "integration_tests", durable: true
+ end
+
+end
diff --git a/activejob/test/support/integration/adapters/sucker_punch.rb b/activejob/test/support/integration/adapters/sucker_punch.rb
new file mode 100644
index 0000000000..9c0d66b469
--- /dev/null
+++ b/activejob/test/support/integration/adapters/sucker_punch.rb
@@ -0,0 +1,6 @@
+module SuckerPunchJobsManager
+ def setup
+ ActiveJob::Base.queue_adapter = :sucker_punch
+ SuckerPunch.logger = nil
+ end
+end
diff --git a/activejob/test/support/integration/dummy_app_template.rb b/activejob/test/support/integration/dummy_app_template.rb
new file mode 100644
index 0000000000..65994d6a1c
--- /dev/null
+++ b/activejob/test/support/integration/dummy_app_template.rb
@@ -0,0 +1,21 @@
+if ENV['AJADAPTER'] == 'delayed_job'
+ generate "delayed_job:active_record", "--quiet"
+ rake("db:migrate")
+end
+
+initializer 'activejob.rb', <<-CODE
+require "#{File.expand_path("../jobs_manager.rb", __FILE__)}"
+JobsManager.current_manager.setup
+CODE
+
+file 'app/jobs/test_job.rb', <<-CODE
+class TestJob < ActiveJob::Base
+ queue_as :integration_tests
+
+ def perform(x)
+ File.open(Rails.root.join("tmp/\#{x}"), "w+") do |f|
+ f.write x
+ end
+ end
+end
+CODE
diff --git a/activejob/test/support/integration/helper.rb b/activejob/test/support/integration/helper.rb
new file mode 100644
index 0000000000..9bd45e09e8
--- /dev/null
+++ b/activejob/test/support/integration/helper.rb
@@ -0,0 +1,30 @@
+puts "*** rake aj:integration:#{ENV['AJADAPTER']} ***\n"
+
+ENV["RAILS_ENV"] = "test"
+ActiveJob::Base.queue_name_prefix = nil
+
+require 'rails/generators/rails/app/app_generator'
+
+dummy_app_path = Dir.mktmpdir + "/dummy"
+dummy_app_template = File.expand_path("../dummy_app_template.rb", __FILE__)
+args = Rails::Generators::ARGVScrubber.new(["new", dummy_app_path, "--skip-gemfile", "--skip-bundle",
+ "--skip-git", "--skip-spring", "-d", "sqlite3", "--skip-javascript", "--force", "--quiet",
+ "--template", dummy_app_template]).prepare!
+Rails::Generators::AppGenerator.start args
+
+require "#{dummy_app_path}/config/environment.rb"
+
+ActiveRecord::Migrator.migrations_paths = [ Rails.root.join('db/migrate').to_s ]
+require 'rails/test_help'
+
+Rails.backtrace_cleaner.remove_silencers!
+
+require_relative 'test_case_helpers'
+ActiveSupport::TestCase.send(:include, TestCaseHelpers)
+
+JobsManager.current_manager.start_workers
+
+Minitest.after_run do
+ JobsManager.current_manager.stop_workers
+ JobsManager.current_manager.clear_jobs
+end
diff --git a/activejob/test/support/integration/jobs_manager.rb b/activejob/test/support/integration/jobs_manager.rb
new file mode 100644
index 0000000000..4df34aaeb1
--- /dev/null
+++ b/activejob/test/support/integration/jobs_manager.rb
@@ -0,0 +1,27 @@
+class JobsManager
+ @@managers = {}
+ attr :adapter_name
+
+ def self.current_manager
+ @@managers[ENV['AJADAPTER']] ||= new(ENV['AJADAPTER'])
+ end
+
+ def initialize(adapter_name)
+ @adapter_name = adapter_name
+ require_relative "adapters/#{adapter_name}"
+ extend "#{adapter_name.camelize}JobsManager".constantize
+ end
+
+ def setup
+ ActiveJob::Base.queue_adapter = nil
+ end
+
+ def clear_jobs
+ end
+
+ def start_workers
+ end
+
+ def stop_workers
+ end
+end
diff --git a/activejob/test/support/integration/test_case_helpers.rb b/activejob/test/support/integration/test_case_helpers.rb
new file mode 100644
index 0000000000..ee2f6aebea
--- /dev/null
+++ b/activejob/test/support/integration/test_case_helpers.rb
@@ -0,0 +1,48 @@
+require 'active_support/concern'
+require 'support/integration/jobs_manager'
+
+module TestCaseHelpers
+ extend ActiveSupport::Concern
+
+ included do
+ self.use_transactional_fixtures = false
+
+ setup do
+ clear_jobs
+ @id = "AJ-#{SecureRandom.uuid}"
+ end
+
+ teardown do
+ clear_jobs
+ end
+ end
+
+ protected
+
+ def jobs_manager
+ JobsManager.current_manager
+ end
+
+ def clear_jobs
+ jobs_manager.clear_jobs
+ end
+
+ def adapter_is?(adapter)
+ ActiveJob::Base.queue_adapter.name.split("::").last.gsub(/Adapter$/, '').underscore==adapter.to_s
+ end
+
+ def wait_for_jobs_to_finish_for(seconds=60)
+ begin
+ Timeout.timeout(seconds) do
+ while !job_executed do
+ sleep 0.25
+ end
+ end
+ rescue Timeout::Error
+ end
+ end
+
+ def job_executed
+ Dummy::Application.root.join("tmp/#{@id}").exist?
+ end
+end
diff --git a/activejob/test/support/queue_classic/inline.rb b/activejob/test/support/queue_classic/inline.rb
index 5e9c295e01..5743d5bbb5 100644
--- a/activejob/test/support/queue_classic/inline.rb
+++ b/activejob/test/support/queue_classic/inline.rb
@@ -7,5 +7,17 @@ module QC
receiver = eval(receiver_str)
receiver.send(message, *args)
end
+
+ def enqueue_in(seconds, method, *args)
+ receiver_str, _, message = method.rpartition('.')
+ receiver = eval(receiver_str)
+ receiver.send(message, *args)
+ end
+
+ def enqueue_at(not_before, method, *args)
+ receiver_str, _, message = method.rpartition('.')
+ receiver = eval(receiver_str)
+ receiver.send(message, *args)
+ end
end
end