diff options
Diffstat (limited to 'activejob')
66 files changed, 2074 insertions, 429 deletions
diff --git a/activejob/.gitignore b/activejob/.gitignore new file mode 100644 index 0000000000..b3aaf55871 --- /dev/null +++ b/activejob/.gitignore @@ -0,0 +1 @@ +test/dummy diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md index b04883413e..afdd42be33 100644 --- a/activejob/CHANGELOG.md +++ b/activejob/CHANGELOG.md @@ -1 +1,27 @@ -* Started project.
\ No newline at end of file +* `ActiveJob::Base.deserialize` delegates to the job class + + + Since `ActiveJob::Base#deserialize` can be overridden by subclasses (like + `ActiveJob::Base#serialize`) this allows jobs to attach arbitrary metadata + when they get serialized and read it back when they get performed. Example: + + class DeliverWebhookJob < ActiveJob::Base + def serialize + super.merge('attempt_number' => (@attempt_number || 0) + 1) + end + + def deserialize(job_data) + super + @attempt_number = job_data['attempt_number'] + end + + rescue_from(TimeoutError) do |exception| + raise exception if @attempt_number > 5 + retry_job(wait: 10) + end + end + + *Isaac Seymour* + + +Please check [4-2-stable](https://github.com/rails/rails/blob/4-2-stable/activejob/CHANGELOG.md) for previous changes. diff --git a/activejob/MIT-LICENSE b/activejob/MIT-LICENSE index 8b1e97b776..0cef8cdda0 100644 --- a/activejob/MIT-LICENSE +++ b/activejob/MIT-LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2014 David Heinemeier Hansson +Copyright (c) 2014-2015 David Heinemeier Hansson Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the diff --git a/activejob/README.md b/activejob/README.md index 1f300fcf62..8c83d3669a 100644 --- a/activejob/README.md +++ b/activejob/README.md @@ -5,7 +5,7 @@ of queueing backends. These jobs can be everything from regularly scheduled clean-ups, to billing charges, to mailings. Anything that can be chopped up into small units of work and run in parallel, really. -It also serves as the backend for ActionMailer's #deliver_later functionality +It also serves as the backend for Action Mailer's #deliver_later functionality that makes it easy to turn any mailing into a job for running later. That's one of the most common jobs in a modern web application: Sending emails outside of the request-response cycle, so the user doesn't have to wait on it. @@ -24,9 +24,10 @@ Set the queue adapter for Active Job: ``` ruby ActiveJob::Base.queue_adapter = :inline # default queue adapter -# Adapters currently supported: :backburner, :delayed_job, :qu, :que, :queue_classic, -# :resque, :sidekiq, :sneakers, :sucker_punch ``` +Note: To learn how to use your preferred queueing backend see its adapter +documentation at +[ActiveJob::QueueAdapters](http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html). Declare a job like so: @@ -43,15 +44,15 @@ end Enqueue a job like so: ```ruby -MyJob.enqueue record # Enqueue a job to be performed as soon the queueing system is free. +MyJob.perform_later record # Enqueue a job to be performed as soon the queueing system is free. ``` ```ruby -MyJob.enqueue_at Date.tomorrow.noon, record # Enqueue a job to be performed tomorrow at noon. +MyJob.set(wait_until: Date.tomorrow.noon).perform_later(record) # Enqueue a job to be performed tomorrow at noon. ``` ```ruby -MyJob.enqueue_in 1.week, record # Enqueue a job to be performed 1 week from now. +MyJob.set(wait: 1.week).perform_later(record) # Enqueue a job to be performed 1 week from now. ``` That's it! @@ -88,18 +89,9 @@ by default has been mixed into Active Record classes. ## Supported queueing systems -We currently have adapters for: - -* [Backburner](https://github.com/nesquena/backburner) -* [Delayed Job](https://github.com/collectiveidea/delayed_job) -* [Qu](https://github.com/bkeepers/qu) -* [Que](https://github.com/chanks/que) -* [QueueClassic](https://github.com/ryandotsmith/queue_classic) -* [Resque 1.x](https://github.com/resque/resque) -* [Sidekiq](https://github.com/mperham/sidekiq) -* [Sneakers](https://github.com/jondot/sneakers) -* [Sucker Punch](https://github.com/brandonhilkert/sucker_punch) - +Active Job has built-in adapters for multiple queueing backends (Sidekiq, +Resque, Delayed Job and others). To get an up-to-date list of the adapters +see the API Documentation for [ActiveJob::QueueAdapters](http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html). ## Auxiliary gems @@ -119,7 +111,7 @@ Source code can be downloaded as part of the Rails project on GitHub ## License -ActiveJob is released under the MIT license: +Active Job is released under the MIT license: * http://www.opensource.org/licenses/MIT @@ -137,5 +129,3 @@ Bug reports can be filed for the Ruby on Rails project here: Feature requests should be discussed on the rails-core mailing list here: * https://groups.google.com/forum/?fromgroups#!forum/rubyonrails-core - - diff --git a/activejob/Rakefile b/activejob/Rakefile index 484cd1d0b8..1922f256ec 100644 --- a/activejob/Rakefile +++ b/activejob/Rakefile @@ -1,73 +1,78 @@ require 'rake/testtask' require 'rubygems/package_task' -dir = File.dirname(__FILE__) - -def run_without_aborting(*tasks) - errors = [] - - tasks.each do |task| - begin - Rake::Task[task].invoke - rescue Exception - errors << task - end - end - - abort "Errors running #{errors.join(', ')}" if errors.any? -end - -task default: :test - -ACTIVEJOB_ADAPTERS = %w(inline delayed_job qu que queue_classic resque sidekiq sneakers sucker_punch backburner) +ACTIVEJOB_ADAPTERS = %w(inline delayed_job qu que queue_classic resque sidekiq sneakers sucker_punch backburner test) ACTIVEJOB_ADAPTERS -= %w(queue_classic) if defined?(JRUBY_VERSION) -desc 'Run all adapter tests' -task :test do - tasks = ACTIVEJOB_ADAPTERS.map{|a| "test_#{a}" } - run_without_aborting(*tasks) -end +task default: :test +task test: 'test:default' namespace :test do + desc 'Run all adapter tests' + task :default do + run_without_aborting ACTIVEJOB_ADAPTERS.map { |a| "test:#{a}" } + end + desc 'Run all adapter tests in isolation' task :isolated do - tasks = ACTIVEJOB_ADAPTERS.map{|a| "isolated_test_#{a}" } - run_without_aborting(*tasks) + run_without_aborting ACTIVEJOB_ADAPTERS.map { |a| "test:isolated:#{a}" } + end + + desc 'Run integration tests for all adapters' + task :integration do + run_without_aborting (ACTIVEJOB_ADAPTERS - ['test']).map { |a| "test:integration:#{a}" } + end + + task 'env:integration' do + ENV['AJ_INTEGRATION_TESTS'] = "1" end -end + ACTIVEJOB_ADAPTERS.each do |adapter| + task("env:#{adapter}") { ENV['AJADAPTER'] = adapter } -ACTIVEJOB_ADAPTERS.each do |adapter| - namespace :test do - Rake::TestTask.new(adapter => "#{adapter}:env") do |t| - t.description = "" + Rake::TestTask.new(adapter => "test:env:#{adapter}") do |t| + t.description = "Run adapter tests for #{adapter}" t.libs << 'test' t.test_files = FileList['test/cases/**/*_test.rb'] t.verbose = true + t.warning = true + t.ruby_opts = ["--dev"] if defined?(JRUBY_VERSION) end namespace :isolated do - task adapter => "#{adapter}:env" do + task adapter => "test:env:#{adapter}" do + dir = File.dirname(__FILE__) Dir.glob("#{dir}/test/cases/**/*_test.rb").all? do |file| sh(Gem.ruby, '-w', "-I#{dir}/lib", "-I#{dir}/test", file) end or raise 'Failures' end end - end - - namespace adapter do - task test: "test_#{adapter}" - task isolated_test: "isolated_test_#{adapter}" - task(:env) { ENV['AJADAPTER'] = adapter } + namespace :integration do + Rake::TestTask.new(adapter => ["test:env:#{adapter}", 'test:env:integration']) do |t| + t.description = "Run integration tests for #{adapter}" + t.libs << 'test' + t.test_files = FileList['test/integration/**/*_test.rb'] + t.verbose = true + t.warning = true + t.ruby_opts = ["--dev"] if defined?(JRUBY_VERSION) + end + end end +end +def run_without_aborting(tasks) + errors = [] - desc "Run #{adapter} tests" - task "test_#{adapter}" => ["#{adapter}:env", "test:#{adapter}"] + tasks.each do |task| + begin + Rake::Task[task].invoke + rescue Exception + errors << task + end + end - desc "Run #{adapter} tests in isolation" - task "isolated_test_#{adapter}" => ["#{adapter}:env", "test:isolated:#{adapter}"] + abort "Errors running #{errors.join(', ')}" if errors.any? end diff --git a/activejob/activejob.gemspec b/activejob/activejob.gemspec index 9944b2a1bd..5404ece804 100644 --- a/activejob/activejob.gemspec +++ b/activejob/activejob.gemspec @@ -7,7 +7,7 @@ Gem::Specification.new do |s| s.summary = 'Job framework with pluggable queues.' s.description = 'Declare job classes that can be run by a variety of queueing backends.' - s.required_ruby_version = '>= 1.9.3' + s.required_ruby_version = '>= 2.2.0' s.license = 'MIT' @@ -19,5 +19,5 @@ Gem::Specification.new do |s| s.require_path = 'lib' s.add_dependency 'activesupport', version - s.add_dependency 'globalid', '>= 0.2.3' + s.add_dependency 'globalid', '>= 0.3.0' end diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb index 29123170b8..3d4f63b261 100644 --- a/activejob/lib/active_job.rb +++ b/activejob/lib/active_job.rb @@ -1,5 +1,5 @@ #-- -# Copyright (c) 2014 David Heinemeier Hansson +# Copyright (c) 2014-2015 David Heinemeier Hansson # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the @@ -31,4 +31,7 @@ module ActiveJob autoload :Base autoload :QueueAdapters + autoload :ConfiguredJob + autoload :TestCase + autoload :TestHelper end diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb index e54f4afbc7..752be6898e 100644 --- a/activejob/lib/active_job/arguments.rb +++ b/activejob/lib/active_job/arguments.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash/indifferent_access' + module ActiveJob # Raised when an exception is raised during job arguments deserialization. # @@ -5,8 +7,8 @@ module ActiveJob class DeserializationError < StandardError attr_reader :original_exception - def initialize(e) - super ("Error while trying to deserialize arguments: #{e.message}") + def initialize(e) #:nodoc: + super("Error while trying to deserialize arguments: #{e.message}") @original_exception = e set_backtrace e.backtrace end @@ -24,25 +26,38 @@ module ActiveJob extend self TYPE_WHITELIST = [ NilClass, Fixnum, Float, String, TrueClass, FalseClass, Bignum ] + # Serializes a set of arguments. Whitelisted types are returned + # as-is. Arrays/Hashes are serialized element by element. + # All other types are serialized using GlobalID. def serialize(arguments) arguments.map { |argument| serialize_argument(argument) } end + # Deserializes a set of arguments. Whitelisted types are returned + # as-is. Arrays/Hashes are deserialized element by element. + # All other types are deserialized using GlobalID. def deserialize(arguments) arguments.map { |argument| deserialize_argument(argument) } + rescue => e + raise DeserializationError.new(e) end private + GLOBALID_KEY = '_aj_globalid'.freeze + private_constant :GLOBALID_KEY + def serialize_argument(argument) case argument - when GlobalID::Identification - argument.global_id.to_s when *TYPE_WHITELIST argument + when GlobalID::Identification + { GLOBALID_KEY => argument.to_global_id.to_s } when Array - serialize(argument) + argument.map { |arg| serialize_argument(arg) } when Hash - Hash[ argument.map { |key, value| [ serialize_hash_key(key), serialize_argument(value) ] } ] + argument.each_with_object({}) do |(key, value), hash| + hash[serialize_hash_key(key)] = serialize_argument(value) + end else raise SerializationError.new("Unsupported argument type: #{argument.class.name}") end @@ -50,23 +65,48 @@ module ActiveJob def deserialize_argument(argument) case argument + when String + GlobalID::Locator.locate(argument) || argument + when *TYPE_WHITELIST + argument when Array - deserialize(argument) + argument.map { |arg| deserialize_argument(arg) } when Hash - Hash[ argument.map { |key, value| [ key, deserialize_argument(value) ] } ].with_indifferent_access + if serialized_global_id?(argument) + deserialize_global_id argument + else + deserialize_hash argument + end else - GlobalID::Locator.locate(argument) || argument + raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}" end - rescue => e - raise DeserializationError.new(e) end + def serialized_global_id?(hash) + hash.size == 1 and hash.include?(GLOBALID_KEY) + end + + def deserialize_global_id(hash) + GlobalID::Locator.locate hash[GLOBALID_KEY] + end + + def deserialize_hash(serialized_hash) + serialized_hash.each_with_object({}.with_indifferent_access) do |(key, value), hash| + hash[key] = deserialize_argument(value) + end + end + + RESERVED_KEYS = [GLOBALID_KEY, GLOBALID_KEY.to_sym] + private_constant :RESERVED_KEYS + def serialize_hash_key(key) case key + when *RESERVED_KEYS + raise SerializationError.new("Can't serialize a Hash with reserved key #{key.inspect}") when String, Symbol key.to_s else - raise SerializationError.new("Unsupported hash key type: #{key.class.name}") + raise SerializationError.new("Only string and symbol hash keys may be serialized as job arguments, but #{key.inspect} is a #{key.class}") end end end diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb index d5ba253016..fd49b3fda5 100644 --- a/activejob/lib/active_job/base.rb +++ b/activejob/lib/active_job/base.rb @@ -1,20 +1,64 @@ +require 'active_job/core' require 'active_job/queue_adapter' require 'active_job/queue_name' require 'active_job/enqueuing' require 'active_job/execution' require 'active_job/callbacks' -require 'active_job/identifier' require 'active_job/logging' -module ActiveJob +module ActiveJob #:nodoc: + # = Active Job + # + # Active Job objects can be configured to work with different backend + # queuing frameworks. To specify a queue adapter to use: + # + # ActiveJob::Base.queue_adapter = :inline + # + # A list of supported adapters can be found in QueueAdapters. + # + # Active Job objects can be defined by creating a class that inherits + # from the ActiveJob::Base class. The only necessary method to + # implement is the "perform" method. + # + # To define an Active Job object: + # + # class ProcessPhotoJob < ActiveJob::Base + # def perform(photo) + # photo.watermark!('Rails') + # photo.rotate!(90.degrees) + # photo.resize_to_fit!(300, 300) + # photo.upload! + # end + # end + # + # Records that are passed in are serialized/deserialized using Global + # ID. More information can be found in Arguments. + # + # To enqueue a job to be performed as soon the queueing system is free: + # + # ProcessPhotoJob.perform_later(photo) + # + # To enqueue a job to be processed at some point in the future: + # + # ProcessPhotoJob.set(wait_until: Date.tomorrow.noon).perform_later(photo) + # + # More information can be found in ActiveJob::Core::ClassMethods#set + # + # A job can also be processed immediately without sending to the queue: + # + # ProcessPhotoJob.perform_now(photo) + # + # == Exceptions + # + # * DeserializationError - Error class for deserialization errors. + # * SerializationError - Error class for serialization errors. class Base - extend QueueAdapter - + include Core + include QueueAdapter include QueueName include Enqueuing include Execution include Callbacks - include Identifier include Logging ActiveSupport.run_load_hooks(:active_job, self) diff --git a/activejob/lib/active_job/callbacks.rb b/activejob/lib/active_job/callbacks.rb index cafa3438c0..2b6149e84e 100644 --- a/activejob/lib/active_job/callbacks.rb +++ b/activejob/lib/active_job/callbacks.rb @@ -3,8 +3,8 @@ require 'active_support/callbacks' module ActiveJob # = Active Job Callbacks # - # Active Job provides hooks during the lifecycle of a job. Callbacks allow you - # to trigger logic during the lifecycle of a job. Available callbacks are: + # Active Job provides hooks during the life cycle of a job. Callbacks allow you + # to trigger logic during the life cycle of a job. Available callbacks are: # # * <tt>before_enqueue</tt> # * <tt>around_enqueue</tt> @@ -22,6 +22,8 @@ module ActiveJob define_callbacks :enqueue end + # These methods will be included into any Active Job object, adding + # callbacks for +perform+ and +enqueue+ methods. module ClassMethods # Defines a callback that will get called right before the # job's perform method is executed. @@ -36,7 +38,7 @@ module ActiveJob # def perform(video_id) # Video.find(video_id).process # end - # end + # end # def before_perform(*filters, &blk) set_callback(:perform, :before, *filters, &blk) @@ -55,7 +57,7 @@ module ActiveJob # def perform(video_id) # Video.find(video_id).process # end - # end + # end # def after_perform(*filters, &blk) set_callback(:perform, :after, *filters, &blk) @@ -75,7 +77,7 @@ module ActiveJob # def perform(video_id) # Video.find(video_id).process # end - # end + # end # def around_perform(*filters, &blk) set_callback(:perform, :around, *filters, &blk) @@ -94,7 +96,7 @@ module ActiveJob # def perform(video_id) # Video.find(video_id).process # end - # end + # end # def before_enqueue(*filters, &blk) set_callback(:enqueue, :before, *filters, &blk) @@ -113,7 +115,7 @@ module ActiveJob # def perform(video_id) # Video.find(video_id).process # end - # end + # end # def after_enqueue(*filters, &blk) set_callback(:enqueue, :after, *filters, &blk) @@ -134,7 +136,7 @@ module ActiveJob # def perform(video_id) # Video.find(video_id).process # end - # end + # end # def around_enqueue(*filters, &blk) set_callback(:enqueue, :around, *filters, &blk) diff --git a/activejob/lib/active_job/configured_job.rb b/activejob/lib/active_job/configured_job.rb new file mode 100644 index 0000000000..979280b910 --- /dev/null +++ b/activejob/lib/active_job/configured_job.rb @@ -0,0 +1,16 @@ +module ActiveJob + class ConfiguredJob #:nodoc: + def initialize(job_class, options={}) + @options = options + @job_class = job_class + end + + def perform_now(*args) + @job_class.new(*args).perform_now + end + + def perform_later(*args) + @job_class.new(*args).enqueue @options + end + end +end diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb new file mode 100644 index 0000000000..ddd7d1361c --- /dev/null +++ b/activejob/lib/active_job/core.rb @@ -0,0 +1,112 @@ +module ActiveJob + module Core + extend ActiveSupport::Concern + + included do + # Job arguments + attr_accessor :arguments + attr_writer :serialized_arguments + + # Timestamp when the job should be performed + attr_accessor :scheduled_at + + # Job Identifier + attr_accessor :job_id + + # Queue in which the job will reside. + attr_writer :queue_name + end + + # These methods will be included into any Active Job object, adding + # helpers for de/serialization and creation of job instances. + module ClassMethods + # Creates a new job instance from a hash created with +serialize+ + def deserialize(job_data) + job = job_data['job_class'].constantize.new + job.deserialize(job_data) + job + end + + # Creates a job preconfigured with the given options. You can call + # perform_later with the job arguments to enqueue the job with the + # preconfigured options + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # + # ==== Examples + # + # VideoJob.set(queue: :some_queue).perform_later(Video.last) + # VideoJob.set(wait: 5.minutes).perform_later(Video.last) + # VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last) + def set(options={}) + ConfiguredJob.new(self, options) + end + end + + # Creates a new job instance. Takes the arguments that will be + # passed to the perform method. + def initialize(*arguments) + @arguments = arguments + @job_id = SecureRandom.uuid + @queue_name = self.class.queue_name + end + + # Returns a hash with the job data that can safely be passed to the + # queueing adapter. + def serialize + { + 'job_class' => self.class.name, + 'job_id' => job_id, + 'queue_name' => queue_name, + 'arguments' => serialize_arguments(arguments) + } + end + + # Attaches the stored job data to the current instance. Receives a hash + # returned from +serialize+ + # + # ==== Examples + # + # class DeliverWebhookJob < ActiveJob::Base + # def serialize + # super.merge('attempt_number' => (@attempt_number || 0) + 1) + # end + # + # def deserialize(job_data) + # super + # @attempt_number = job_data['attempt_number'] + # end + # + # rescue_from(TimeoutError) do |exception| + # raise exception if @attempt_number > 5 + # retry_job(wait: 10) + # end + # end + def deserialize(job_data) + self.job_id = job_data['job_id'] + self.queue_name = job_data['queue_name'] + self.serialized_arguments = job_data['arguments'] + end + + private + def deserialize_arguments_if_needed + if defined?(@serialized_arguments) && @serialized_arguments.present? + @arguments = deserialize_arguments(@serialized_arguments) + @serialized_arguments = nil + end + end + + def serialize_arguments(serialized_args) + Arguments.serialize(serialized_args) + end + + def deserialize_arguments(serialized_args) + Arguments.deserialize(serialized_args) + end + end +end diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb index 3d00d51867..430c17e1bf 100644 --- a/activejob/lib/active_job/enqueuing.rb +++ b/activejob/lib/active_job/enqueuing.rb @@ -4,68 +4,74 @@ module ActiveJob module Enqueuing extend ActiveSupport::Concern + # Includes the +perform_later+ method for job initialization. module ClassMethods - # Push a job onto the queue. The arguments must be legal JSON types + # Push a job onto the queue. The arguments must be legal JSON types # (string, int, float, nil, true, false, hash or array) or - # GlobalID::Identification instances. Arbitrary Ruby objects + # GlobalID::Identification instances. Arbitrary Ruby objects # are not supported. # - # Returns an instance of the job class queued with args available in + # Returns an instance of the job class queued with arguments available in # Job#arguments. - def enqueue(*args) - new(args).tap do |job| - job.run_callbacks :enqueue do - queue_adapter.enqueue self, job.job_id, *Arguments.serialize(args) - end - end - end - - # Enqueue a job to be performed at +interval+ from now. - # - # enqueue_in(1.week, "mike") - # - # Returns an instance of the job class queued with args available in - # Job#arguments and the timestamp in Job#enqueue_at. - def enqueue_in(interval, *args) - enqueue_at interval.seconds.from_now, *args + def perform_later(*args) + job_or_instantiate(*args).enqueue end - # Enqueue a job to be performed at an explicit point in time. - # - # enqueue_at(Date.tomorrow.midnight, "mike") - # - # Returns an instance of the job class queued with args available in - # Job#arguments and the timestamp in Job#enqueue_at. - def enqueue_at(timestamp, *args) - new(args).tap do |job| - job.enqueued_at = timestamp - - job.run_callbacks :enqueue do - queue_adapter.enqueue_at self, timestamp.to_f, job.job_id, *Arguments.serialize(args) - end + protected + def job_or_instantiate(*args) + args.first.is_a?(self) ? args.first : new(*args) end - end end - included do - attr_accessor :arguments - attr_accessor :enqueued_at + # Reschedules the job to be re-executed. This is useful in combination + # with the +rescue_from+ option. When you rescue an exception from your job + # you can ask Active Job to retry performing your job. + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # + # ==== Examples + # + # class SiteScrapperJob < ActiveJob::Base + # rescue_from(ErrorLoadingSite) do + # retry_job queue: :low_priority + # end + # + # def perform(*args) + # # raise ErrorLoadingSite if cannot scrape + # end + # end + def retry_job(options={}) + enqueue options end - def initialize(arguments = nil) - @arguments = arguments - end - - def retry_now - self.class.enqueue(*arguments) - end - - def retry_in(interval) - self.class.enqueue_in interval, *arguments - end - - def retry_at(timestamp) - self.class.enqueue_at timestamp, *arguments + # Enqueues the job to be performed by the queue adapter. + # + # ==== Options + # * <tt>:wait</tt> - Enqueues the job with the specified delay + # * <tt>:wait_until</tt> - Enqueues the job at the time specified + # * <tt>:queue</tt> - Enqueues the job on the specified queue + # + # ==== Examples + # + # my_job_instance.enqueue + # my_job_instance.enqueue wait: 5.minutes + # my_job_instance.enqueue queue: :important + # my_job_instance.enqueue wait_until: Date.tomorrow.midnight + def enqueue(options={}) + self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait] + self.scheduled_at = options[:wait_until].to_f if options[:wait_until] + self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue] + run_callbacks :enqueue do + if self.scheduled_at + self.class.queue_adapter.enqueue_at self, self.scheduled_at + else + self.class.queue_adapter.enqueue self + end + end + self end end end diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb index 0e7b5bdd72..79d232da4a 100644 --- a/activejob/lib/active_job/execution.rb +++ b/activejob/lib/active_job/execution.rb @@ -4,15 +4,30 @@ require 'active_job/arguments' module ActiveJob module Execution extend ActiveSupport::Concern + include ActiveSupport::Rescuable - included do - include ActiveSupport::Rescuable - end + # Includes methods for executing and performing jobs instantly. + module ClassMethods + # Performs the job immediately. + # + # MyJob.perform_now("mike") + # + def perform_now(*args) + job_or_instantiate(*args).perform_now + end - def execute(job_id, *serialized_args) - self.job_id = job_id - self.arguments = deserialize_arguments(serialized_args) + def execute(job_data) #:nodoc: + job = deserialize(job_data) + job.perform_now + end + end + # Performs the job immediately. The job is not sent to the queueing adapter + # but directly executed by blocking the execution of others until it's finished. + # + # MyJob.new(*args).perform_now + def perform_now + deserialize_arguments_if_needed run_callbacks :perform do perform(*arguments) end @@ -23,11 +38,5 @@ module ActiveJob def perform(*) fail NotImplementedError end - - private - def deserialize_arguments(serialized_args) - Arguments.deserialize(serialized_args) - end - end end diff --git a/activejob/lib/active_job/gem_version.rb b/activejob/lib/active_job/gem_version.rb index c166020b28..27a5de93f4 100644 --- a/activejob/lib/active_job/gem_version.rb +++ b/activejob/lib/active_job/gem_version.rb @@ -1,12 +1,12 @@ module ActiveJob - # Returns the version of the currently loaded ActiveJob as a <tt>Gem::Version</tt> + # Returns the version of the currently loaded Active Job as a <tt>Gem::Version</tt> def self.gem_version Gem::Version.new VERSION::STRING end module VERSION - MAJOR = 4 - MINOR = 2 + MAJOR = 5 + MINOR = 0 TINY = 0 PRE = "alpha" diff --git a/activejob/lib/active_job/identifier.rb b/activejob/lib/active_job/identifier.rb deleted file mode 100644 index c7f522087d..0000000000 --- a/activejob/lib/active_job/identifier.rb +++ /dev/null @@ -1,15 +0,0 @@ -require 'active_job/arguments' - -module ActiveJob - module Identifier - extend ActiveSupport::Concern - - included do - attr_writer :job_id - end - - def job_id - @job_id ||= SecureRandom.uuid - end - end -end diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb index ae098a80f3..cd29e6908e 100644 --- a/activejob/lib/active_job/logging.rb +++ b/activejob/lib/active_job/logging.rb @@ -3,7 +3,7 @@ require 'active_support/tagged_logging' require 'active_support/logger' module ActiveJob - module Logging + module Logging #:nodoc: extend ActiveSupport::Concern included do @@ -17,7 +17,7 @@ module ActiveJob around_perform do |job, block, _| tag_logger(job.class.name, job.job_id) do - payload = {adapter: job.class.queue_adapter, job: job.class, args: job.arguments} + payload = {adapter: job.class.queue_adapter, job: job} ActiveSupport::Notifications.instrument("perform_start.active_job", payload.dup) ActiveSupport::Notifications.instrument("perform.active_job", payload) do block.call @@ -26,12 +26,12 @@ module ActiveJob end before_enqueue do |job| - if job.enqueued_at + if job.scheduled_at ActiveSupport::Notifications.instrument "enqueue_at.active_job", - adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments, timestamp: job.enqueued_at + adapter: job.class.queue_adapter, job: job else ActiveSupport::Notifications.instrument "enqueue.active_job", - adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments + adapter: job.class.queue_adapter, job: job end end end @@ -50,21 +50,33 @@ module ActiveJob logger.formatter.current_tags.include?("ActiveJob") end - class LogSubscriber < ActiveSupport::LogSubscriber + class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc: def enqueue(event) - info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)}" + args_info(event) } + info do + job = event.payload[:job] + "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job) + end end def enqueue_at(event) - info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) } + info do + job = event.payload[:job] + "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job) + end end def perform_start(event) - info { "Performing #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event) } + info do + job = event.payload[:job] + "Performing #{job.class.name} from #{queue_name(event)}" + args_info(job) + end end def perform(event) - info { "Performed #{event.payload[:job].name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" } + info do + job = event.payload[:job] + "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2)}ms" + end end private @@ -72,12 +84,17 @@ module ActiveJob event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})" end - def args_info(event) - event.payload[:args].any? ? " with arguments: #{event.payload[:args].map(&:inspect).join(", ")}" : "" + def args_info(job) + if job.arguments.any? + ' with arguments: ' + + job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg.inspect }.join(', ') + else + '' + end end - def enqueued_at(event) - Time.at(event.payload[:timestamp]).utc + def scheduled_at(event) + Time.at(event.payload[:job].scheduled_at).utc end def logger diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb index 13c23abce4..d610d30e01 100644 --- a/activejob/lib/active_job/queue_adapter.rb +++ b/activejob/lib/active_job/queue_adapter.rb @@ -2,22 +2,34 @@ require 'active_job/queue_adapters/inline_adapter' require 'active_support/core_ext/string/inflections' module ActiveJob - module QueueAdapter - mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter } + # The <tt>ActiveJob::QueueAdapter</tt> module is used to load the + # correct adapter. The default queue adapter is the :inline queue. + module QueueAdapter #:nodoc: + extend ActiveSupport::Concern - def queue_adapter=(name_or_adapter) - @@queue_adapter = \ - case name_or_adapter - when Symbol, String - load_adapter(name_or_adapter) - when Class - name_or_adapter - end - end + # Includes the setter method for changing the active queue adapter. + module ClassMethods + mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter } - private - def load_adapter(name) - "ActiveJob::QueueAdapters::#{name.to_s.camelize}Adapter".constantize + # Specify the backend queue provider. The default queue adapter + # is the :inline queue. See QueueAdapters for more + # information. + def queue_adapter=(name_or_adapter) + @@queue_adapter = \ + case name_or_adapter + when :test + ActiveJob::QueueAdapters::TestAdapter.new + when Symbol, String + load_adapter(name_or_adapter) + else + name_or_adapter if name_or_adapter.respond_to?(:enqueue) + end end + + private + def load_adapter(name) + "ActiveJob::QueueAdapters::#{name.to_s.camelize}Adapter".constantize + end + end end -end
\ No newline at end of file +end diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb index 007068ff0a..4b91c93dbe 100644 --- a/activejob/lib/active_job/queue_adapters.rb +++ b/activejob/lib/active_job/queue_adapters.rb @@ -1,4 +1,39 @@ module ActiveJob + # == Active Job adapters + # + # Active Job has adapters for the following queueing backends: + # + # * {Backburner}[https://github.com/nesquena/backburner] + # * {Delayed Job}[https://github.com/collectiveidea/delayed_job] + # * {Qu}[https://github.com/bkeepers/qu] + # * {Que}[https://github.com/chanks/que] + # * {queue_classic}[https://github.com/QueueClassic/queue_classic] + # * {Resque 1.x}[https://github.com/resque/resque/tree/1-x-stable] + # * {Sidekiq}[http://sidekiq.org] + # * {Sneakers}[https://github.com/jondot/sneakers] + # * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch] + # + # === Backends Features + # + # | | Async | Queues | Delayed | Priorities | Timeout | Retries | + # |-------------------|-------|--------|-----------|------------|---------|---------| + # | Backburner | Yes | Yes | Yes | Yes | Job | Global | + # | Delayed Job | Yes | Yes | Yes | Job | Global | Global | + # | Qu | Yes | Yes | No | No | No | Global | + # | Que | Yes | Yes | Yes | Job | No | Job | + # | queue_classic | Yes | Yes | No* | No | No | No | + # | Resque | Yes | Yes | Yes (Gem) | Queue | Global | Yes | + # | Sidekiq | Yes | Yes | Yes | Queue | No | Job | + # | Sneakers | Yes | Yes | No | Queue | Queue | No | + # | Sucker Punch | Yes | Yes | No | No | No | No | + # | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A | + # | Active Job | Yes | Yes | Yes | No | No | No | + # + # NOTE: + # queue_classic does not support Job scheduling. However you can implement this + # yourself or you can use the queue_classic-later gem. See the documentation for + # ActiveJob::QueueAdapters::QueueClassicAdapter. + # module QueueAdapters extend ActiveSupport::Autoload @@ -12,5 +47,6 @@ module ActiveJob autoload :SidekiqAdapter autoload :SneakersAdapter autoload :SuckerPunchAdapter + autoload :TestAdapter end end diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb index 8d34155645..2453d065de 100644 --- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb @@ -2,22 +2,32 @@ require 'backburner' module ActiveJob module QueueAdapters + # == Backburner adapter for Active Job + # + # Backburner is a beanstalkd-powered job queue that can handle a very + # high volume of jobs. You create background jobs and place them on + # multiple work queues to be processed later. Read more about + # Backburner {here}[https://github.com/nesquena/backburner]. + # + # To use Backburner set the queue_adapter config to +:backburner+. + # + # Rails.application.config.active_job.queue_adapter = :backburner class BackburnerAdapter class << self - def enqueue(job, *args) - Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name + def enqueue(job) #:nodoc: + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name end - def enqueue_at(job, timestamp, *args) - delay = Time.current.to_f - timestamp - Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name, delay: delay + def enqueue_at(job, timestamp) #:nodoc: + delay = timestamp - Time.current.to_f + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay end end - class JobWrapper + class JobWrapper #:nodoc: class << self - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb index a00569833a..69d9e70de3 100644 --- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -2,20 +2,36 @@ require 'delayed_job' module ActiveJob module QueueAdapters + # == Delayed Job adapter for Active Job + # + # Delayed::Job (or DJ) encapsulates the common pattern of asynchronously + # executing longer tasks in the background. Although DJ can have many + # storage backends, one of the most used is based on Active Record. + # Read more about Delayed Job {here}[https://github.com/collectiveidea/delayed_job]. + # + # To use Delayed Job, set the queue_adapter config to +:delayed_job+. + # + # Rails.application.config.active_job.queue_adapter = :delayed_job class DelayedJobAdapter class << self - def enqueue(job, *args) - JobWrapper.new.delay(queue: job.queue_name).perform(job, *args) + def enqueue(job) #:nodoc: + Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name) end - def enqueue_at(job, timestamp, *args) - JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job, *args) + def enqueue_at(job, timestamp) #:nodoc: + Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp)) end end - class JobWrapper - def perform(job, *args) - job.new.execute(*args) + class JobWrapper #:nodoc: + attr_accessor :job_data + + def initialize(job_data) + @job_data = job_data + end + + def perform + Base.execute(job_data) end end end diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb index 5805340fb0..e25d88e723 100644 --- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb @@ -1,13 +1,21 @@ module ActiveJob module QueueAdapters + # == Active Job Inline adapter + # + # When enqueueing jobs with the Inline adapter the job will be executed + # immediately. + # + # To use the Inline set the queue_adapter config to +:inline+. + # + # Rails.application.config.active_job.queue_adapter = :inline class InlineAdapter class << self - def enqueue(job, *args) - job.new.execute(*args) + def enqueue(job) #:nodoc: + Base.execute(job.serialize) end - def enqueue_at(*) - raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at https://github.com/rails/activejob") + def enqueue_at(*) #:nodoc: + raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html") end end end diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb index 5cb741c094..30aa5a4670 100644 --- a/activejob/lib/active_job/queue_adapters/qu_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/qu_adapter.rb @@ -2,27 +2,39 @@ require 'qu' module ActiveJob module QueueAdapters + # == Qu adapter for Active Job + # + # Qu is a Ruby library for queuing and processing background jobs. It is + # heavily inspired by delayed_job and Resque. Qu was created to overcome + # some shortcomings in the existing queuing libraries. + # The advantages of Qu are: Multiple backends (redis, mongo), jobs are + # requeued when worker is killed, resque-like API. + # + # Read more about Qu {here}[https://github.com/bkeepers/qu]. + # + # To use Qu set the queue_adapter config to +:qu+. + # + # Rails.application.config.active_job.queue_adapter = :qu class QuAdapter class << self - def enqueue(job, *args) - Qu::Payload.new(klass: JobWrapper, args: [job.name, *args]).tap do |payload| + def enqueue(job, *args) #:nodoc: + Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload| payload.instance_variable_set(:@queue, job.queue_name) end.push end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp, *args) #:nodoc: raise NotImplementedError end end - class JobWrapper < Qu::Job - def initialize(job_name, *args) - @job = job_name.constantize - @args = args + class JobWrapper < Qu::Job #:nodoc: + def initialize(job_data) + @job_data = job_data end def perform - @job.new.execute(*@args) + Base.execute @job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb index 9c84c74f83..e501fe0368 100644 --- a/activejob/lib/active_job/queue_adapters/que_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb @@ -2,20 +2,32 @@ require 'que' module ActiveJob module QueueAdapters + # == Que adapter for Active Job + # + # Que is a high-performance alternative to DelayedJob or QueueClassic that + # improves the reliability of your application by protecting your jobs with + # the same ACID guarantees as the rest of your data. Que is a queue for + # Ruby and PostgreSQL that manages jobs using advisory locks. + # + # Read more about Que {here}[https://github.com/chanks/que]. + # + # To use Que set the queue_adapter config to +:que+. + # + # Rails.application.config.active_job.queue_adapter = :que class QueAdapter class << self - def enqueue(job, *args) - JobWrapper.enqueue job.name, *args, queue: job.queue_name + def enqueue(job) #:nodoc: + JobWrapper.enqueue job.serialize, queue: job.queue_name end - def enqueue_at(job, timestamp, *args) - JobWrapper.enqueue job.name, *args, queue: job.queue_name, run_at: Time.at(timestamp) + def enqueue_at(job, timestamp) #:nodoc: + JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp) end end - class JobWrapper < Que::Job - def run(job_name, *args) - job_name.constantize.new.execute(*args) + class JobWrapper < Que::Job #:nodoc: + def run(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb index d74f8cf90e..34c11a68b2 100644 --- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -2,21 +2,50 @@ require 'queue_classic' module ActiveJob module QueueAdapters + # == queue_classic adapter for Active Job + # + # queue_classic provides a simple interface to a PostgreSQL-backed message + # queue. queue_classic specializes in concurrent locking and minimizing + # database load while providing a simple, intuitive developer experience. + # queue_classic assumes that you are already using PostgreSQL in your + # production environment and that adding another dependency (e.g. redis, + # beanstalkd, 0mq) is undesirable. + # + # Read more about queue_classic {here}[https://github.com/QueueClassic/queue_classic]. + # + # To use queue_classic set the queue_adapter config to +:queue_classic+. + # + # Rails.application.config.active_job.queue_adapter = :queue_classic class QueueClassicAdapter class << self - def enqueue(job, *args) - QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.name, *args) + def enqueue(job) #:nodoc: + build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize) end - def enqueue_at(job, timestamp, *args) - raise NotImplementedError + def enqueue_at(job, timestamp) #:nodoc: + queue = build_queue(job.queue_name) + unless queue.respond_to?(:enqueue_at) + raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \ + 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \ + 'You can implement this yourself or you can use the queue_classic-later gem.' + end + queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) + end + + # Builds a <tt>QC::Queue</tt> object to schedule jobs on. + # + # If you have a custom <tt>QC::Queue</tt> subclass you'll need to subclass + # <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the + # <tt>build_queue</tt> method. + def build_queue(queue_name) + QC::Queue.new(queue_name) end end - class JobWrapper + class JobWrapper #:nodoc: class << self - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb index da8212fc9b..88c6b48fef 100644 --- a/activejob/lib/active_job/queue_adapters/resque_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb @@ -14,25 +14,36 @@ end module ActiveJob module QueueAdapters + # == Resque adapter for Active Job + # + # Resque (pronounced like "rescue") is a Redis-backed library for creating + # background jobs, placing those jobs on multiple queues, and processing + # them later. + # + # Read more about Resque {here}[https://github.com/resque/resque]. + # + # To use Resque set the queue_adapter config to +:resque+. + # + # Rails.application.config.active_job.queue_adapter = :resque class ResqueAdapter class << self - def enqueue(job, *args) - Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args + def enqueue(job) #:nodoc: + Resque.enqueue_to job.queue_name, JobWrapper, job.serialize end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) #:nodoc: unless Resque.respond_to?(:enqueue_at_with_queue) raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \ "resque-scheduler gem. Please add it to your Gemfile and run bundle install" end - Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args + Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize end end - class JobWrapper + class JobWrapper #:nodoc: class << self - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb index 3e20bec44c..21005fc728 100644 --- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -2,32 +2,42 @@ require 'sidekiq' module ActiveJob module QueueAdapters + # == Sidekiq adapter for Active Job + # + # Simple, efficient background processing for Ruby. Sidekiq uses threads to + # handle many jobs at the same time in the same process. It does not + # require Rails but will integrate tightly with it to make background + # processing dead simple. + # + # Read more about Sidekiq {here}[http://sidekiq.org]. + # + # To use Sidekiq set the queue_adapter config to +:sidekiq+. + # + # Rails.application.config.active_job.queue_adapter = :sidekiq class SidekiqAdapter class << self - def enqueue(job, *args) + def enqueue(job) #:nodoc: #Sidekiq::Client does not support symbols as keys Sidekiq::Client.push \ 'class' => JobWrapper, 'queue' => job.queue_name, - 'args' => [ job, *args ], - 'retry' => true + 'args' => [ job.serialize ] end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) #:nodoc: Sidekiq::Client.push \ 'class' => JobWrapper, 'queue' => job.queue_name, - 'args' => [ job, *args ], - 'retry' => true, + 'args' => [ job.serialize ], 'at' => timestamp end end - class JobWrapper + class JobWrapper #:nodoc: include Sidekiq::Worker - def perform(job_name, *args) - job_name.constantize.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb index 48b3df6a46..6d60a2f303 100644 --- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -3,29 +3,41 @@ require 'thread' module ActiveJob module QueueAdapters + # == Sneakers adapter for Active Job + # + # A high-performance RabbitMQ background processing framework for Ruby. + # Sneakers is being used in production for both I/O and CPU intensive + # workloads, and have achieved the goals of high-performance and + # 0-maintenance, as designed. + # + # Read more about Sneakers {here}[https://github.com/jondot/sneakers]. + # + # To use Sneakers set the queue_adapter config to +:sneakers+. + # + # Rails.application.config.active_job.queue_adapter = :sneakers class SneakersAdapter @monitor = Monitor.new class << self - def enqueue(job, *args) + def enqueue(job) #:nodoc: @monitor.synchronize do JobWrapper.from_queue job.queue_name - JobWrapper.enqueue ActiveSupport::JSON.encode([ job.name, *args ]) + JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize) end end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) #:nodoc: raise NotImplementedError end end - class JobWrapper + class JobWrapper #:nodoc: include Sneakers::Worker - from_queue 'active_jobs_default' + from_queue 'default' def work(msg) - job_name, *args = ActiveSupport::JSON.decode(msg) - job_name.constantize.new.execute(*args) + job_data = ActiveSupport::JSON.decode(msg) + Base.execute job_data ack! end end diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb index 16f05744f3..be9e7fd03a 100644 --- a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -2,22 +2,37 @@ require 'sucker_punch' module ActiveJob module QueueAdapters + # == Sucker Punch adapter for Active Job + # + # Sucker Punch is a single-process Ruby asynchronous processing library. + # It's girl_friday and DSL sugar on top of Celluloid. With Celluloid's + # actor pattern, we can do asynchronous processing within a single process. + # This reduces costs of hosting on a service like Heroku along with the + # memory footprint of having to maintain additional jobs if hosting on + # a dedicated server. All queues can run within a single Rails/Sinatra + # process. + # + # Read more about Sucker Punch {here}[https://github.com/brandonhilkert/sucker_punch]. + # + # To use Sucker Punch set the queue_adapter config to +:sucker_punch+. + # + # Rails.application.config.active_job.queue_adapter = :sucker_punch class SuckerPunchAdapter class << self - def enqueue(job, *args) - JobWrapper.new.async.perform job, *args + def enqueue(job) #:nodoc: + JobWrapper.new.async.perform job.serialize end - def enqueue_at(job, timestamp, *args) + def enqueue_at(job, timestamp) #:nodoc: raise NotImplementedError end end - class JobWrapper + class JobWrapper #:nodoc: include SuckerPunch::Job - def perform(job, *args) - job.new.execute(*args) + def perform(job_data) + Base.execute job_data end end end diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb new file mode 100644 index 0000000000..ea9df9a063 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb @@ -0,0 +1,51 @@ +module ActiveJob + module QueueAdapters + # == Test adapter for Active Job + # + # The test adapter should be used only in testing. Along with + # <tt>ActiveJob::TestCase</tt> and <tt>ActiveJob::TestHelper</tt> + # it makes a great tool to test your Rails application. + # + # To use the test adapter set queue_adapter config to +:test+. + # + # Rails.application.config.active_job.queue_adapter = :test + class TestAdapter + delegate :name, to: :class + attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs) + attr_writer(:enqueued_jobs, :performed_jobs) + + def initialize + self.perform_enqueued_jobs = false + self.perform_enqueued_at_jobs = false + end + + # Provides a store of all the enqueued jobs with the TestAdapter so you can check them. + def enqueued_jobs + @enqueued_jobs ||= [] + end + + # Provides a store of all the performed jobs with the TestAdapter so you can check them. + def performed_jobs + @performed_jobs ||= [] + end + + def enqueue(job) #:nodoc: + if perform_enqueued_jobs + performed_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name} + Base.execute job.serialize + else + enqueued_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name} + end + end + + def enqueue_at(job, timestamp) #:nodoc: + if perform_enqueued_at_jobs + performed_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name, at: timestamp} + Base.execute job.serialize + else + enqueued_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name, at: timestamp} + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_name.rb b/activejob/lib/active_job/queue_name.rb index 9698835b6e..9ae0345120 100644 --- a/activejob/lib/active_job/queue_name.rb +++ b/activejob/lib/active_job/queue_name.rb @@ -2,20 +2,50 @@ module ActiveJob module QueueName extend ActiveSupport::Concern + # Includes the ability to override the default queue name and prefix. module ClassMethods mattr_accessor(:queue_name_prefix) mattr_accessor(:default_queue_name) { "default" } - def queue_as(part_name) - queue_name = part_name.to_s.presence || default_queue_name + # Specifies the name of the queue to process the job on. + # + # class PublishToFeedJob < ActiveJob::Base + # queue_as :feeds + # + # def perform(post) + # post.to_feed! + # end + # end + def queue_as(part_name=nil, &block) + if block_given? + self.queue_name = block + else + self.queue_name = queue_name_from_part(part_name) + end + end + + def queue_name_from_part(part_name) #:nodoc: + queue_name = part_name || default_queue_name name_parts = [queue_name_prefix.presence, queue_name] - self.queue_name = name_parts.compact.join('_') + name_parts.compact.join(queue_name_delimiter) end end included do - class_attribute :queue_name + class_attribute :queue_name, instance_accessor: false + class_attribute :queue_name_delimiter, instance_accessor: false + self.queue_name = default_queue_name + self.queue_name_delimiter = '_' # set default delimiter to '_' end + + # Returns the name of the queue the job will be run on + def queue_name + if @queue_name.is_a?(Proc) + @queue_name = self.class.queue_name_from_part(instance_exec(&@queue_name)) + end + @queue_name + end + end end diff --git a/activejob/lib/active_job/test_case.rb b/activejob/lib/active_job/test_case.rb new file mode 100644 index 0000000000..d894a7b5cd --- /dev/null +++ b/activejob/lib/active_job/test_case.rb @@ -0,0 +1,7 @@ +require 'active_support/test_case' + +module ActiveJob + class TestCase < ActiveSupport::TestCase + include ActiveJob::TestHelper + end +end diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb new file mode 100644 index 0000000000..2efcea7f2e --- /dev/null +++ b/activejob/lib/active_job/test_helper.rb @@ -0,0 +1,220 @@ +require 'active_support/core_ext/hash/keys' + +module ActiveJob + # Provides helper methods for testing Active Job + module TestHelper + extend ActiveSupport::Concern + + included do + def before_setup + @old_queue_adapter = queue_adapter + ActiveJob::Base.queue_adapter = :test + clear_enqueued_jobs + clear_performed_jobs + super + end + + def after_teardown + super + ActiveJob::Base.queue_adapter = @old_queue_adapter + end + + # Asserts that the number of enqueued jobs matches the given number. + # + # def test_jobs + # assert_enqueued_jobs 0 + # HelloJob.perform_later('david') + # assert_enqueued_jobs 1 + # HelloJob.perform_later('abdelkader') + # assert_enqueued_jobs 2 + # end + # + # If a block is passed, that block should cause the specified number of + # jobs to be enqueued. + # + # def test_jobs_again + # assert_enqueued_jobs 1 do + # HelloJob.perform_later('cristian') + # end + # + # assert_enqueued_jobs 2 do + # HelloJob.perform_later('aaron') + # HelloJob.perform_later('rafael') + # end + # end + def assert_enqueued_jobs(number) + if block_given? + original_count = enqueued_jobs.size + yield + new_count = enqueued_jobs.size + assert_equal original_count + number, new_count, + "#{number} jobs expected, but #{new_count - original_count} were enqueued" + else + enqueued_jobs_size = enqueued_jobs.size + assert_equal number, enqueued_jobs_size, "#{number} jobs expected, but #{enqueued_jobs_size} were enqueued" + end + end + + # Asserts that no jobs have been enqueued. + # + # def test_jobs + # assert_no_enqueued_jobs + # HelloJob.perform_later('jeremy') + # assert_enqueued_jobs 1 + # end + # + # If a block is passed, that block should not cause any job to be enqueued. + # + # def test_jobs_again + # assert_no_enqueued_jobs do + # # No job should be enqueued from this block + # end + # end + # + # Note: This assertion is simply a shortcut for: + # + # assert_enqueued_jobs 0, &block + def assert_no_enqueued_jobs(&block) + assert_enqueued_jobs 0, &block + end + + # Asserts that the number of performed jobs matches the given number. + # If no block is passed, <tt>perform_enqueued_jobs</tt> + # must be called around the job call. + # + # def test_jobs + # assert_performed_jobs 0 + # + # perform_enqueued_jobs do + # HelloJob.perform_later('xavier') + # end + # assert_performed_jobs 1 + # + # perform_enqueued_jobs do + # HelloJob.perform_later('yves') + # assert_performed_jobs 2 + # end + # end + # + # If a block is passed, that block should cause the specified number of + # jobs to be performed. + # + # def test_jobs_again + # assert_performed_jobs 1 do + # HelloJob.perform_later('robin') + # end + # + # assert_performed_jobs 2 do + # HelloJob.perform_later('carlos') + # HelloJob.perform_later('sean') + # end + # end + def assert_performed_jobs(number) + if block_given? + original_count = performed_jobs.size + perform_enqueued_jobs { yield } + new_count = performed_jobs.size + assert_equal original_count + number, new_count, + "#{number} jobs expected, but #{new_count - original_count} were performed" + else + performed_jobs_size = performed_jobs.size + assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed" + end + end + + # Asserts that no jobs have been performed. + # + # def test_jobs + # assert_no_performed_jobs + # + # perform_enqueued_jobs do + # HelloJob.perform_later('matthew') + # assert_performed_jobs 1 + # end + # end + # + # If a block is passed, that block should not cause any job to be performed. + # + # def test_jobs_again + # assert_no_performed_jobs do + # # No job should be performed from this block + # end + # end + # + # Note: This assertion is simply a shortcut for: + # + # assert_performed_jobs 0, &block + def assert_no_performed_jobs(&block) + assert_performed_jobs 0, &block + end + + # Asserts that the job passed in the block has been enqueued with the given arguments. + # + # def test_assert_enqueued_with + # assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low') do + # MyJob.perform_later(1,2,3) + # end + # end + def assert_enqueued_with(args = {}, &_block) + original_enqueued_jobs = enqueued_jobs.dup + clear_enqueued_jobs + args.assert_valid_keys(:job, :args, :at, :queue) + yield + matching_job = enqueued_jobs.any? do |job| + args.all? { |key, value| value == job[key] } + end + assert matching_job, "No enqueued job found with #{args}" + ensure + queue_adapter.enqueued_jobs = original_enqueued_jobs + enqueued_jobs + end + + # Asserts that the job passed in the block has been performed with the given arguments. + # + # def test_assert_performed_with + # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') do + # MyJob.perform_later(1,2,3) + # end + # end + def assert_performed_with(args = {}, &_block) + original_performed_jobs = performed_jobs.dup + clear_performed_jobs + args.assert_valid_keys(:job, :args, :at, :queue) + perform_enqueued_jobs { yield } + matching_job = performed_jobs.any? do |job| + args.all? { |key, value| value == job[key] } + end + assert matching_job, "No performed job found with #{args}" + ensure + queue_adapter.performed_jobs = original_performed_jobs + performed_jobs + end + + def perform_enqueued_jobs + @old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs + @old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs + queue_adapter.perform_enqueued_jobs = true + queue_adapter.perform_enqueued_at_jobs = true + yield + ensure + queue_adapter.perform_enqueued_jobs = @old_perform_enqueued_jobs + queue_adapter.perform_enqueued_at_jobs = @old_perform_enqueued_at_jobs + end + + def queue_adapter + ActiveJob::Base.queue_adapter + end + + delegate :enqueued_jobs, :enqueued_jobs=, + :performed_jobs, :performed_jobs=, + to: :queue_adapter + + private + def clear_enqueued_jobs + enqueued_jobs.clear + end + + def clear_performed_jobs + performed_jobs.clear + end + end + end +end diff --git a/activejob/test/adapters/test.rb b/activejob/test/adapters/test.rb new file mode 100644 index 0000000000..7180b38a57 --- /dev/null +++ b/activejob/test/adapters/test.rb @@ -0,0 +1,3 @@ +ActiveJob::Base.queue_adapter = :test +ActiveJob::Base.queue_adapter.perform_enqueued_jobs = true +ActiveJob::Base.queue_adapter.perform_enqueued_at_jobs = true diff --git a/activejob/test/cases/adapter_test.rb b/activejob/test/cases/adapter_test.rb index 4fc235ae40..6570c55a83 100644 --- a/activejob/test/cases/adapter_test.rb +++ b/activejob/test/cases/adapter_test.rb @@ -2,7 +2,6 @@ require 'helper' class AdapterTest < ActiveSupport::TestCase test "should load #{ENV['AJADAPTER']} adapter" do - ActiveJob::Base.queue_adapter = ENV['AJADAPTER'].to_sym - assert_equal ActiveJob::Base.queue_adapter, "active_job/queue_adapters/#{ENV['AJADAPTER']}_adapter".classify.constantize + assert_equal "active_job/queue_adapters/#{ENV['AJADAPTER']}_adapter".classify, ActiveJob::Base.queue_adapter.name end end diff --git a/activejob/test/cases/argument_serialization_test.rb b/activejob/test/cases/argument_serialization_test.rb new file mode 100644 index 0000000000..dbe36fc572 --- /dev/null +++ b/activejob/test/cases/argument_serialization_test.rb @@ -0,0 +1,84 @@ +require 'helper' +require 'active_job/arguments' +require 'models/person' +require 'active_support/core_ext/hash/indifferent_access' + +class ArgumentSerializationTest < ActiveSupport::TestCase + setup do + @person = Person.find('5') + end + + [ nil, 1, 1.0, 1_000_000_000_000_000_000_000, + 'a', true, false, + [ 1, 'a' ], + { 'a' => 1 } + ].each do |arg| + test "serializes #{arg.class} verbatim" do + assert_arguments_unchanged arg + end + end + + [ :a, Object.new, self, Person.find('5').to_gid ].each do |arg| + test "does not serialize #{arg.class}" do + assert_raises ActiveJob::SerializationError do + ActiveJob::Arguments.serialize [ arg ] + end + + assert_raises ActiveJob::DeserializationError do + ActiveJob::Arguments.deserialize [ arg ] + end + end + end + + test 'should convert records to Global IDs' do + assert_arguments_roundtrip [@person], ['_aj_globalid' => @person.to_gid.to_s] + end + + test 'should dive deep into arrays and hashes' do + assert_arguments_roundtrip [3, [@person]], [3, ['_aj_globalid' => @person.to_gid.to_s]] + assert_arguments_roundtrip [{ 'a' => @person }], [{ 'a' => { '_aj_globalid' => @person.to_gid.to_s }}.with_indifferent_access] + end + + test 'should stringify symbol hash keys' do + assert_equal [ 'a' => 1 ], ActiveJob::Arguments.serialize([ a: 1 ]) + end + + test 'should disallow non-string/symbol hash keys' do + assert_raises ActiveJob::SerializationError do + ActiveJob::Arguments.serialize [ { 1 => 2 } ] + end + + assert_raises ActiveJob::SerializationError do + ActiveJob::Arguments.serialize [ { :a => [{ 2 => 3 }] } ] + end + + assert_raises ActiveJob::SerializationError do + ActiveJob::Arguments.serialize [ '_aj_globalid' => 1 ] + end + + assert_raises ActiveJob::SerializationError do + ActiveJob::Arguments.serialize [ :_aj_globalid => 1 ] + end + end + + test 'should not allow non-primitive objects' do + assert_raises ActiveJob::SerializationError do + ActiveJob::Arguments.serialize [Object.new] + end + + assert_raises ActiveJob::SerializationError do + ActiveJob::Arguments.serialize [1, [Object.new]] + end + end + + private + def assert_arguments_unchanged(*args) + assert_arguments_roundtrip args, args + end + + def assert_arguments_roundtrip(args, expected_serialized_args) + serialized = ActiveJob::Arguments.serialize(args) + assert_equal expected_serialized_args, serialized + assert_equal args, ActiveJob::Arguments.deserialize(serialized) + end +end diff --git a/activejob/test/cases/callbacks_test.rb b/activejob/test/cases/callbacks_test.rb index 9a0657ee89..9af2380767 100644 --- a/activejob/test/cases/callbacks_test.rb +++ b/activejob/test/cases/callbacks_test.rb @@ -5,7 +5,8 @@ require 'active_support/core_ext/object/inclusion' class CallbacksTest < ActiveSupport::TestCase test 'perform callbacks' do - performed_callback_job = CallbackJob.new.tap { |j| j.execute("A-JOB-ID") } + performed_callback_job = CallbackJob.new("A-JOB-ID") + performed_callback_job.perform_now assert "CallbackJob ran before_perform".in? performed_callback_job.history assert "CallbackJob ran after_perform".in? performed_callback_job.history assert "CallbackJob ran around_perform_start".in? performed_callback_job.history @@ -13,7 +14,7 @@ class CallbacksTest < ActiveSupport::TestCase end test 'enqueue callbacks' do - enqueued_callback_job = CallbackJob.enqueue + enqueued_callback_job = CallbackJob.perform_later assert "CallbackJob ran before_enqueue".in? enqueued_callback_job.history assert "CallbackJob ran after_enqueue".in? enqueued_callback_job.history assert "CallbackJob ran around_enqueue_start".in? enqueued_callback_job.history diff --git a/activejob/test/cases/job_serialization_test.rb b/activejob/test/cases/job_serialization_test.rb index fc1b77744c..db22783030 100644 --- a/activejob/test/cases/job_serialization_test.rb +++ b/activejob/test/cases/job_serialization_test.rb @@ -9,7 +9,7 @@ class JobSerializationTest < ActiveSupport::TestCase end test 'serialize job with gid' do - GidJob.enqueue @person + GidJob.perform_later @person assert_equal "Person with ID: 5", JobBuffer.last_value end end diff --git a/activejob/test/cases/logging_test.rb b/activejob/test/cases/logging_test.rb index 888c183a0b..64aae00441 100644 --- a/activejob/test/cases/logging_test.rb +++ b/activejob/test/cases/logging_test.rb @@ -4,6 +4,7 @@ require 'active_support/core_ext/numeric/time' require 'jobs/hello_job' require 'jobs/logging_job' require 'jobs/nested_job' +require 'models/person' class AdapterTest < ActiveSupport::TestCase include ActiveSupport::LogSubscriber::TestHelper @@ -33,7 +34,7 @@ class AdapterTest < ActiveSupport::TestCase def teardown super ActiveJob::Logging::LogSubscriber.log_subscribers.pop - ActiveJob::Base.logger = @old_logger + set_logger @old_logger end def set_logger(logger) @@ -42,34 +43,51 @@ class AdapterTest < ActiveSupport::TestCase def test_uses_active_job_as_tag - HelloJob.enqueue "Cristian" + HelloJob.perform_later "Cristian" assert_match(/\[ActiveJob\]/, @logger.messages) end + def test_uses_job_name_as_tag + LoggingJob.perform_later "Dummy" + assert_match(/\[LoggingJob\]/, @logger.messages) + end + + def test_uses_job_id_as_tag + LoggingJob.perform_later "Dummy" + assert_match(/\[LOGGING-JOB-ID\]/, @logger.messages) + end + + def test_logs_correct_queue_name + original_queue_name = LoggingJob.queue_name + LoggingJob.queue_as :php_jobs + LoggingJob.perform_later("Dummy") + assert_match(/to .*?\(php_jobs\).*/, @logger.messages) + ensure + LoggingJob.queue_name = original_queue_name + end + + def test_globalid_parameter_logging + person = Person.new(123) + LoggingJob.perform_later person + assert_match(%r{Enqueued.*gid://aj/Person/123}, @logger.messages) + assert_match(%r{Dummy, here is it: #<Person:.*>}, @logger.messages) + assert_match(%r{Performing.*gid://aj/Person/123}, @logger.messages) + end + def test_enqueue_job_logging - HelloJob.enqueue "Cristian" + HelloJob.perform_later "Cristian" assert_match(/Enqueued HelloJob \(Job ID: .*?\) to .*?:.*Cristian/, @logger.messages) end def test_perform_job_logging - LoggingJob.enqueue "Dummy" + LoggingJob.perform_later "Dummy" assert_match(/Performing LoggingJob from .*? with arguments:.*Dummy/, @logger.messages) assert_match(/Dummy, here is it: Dummy/, @logger.messages) assert_match(/Performed LoggingJob from .*? in .*ms/, @logger.messages) end - def test_perform_uses_job_name_job_logging - LoggingJob.enqueue "Dummy" - assert_match(/\[LoggingJob\]/, @logger.messages) - end - - def test_perform_uses_job_id_job_logging - LoggingJob.enqueue "Dummy" - assert_match(/\[LOGGING-JOB-ID\]/, @logger.messages) - end - def test_perform_nested_jobs_logging - NestedJob.enqueue + NestedJob.perform_later assert_match(/\[LoggingJob\] \[.*?\]/, @logger.messages) assert_match(/\[ActiveJob\] Enqueued NestedJob \(Job ID: .*\) to/, @logger.messages) assert_match(/\[ActiveJob\] \[NestedJob\] \[NESTED-JOB-ID\] Performing NestedJob from/, @logger.messages) @@ -81,14 +99,14 @@ class AdapterTest < ActiveSupport::TestCase end def test_enqueue_at_job_logging - HelloJob.enqueue_at 1, "Cristian" + HelloJob.set(wait_until: 24.hours.from_now).perform_later "Cristian" assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages) rescue NotImplementedError skip end def test_enqueue_in_job_logging - HelloJob.enqueue_in 2, "Cristian" + HelloJob.set(wait: 2.seconds).perform_later "Cristian" assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages) rescue NotImplementedError skip diff --git a/activejob/test/cases/parameters_test.rb b/activejob/test/cases/parameters_test.rb deleted file mode 100644 index 78853c51e1..0000000000 --- a/activejob/test/cases/parameters_test.rb +++ /dev/null @@ -1,77 +0,0 @@ -require 'helper' -require 'active_job/arguments' -require 'models/person' -require 'active_support/core_ext/hash/indifferent_access' - -class ParameterSerializationTest < ActiveSupport::TestCase - test 'should make no change to regular values' do - assert_equal [ 1, "something" ], ActiveJob::Arguments.serialize([ 1, "something" ]) - end - - test 'should not allow complex objects' do - assert_equal [ nil ], ActiveJob::Arguments.serialize([ nil ]) - assert_equal [ 1 ], ActiveJob::Arguments.serialize([ 1 ]) - assert_equal [ 1.0 ], ActiveJob::Arguments.serialize([ 1.0 ]) - assert_equal [ 'a' ], ActiveJob::Arguments.serialize([ 'a' ]) - assert_equal [ true ], ActiveJob::Arguments.serialize([ true ]) - assert_equal [ false ], ActiveJob::Arguments.serialize([ false ]) - assert_equal [ { "a" => 1, "b" => 2 } ], ActiveJob::Arguments.serialize([ { a: 1, "b" => 2 } ]) - assert_equal [ [ 1 ] ], ActiveJob::Arguments.serialize([ [ 1 ] ]) - assert_equal [ 1_000_000_000_000_000_000_000 ], ActiveJob::Arguments.serialize([ 1_000_000_000_000_000_000_000 ]) - - err = assert_raises ActiveJob::SerializationError do - ActiveJob::Arguments.serialize([ 1, self ]) - end - assert_equal "Unsupported argument type: #{self.class.name}", err.message - end - - test 'should dive deep into arrays or hashes' do - assert_equal [ { "a" => Person.find(5).gid.to_s }.with_indifferent_access ], ActiveJob::Arguments.serialize([ { a: Person.find(5) } ]) - assert_equal [ [ Person.find(5).gid.to_s ] ], ActiveJob::Arguments.serialize([ [ Person.find(5) ] ]) - end - - test 'should dive deep into arrays or hashes and raise exception on complex objects' do - err = assert_raises ActiveJob::SerializationError do - ActiveJob::Arguments.serialize([ 1, [self] ]) - end - assert_equal "Unsupported argument type: #{self.class.name}", err.message - end - - test 'shoud dive deep into hashes and allow raise exception on not string/symbol keys' do - err = assert_raises ActiveJob::SerializationError do - ActiveJob::Arguments.serialize([ [ { 1 => 2 } ] ]) - end - assert_equal "Unsupported hash key type: Fixnum", err.message - end - - test 'should serialize records with global id' do - assert_equal [ Person.find(5).gid.to_s ], ActiveJob::Arguments.serialize([ Person.find(5) ]) - end - - test 'should serialize values and records together' do - assert_equal [ 3, Person.find(5).gid.to_s ], ActiveJob::Arguments.serialize([ 3, Person.find(5) ]) - end -end - -class ParameterDeserializationTest < ActiveSupport::TestCase - test 'should make no change to regular values' do - assert_equal [ 1, "something" ], ActiveJob::Arguments.deserialize([ 1, "something" ]) - end - - test 'should deserialize records with global id' do - assert_equal [ Person.find(5) ], ActiveJob::Arguments.deserialize([ Person.find(5).gid ]) - end - - test 'should serialize values and records together' do - assert_equal [ 3, Person.find(5) ], ActiveJob::Arguments.deserialize([ 3, Person.find(5).gid ]) - end - - test 'should dive deep when deserialising arrays' do - assert_equal [ [ 3, Person.find(5) ] ], ActiveJob::Arguments.deserialize([ [ 3, Person.find(5).gid ] ]) - end - - test 'should dive deep when deserialising hashes' do - assert_equal [ { "5" => Person.find(5) } ], ActiveJob::Arguments.deserialize([ { "5" => Person.find(5).gid } ]) - end - -end diff --git a/activejob/test/cases/queue_naming_test.rb b/activejob/test/cases/queue_naming_test.rb index fdfd1afceb..898016a704 100644 --- a/activejob/test/cases/queue_naming_test.rb +++ b/activejob/test/cases/queue_naming_test.rb @@ -8,24 +8,67 @@ class QueueNamingTest < ActiveSupport::TestCase assert_equal "default", HelloJob.queue_name end - test 'name appended in job' do + test 'uses given queue name job' do + original_queue_name = HelloJob.queue_name + begin HelloJob.queue_as :greetings - LoggingJob.queue_as :bookkeeping + assert_equal "greetings", HelloJob.new.queue_name + ensure + HelloJob.queue_name = original_queue_name + end + end + + test 'allows a blank queue name' do + original_queue_name = HelloJob.queue_name + + begin + HelloJob.queue_as "" + assert_equal "", HelloJob.new.queue_name + ensure + HelloJob.queue_name = original_queue_name + end + end + + test 'does not use a nil queue name' do + original_queue_name = HelloJob.queue_name + + begin + HelloJob.queue_as nil + assert_equal "default", HelloJob.new.queue_name + ensure + HelloJob.queue_name = original_queue_name + end + end + + test 'evals block given to queue_as to determine queue' do + original_queue_name = HelloJob.queue_name - assert_equal "default", NestedJob.queue_name - assert_equal "greetings", HelloJob.queue_name - assert_equal "bookkeeping", LoggingJob.queue_name + begin + HelloJob.queue_as { :another } + assert_equal "another", HelloJob.new.queue_name ensure - HelloJob.queue_name = LoggingJob.queue_name = ActiveJob::Base.default_queue_name + HelloJob.queue_name = original_queue_name end end - test 'should prefix the queue name' do + test 'can use arguments to determine queue_name in queue_as block' do + original_queue_name = HelloJob.queue_name + begin - original_queue_name_prefix = ActiveJob::Base.queue_name_prefix - original_queue_name = HelloJob.queue_name + HelloJob.queue_as { self.arguments.first=='1' ? :one : :two } + assert_equal "one", HelloJob.new('1').queue_name + assert_equal "two", HelloJob.new('3').queue_name + ensure + HelloJob.queue_name = original_queue_name + end + end + + test 'queue_name_prefix prepended to the queue name with default delimiter' do + original_queue_name_prefix = ActiveJob::Base.queue_name_prefix + original_queue_name = HelloJob.queue_name + begin ActiveJob::Base.queue_name_prefix = 'aj' HelloJob.queue_as :low assert_equal 'aj_low', HelloJob.queue_name @@ -35,4 +78,25 @@ class QueueNamingTest < ActiveSupport::TestCase end end + test 'queue_name_prefix prepended to the queue name with custom delimiter' do + original_queue_name_prefix = ActiveJob::Base.queue_name_prefix + original_queue_name_delimiter = ActiveJob::Base.queue_name_delimiter + original_queue_name = HelloJob.queue_name + + begin + ActiveJob::Base.queue_name_delimiter = '.' + ActiveJob::Base.queue_name_prefix = 'aj' + HelloJob.queue_as :low + assert_equal 'aj.low', HelloJob.queue_name + ensure + ActiveJob::Base.queue_name_prefix = original_queue_name_prefix + ActiveJob::Base.queue_name_delimiter = original_queue_name_delimiter + HelloJob.queue_name = original_queue_name + end + end + + test 'uses queue passed to #set' do + job = HelloJob.set(queue: :some_queue).perform_later + assert_equal "some_queue", job.queue_name + end end diff --git a/activejob/test/cases/queuing_test.rb b/activejob/test/cases/queuing_test.rb index f020316d7e..0eeabbf693 100644 --- a/activejob/test/cases/queuing_test.rb +++ b/activejob/test/cases/queuing_test.rb @@ -9,18 +9,18 @@ class QueuingTest < ActiveSupport::TestCase end test 'run queued job' do - HelloJob.enqueue + HelloJob.perform_later assert_equal "David says hello", JobBuffer.last_value end test 'run queued job with arguments' do - HelloJob.enqueue "Jamie" + HelloJob.perform_later "Jamie" assert_equal "Jamie says hello", JobBuffer.last_value end test 'run queued job later' do begin - result = HelloJob.enqueue_at 1.second.ago, "Jamie" + result = HelloJob.set(wait_until: 1.second.ago).perform_later "Jamie" assert result rescue NotImplementedError skip @@ -28,15 +28,15 @@ class QueuingTest < ActiveSupport::TestCase end test 'job returned by enqueue has the arguments available' do - job = HelloJob.enqueue "Jamie" + job = HelloJob.perform_later "Jamie" assert_equal [ "Jamie" ], job.arguments end - test 'job returned by enqueue_at has the timestamp available' do + test 'job returned by perform_at has the timestamp available' do begin - job = HelloJob.enqueue_at Time.utc(2014, 1, 1) - assert_equal Time.utc(2014, 1, 1), job.enqueued_at + job = HelloJob.set(wait_until: Time.utc(2014, 1, 1)).perform_later + assert_equal Time.utc(2014, 1, 1).to_f, job.scheduled_at rescue NotImplementedError skip end diff --git a/activejob/test/cases/rescue_test.rb b/activejob/test/cases/rescue_test.rb index d9ea3c91d7..58c9ca8992 100644 --- a/activejob/test/cases/rescue_test.rb +++ b/activejob/test/cases/rescue_test.rb @@ -2,30 +2,33 @@ require 'helper' require 'jobs/rescue_job' require 'models/person' -require 'active_support/core_ext/object/inclusion' - class RescueTest < ActiveSupport::TestCase setup do JobBuffer.clear end test 'rescue perform exception with retry' do - job = RescueJob.new - job.execute(SecureRandom.uuid, "david") + job = RescueJob.new("david") + job.perform_now assert_equal [ "rescued from ArgumentError", "performed beautifully" ], JobBuffer.values end test 'let through unhandled perform exception' do - job = RescueJob.new + job = RescueJob.new("other") assert_raises(RescueJob::OtherError) do - job.execute(SecureRandom.uuid, "other") + job.perform_now end end test 'rescue from deserialization errors' do - RescueJob.enqueue Person.new(404) + RescueJob.perform_later Person.new(404) assert_includes JobBuffer.values, 'rescued from DeserializationError' assert_includes JobBuffer.values, 'DeserializationError original exception was Person::RecordNotFound' assert_not_includes JobBuffer.values, 'performed beautifully' end + + test "should not wrap DeserializationError in DeserializationError" do + RescueJob.perform_later [Person.new(404)] + assert_includes JobBuffer.values, 'DeserializationError original exception was Person::RecordNotFound' + end end diff --git a/activejob/test/cases/test_case_test.rb b/activejob/test/cases/test_case_test.rb new file mode 100644 index 0000000000..1d0fdbd22d --- /dev/null +++ b/activejob/test/cases/test_case_test.rb @@ -0,0 +1,14 @@ +require 'helper' +require 'jobs/hello_job' +require 'jobs/logging_job' +require 'jobs/nested_job' + +class ActiveJobTestCaseTest < ActiveJob::TestCase + def test_include_helper + assert_includes self.class.ancestors, ActiveJob::TestHelper + end + + def test_set_test_adapter + assert_instance_of ActiveJob::QueueAdapters::TestAdapter, self.queue_adapter + end +end diff --git a/activejob/test/cases/test_helper_test.rb b/activejob/test/cases/test_helper_test.rb new file mode 100644 index 0000000000..784ede3674 --- /dev/null +++ b/activejob/test/cases/test_helper_test.rb @@ -0,0 +1,226 @@ +require 'helper' +require 'active_support/core_ext/time' +require 'active_support/core_ext/date' +require 'jobs/hello_job' +require 'jobs/logging_job' +require 'jobs/nested_job' + +class EnqueuedJobsTest < ActiveJob::TestCase + def test_assert_enqueued_jobs + assert_nothing_raised do + assert_enqueued_jobs 1 do + HelloJob.perform_later('david') + end + end + end + + def test_repeated_enqueued_jobs_calls + assert_nothing_raised do + assert_enqueued_jobs 1 do + HelloJob.perform_later('abdelkader') + end + end + + assert_nothing_raised do + assert_enqueued_jobs 2 do + HelloJob.perform_later('sean') + HelloJob.perform_later('yves') + end + end + end + + def test_assert_enqueued_jobs_with_no_block + assert_nothing_raised do + HelloJob.perform_later('rafael') + assert_enqueued_jobs 1 + end + + assert_nothing_raised do + HelloJob.perform_later('aaron') + HelloJob.perform_later('matthew') + assert_enqueued_jobs 3 + end + end + + def test_assert_no_enqueued_jobs_with_no_block + assert_nothing_raised do + assert_no_enqueued_jobs + end + end + + def test_assert_no_enqueued_jobs + assert_nothing_raised do + assert_no_enqueued_jobs do + HelloJob.perform_now + end + end + end + + def test_assert_enqueued_jobs_too_few_sent + error = assert_raise ActiveSupport::TestCase::Assertion do + assert_enqueued_jobs 2 do + HelloJob.perform_later('xavier') + end + end + + assert_match(/2 .* but 1/, error.message) + end + + def test_assert_enqueued_jobs_too_many_sent + error = assert_raise ActiveSupport::TestCase::Assertion do + assert_enqueued_jobs 1 do + HelloJob.perform_later('cristian') + HelloJob.perform_later('guillermo') + end + end + + assert_match(/1 .* but 2/, error.message) + end + + def test_assert_no_enqueued_jobs_failure + error = assert_raise ActiveSupport::TestCase::Assertion do + assert_no_enqueued_jobs do + HelloJob.perform_later('jeremy') + end + end + + assert_match(/0 .* but 1/, error.message) + end + + def test_assert_enqueued_job + assert_enqueued_with(job: LoggingJob, queue: 'default') do + LoggingJob.set(wait_until: Date.tomorrow.noon).perform_later + end + end + + def test_assert_enqueued_job_failure + assert_raise ActiveSupport::TestCase::Assertion do + assert_enqueued_with(job: LoggingJob, queue: 'default') do + NestedJob.perform_later + end + end + + error = assert_raise ActiveSupport::TestCase::Assertion do + assert_enqueued_with(job: NestedJob, queue: 'low') do + NestedJob.perform_later + end + end + + assert_equal 'No enqueued job found with {:job=>NestedJob, :queue=>"low"}', error.message + end + + def test_assert_enqueued_job_args + assert_raise ArgumentError do + assert_enqueued_with(class: LoggingJob) do + NestedJob.set(wait_until: Date.tomorrow.noon).perform_later + end + end + end +end + +class PerformedJobsTest < ActiveJob::TestCase + def test_assert_performed_jobs + assert_nothing_raised do + assert_performed_jobs 1 do + HelloJob.perform_later('david') + end + end + end + + def test_repeated_performed_jobs_calls + assert_nothing_raised do + assert_performed_jobs 1 do + HelloJob.perform_later('abdelkader') + end + end + + assert_nothing_raised do + assert_performed_jobs 2 do + HelloJob.perform_later('sean') + HelloJob.perform_later('yves') + end + end + end + + def test_assert_performed_jobs_with_no_block + assert_nothing_raised do + perform_enqueued_jobs do + HelloJob.perform_later('rafael') + end + assert_performed_jobs 1 + end + + assert_nothing_raised do + perform_enqueued_jobs do + HelloJob.perform_later('aaron') + HelloJob.perform_later('matthew') + assert_performed_jobs 3 + end + end + end + + def test_assert_no_performed_jobs_with_no_block + assert_nothing_raised do + assert_no_performed_jobs + end + end + + def test_assert_no_performed_jobs + assert_nothing_raised do + assert_no_performed_jobs do + # empty block won't perform jobs + end + end + end + + def test_assert_performed_jobs_too_few_sent + error = assert_raise ActiveSupport::TestCase::Assertion do + assert_performed_jobs 2 do + HelloJob.perform_later('xavier') + end + end + + assert_match(/2 .* but 1/, error.message) + end + + def test_assert_performed_jobs_too_many_sent + error = assert_raise ActiveSupport::TestCase::Assertion do + assert_performed_jobs 1 do + HelloJob.perform_later('cristian') + HelloJob.perform_later('guillermo') + end + end + + assert_match(/1 .* but 2/, error.message) + end + + def test_assert_no_performed_jobs_failure + error = assert_raise ActiveSupport::TestCase::Assertion do + assert_no_performed_jobs do + HelloJob.perform_later('jeremy') + end + end + + assert_match(/0 .* but 1/, error.message) + end + + def test_assert_performed_job + assert_performed_with(job: NestedJob, queue: 'default') do + NestedJob.perform_later + end + end + + def test_assert_performed_job_failure + assert_raise ActiveSupport::TestCase::Assertion do + assert_performed_with(job: LoggingJob, at: Date.tomorrow.noon, queue: 'default') do + NestedJob.set(wait_until: Date.tomorrow.noon).perform_later + end + end + + assert_raise ActiveSupport::TestCase::Assertion do + assert_performed_with(job: NestedJob, at: Date.tomorrow.noon, queue: 'low') do + NestedJob.set(queue: 'low', wait_until: Date.tomorrow.noon).perform_later + end + end + end +end diff --git a/activejob/test/helper.rb b/activejob/test/helper.rb index a56b35da12..ec0c8a8ede 100644 --- a/activejob/test/helper.rb +++ b/activejob/test/helper.rb @@ -1,6 +1,7 @@ require File.expand_path('../../../load_paths', __FILE__) require 'active_job' +require 'support/job_buffer' GlobalID.app = 'aj' @@ -10,21 +11,19 @@ def sidekiq? @adapter == 'sidekiq' end -def rubinius? - RUBY_ENGINE == 'rbx' -end - def ruby_193? RUBY_VERSION == '1.9.3' && RUBY_ENGINE != 'java' end -#Sidekiq don't work with MRI 1.9.3 -#Travis uses rbx 2.6 which don't support unicode characters in methods. -#Remove the check when Travis change to rbx 2.7+ -exit if sidekiq? && (ruby_193? || rubinius?) +# Sidekiq doesn't work with MRI 1.9.3 +exit if sidekiq? && ruby_193? -require "adapters/#{@adapter}" +if ENV['AJ_INTEGRATION_TESTS'] + require 'support/integration/helper' +else + require "adapters/#{@adapter}" +end require 'active_support/testing/autorun' -ActiveJob::Base.logger.level = Logger::DEBUG +ActiveSupport::TestCase.test_order = :random diff --git a/activejob/test/integration/queuing_test.rb b/activejob/test/integration/queuing_test.rb new file mode 100644 index 0000000000..38874b51a8 --- /dev/null +++ b/activejob/test/integration/queuing_test.rb @@ -0,0 +1,47 @@ +require 'helper' +require 'jobs/logging_job' +require 'active_support/core_ext/numeric/time' + +class QueuingTest < ActiveSupport::TestCase + test 'should run jobs enqueued on a listening queue' do + TestJob.perform_later @id + wait_for_jobs_to_finish_for(5.seconds) + assert job_executed + end + + test 'should not run jobs queued on a non-listening queue' do + skip if adapter_is?(:inline) || adapter_is?(:sucker_punch) + old_queue = TestJob.queue_name + + begin + TestJob.queue_as :some_other_queue + TestJob.perform_later @id + wait_for_jobs_to_finish_for(2.seconds) + assert_not job_executed + ensure + TestJob.queue_name = old_queue + end + end + + test 'should not run job enqueued in the future' do + begin + TestJob.set(wait: 10.minutes).perform_later @id + wait_for_jobs_to_finish_for(5.seconds) + assert_not job_executed + rescue NotImplementedError + skip + end + end + + test 'should run job enqueued in the future at the specified time' do + begin + TestJob.set(wait: 3.seconds).perform_later @id + wait_for_jobs_to_finish_for(2.seconds) + assert_not job_executed + wait_for_jobs_to_finish_for(10.seconds) + assert job_executed + rescue NotImplementedError + skip + end + end +end diff --git a/activejob/test/jobs/callback_job.rb b/activejob/test/jobs/callback_job.rb index 056dd073e8..891ed9464e 100644 --- a/activejob/test/jobs/callback_job.rb +++ b/activejob/test/jobs/callback_job.rb @@ -1,12 +1,21 @@ class CallbackJob < ActiveJob::Base before_perform ->(job) { job.history << "CallbackJob ran before_perform" } - after_perform ->(job) { job.history << "CallbackJob ran after_perform" } + after_perform ->(job) { job.history << "CallbackJob ran after_perform" } before_enqueue ->(job) { job.history << "CallbackJob ran before_enqueue" } - after_enqueue ->(job) { job.history << "CallbackJob ran after_enqueue" } + after_enqueue ->(job) { job.history << "CallbackJob ran after_enqueue" } - around_perform :around_perform - around_enqueue :around_enqueue + around_perform do |job, block| + job.history << "CallbackJob ran around_perform_start" + block.call + job.history << "CallbackJob ran around_perform_stop" + end + + around_enqueue do |job, block| + job.history << "CallbackJob ran around_enqueue_start" + block.call + job.history << "CallbackJob ran around_enqueue_stop" + end def perform(person = "david") @@ -17,16 +26,4 @@ class CallbackJob < ActiveJob::Base @history ||= [] end - # FIXME: Not sure why these can't be declared inline like before/after - def around_perform - history << "CallbackJob ran around_perform_start" - yield - history << "CallbackJob ran around_perform_stop" - end - - def around_enqueue - history << "CallbackJob ran around_enqueue_start" - yield - history << "CallbackJob ran around_enqueue_stop" - end end diff --git a/activejob/test/jobs/nested_job.rb b/activejob/test/jobs/nested_job.rb index fd66f68991..8c4ec549a6 100644 --- a/activejob/test/jobs/nested_job.rb +++ b/activejob/test/jobs/nested_job.rb @@ -1,6 +1,6 @@ class NestedJob < ActiveJob::Base def perform - LoggingJob.enqueue "NestedJob" + LoggingJob.perform_later "NestedJob" end def job_id diff --git a/activejob/test/jobs/rescue_job.rb b/activejob/test/jobs/rescue_job.rb index 6b6e74e9d0..f1b9c9349e 100644 --- a/activejob/test/jobs/rescue_job.rb +++ b/activejob/test/jobs/rescue_job.rb @@ -6,7 +6,7 @@ class RescueJob < ActiveJob::Base rescue_from(ArgumentError) do JobBuffer.add('rescued from ArgumentError') arguments[0] = "DIFFERENT!" - retry_now + retry_job end rescue_from(ActiveJob::DeserializationError) do |e| diff --git a/activejob/test/support/delayed_job/delayed/backend/test.rb b/activejob/test/support/delayed_job/delayed/backend/test.rb index b50ed36fc2..f80ec3a5a6 100644 --- a/activejob/test/support/delayed_job/delayed/backend/test.rb +++ b/activejob/test/support/delayed_job/delayed/backend/test.rb @@ -43,9 +43,7 @@ module Delayed end def self.create(attrs = {}) - new(attrs).tap do |o| - o.save - end + new(attrs).tap(&:save) end def self.create!(*args); create(*args); end diff --git a/activejob/test/support/integration/adapters/backburner.rb b/activejob/test/support/integration/adapters/backburner.rb new file mode 100644 index 0000000000..2e82562948 --- /dev/null +++ b/activejob/test/support/integration/adapters/backburner.rb @@ -0,0 +1,38 @@ +module BackburnerJobsManager + def setup + ActiveJob::Base.queue_adapter = :backburner + Backburner.configure do |config| + config.logger = Rails.logger + end + unless can_run? + puts "Cannot run integration tests for backburner. To be able to run integration tests for backburner you need to install and start beanstalkd.\n" + exit + end + end + + def clear_jobs + tube.clear + end + + def start_workers + @thread = Thread.new { Backburner.work "integration-tests" } # backburner dasherizes the queue name + end + + def stop_workers + @thread.kill + end + + def tube + @tube ||= Beaneater::Tube.new(Backburner::Worker.connection, "backburner.worker.queue.integration-tests") # backburner dasherizes the queue name + end + + def can_run? + begin + Backburner::Worker.connection.send :connect! + rescue + return false + end + true + end +end + diff --git a/activejob/test/support/integration/adapters/delayed_job.rb b/activejob/test/support/integration/adapters/delayed_job.rb new file mode 100644 index 0000000000..0b591964bc --- /dev/null +++ b/activejob/test/support/integration/adapters/delayed_job.rb @@ -0,0 +1,20 @@ +require 'delayed_job' +require 'delayed_job_active_record' + +module DelayedJobJobsManager + def setup + ActiveJob::Base.queue_adapter = :delayed_job + end + def clear_jobs + Delayed::Job.delete_all + end + + def start_workers + @worker = Delayed::Worker.new(quiet: true, sleep_delay: 0.5, queues: %w(integration_tests)) + @thread = Thread.new { @worker.start } + end + + def stop_workers + @worker.stop + end +end diff --git a/activejob/test/support/integration/adapters/inline.rb b/activejob/test/support/integration/adapters/inline.rb new file mode 100644 index 0000000000..83c38f706f --- /dev/null +++ b/activejob/test/support/integration/adapters/inline.rb @@ -0,0 +1,15 @@ +module InlineJobsManager + def setup + ActiveJob::Base.queue_adapter = :inline + end + + def clear_jobs + end + + def start_workers + end + + def stop_workers + end +end + diff --git a/activejob/test/support/integration/adapters/qu.rb b/activejob/test/support/integration/adapters/qu.rb new file mode 100644 index 0000000000..3a5b66a057 --- /dev/null +++ b/activejob/test/support/integration/adapters/qu.rb @@ -0,0 +1,38 @@ +module QuJobsManager + def setup + require 'qu-rails' + require 'qu-redis' + ActiveJob::Base.queue_adapter = :qu + ENV['REDISTOGO_URL'] = "tcp://127.0.0.1:6379/12" + backend = Qu::Backend::Redis.new + backend.namespace = "active_jobs_int_test" + Qu.backend = backend + Qu.logger = Rails.logger + Qu.interval = 0.5 + unless can_run? + puts "Cannot run integration tests for qu. To be able to run integration tests for qu you need to install and start redis.\n" + exit + end + end + + def clear_jobs + Qu.clear "integration_tests" + end + + def start_workers + @thread = Thread.new { Qu::Worker.new("integration_tests").start } + end + + def stop_workers + @thread.kill + end + + def can_run? + begin + Qu.backend.connection.client.connect + rescue + return false + end + true + end +end diff --git a/activejob/test/support/integration/adapters/que.rb b/activejob/test/support/integration/adapters/que.rb new file mode 100644 index 0000000000..ba7657a42a --- /dev/null +++ b/activejob/test/support/integration/adapters/que.rb @@ -0,0 +1,37 @@ +module QueJobsManager + def setup + require 'sequel' + ActiveJob::Base.queue_adapter = :que + que_url = ENV['QUE_DATABASE_URL'] || 'postgres:///active_jobs_que_int_test' + uri = URI.parse(que_url) + user = uri.user||ENV['USER'] + pass = uri.password + db = uri.path[1..-1] + %x{#{"PGPASSWORD=\"#{pass}\"" if pass} psql -c 'drop database if exists "#{db}"' -U #{user} -t template1} + %x{#{"PGPASSWORD=\"#{pass}\"" if pass} psql -c 'create database "#{db}"' -U #{user} -t template1} + Que.connection = Sequel.connect(que_url) + Que.migrate! + Que.mode = :off + Que.worker_count = 1 + rescue Sequel::DatabaseConnectionError + puts "Cannot run integration tests for que. To be able to run integration tests for que you need to install and start postgresql.\n" + exit + end + + def clear_jobs + Que.clear! + end + + def start_workers + @thread = Thread.new do + loop do + Que::Job.work("integration_tests") + sleep 0.5 + end + end + end + + def stop_workers + @thread.kill + end +end diff --git a/activejob/test/support/integration/adapters/queue_classic.rb b/activejob/test/support/integration/adapters/queue_classic.rb new file mode 100644 index 0000000000..038473ccdc --- /dev/null +++ b/activejob/test/support/integration/adapters/queue_classic.rb @@ -0,0 +1,33 @@ +module QueueClassicJobsManager + def setup + ENV['QC_DATABASE_URL'] ||= 'postgres:///active_jobs_qc_int_test' + ENV['QC_LISTEN_TIME'] = "0.5" + uri = URI.parse(ENV['QC_DATABASE_URL']) + user = uri.user||ENV['USER'] + pass = uri.password + db = uri.path[1..-1] + %x{#{"PGPASSWORD=\"#{pass}\"" if pass} psql -c 'drop database if exists "#{db}"' -U #{user} -t template1} + %x{#{"PGPASSWORD=\"#{pass}\"" if pass} psql -c 'create database "#{db}"' -U #{user} -t template1} + ActiveJob::Base.queue_adapter = :queue_classic + QC::Setup.create + rescue PG::ConnectionBad + puts "Cannot run integration tests for queue_classic. To be able to run integration tests for queue_classic you need to install and start postgresql.\n" + exit + end + + def clear_jobs + QC::Queue.new("integration_tests").delete_all + end + + def start_workers + QC::Conn.disconnect + @pid = fork do + worker = QC::Worker.new(q_name: 'integration_tests') + worker.start + end + end + + def stop_workers + Process.kill 'HUP', @pid + end +end diff --git a/activejob/test/support/integration/adapters/resque.rb b/activejob/test/support/integration/adapters/resque.rb new file mode 100644 index 0000000000..912f4bc387 --- /dev/null +++ b/activejob/test/support/integration/adapters/resque.rb @@ -0,0 +1,49 @@ +module ResqueJobsManager + def setup + ActiveJob::Base.queue_adapter = :resque + Resque.redis = Redis::Namespace.new 'active_jobs_int_test', redis: Redis.connect(url: "redis://127.0.0.1:6379/12", :thread_safe => true) + Resque.logger = Rails.logger + unless can_run? + puts "Cannot run integration tests for resque. To be able to run integration tests for resque you need to install and start redis.\n" + exit + end + end + + def clear_jobs + Resque.queues.each { |queue_name| Resque.redis.del "queue:#{queue_name}" } + Resque.redis.keys("delayed:*").each { |key| Resque.redis.del "#{key}" } + Resque.redis.del "delayed_queue_schedule" + end + + def start_workers + @resque_thread = Thread.new do + w = Resque::Worker.new("integration_tests") + w.term_child = true + w.work(0.5) + end + @scheduler_thread = Thread.new do + Resque::Scheduler.configure do |c| + c.poll_sleep_amount = 0.5 + c.dynamic = true + c.quiet = true + c.logfile = nil + end + Resque::Scheduler.master_lock.release! + Resque::Scheduler.run + end + end + + def stop_workers + @resque_thread.kill + @scheduler_thread.kill + end + + def can_run? + begin + Resque.redis.client.connect + rescue + return false + end + true + end +end diff --git a/activejob/test/support/integration/adapters/sidekiq.rb b/activejob/test/support/integration/adapters/sidekiq.rb new file mode 100644 index 0000000000..6ff18fb56a --- /dev/null +++ b/activejob/test/support/integration/adapters/sidekiq.rb @@ -0,0 +1,58 @@ +require 'sidekiq/cli' +require 'sidekiq/api' + +module SidekiqJobsManager + + def setup + ActiveJob::Base.queue_adapter = :sidekiq + unless can_run? + puts "Cannot run integration tests for sidekiq. To be able to run integration tests for sidekiq you need to install and start redis.\n" + exit + end + end + + def clear_jobs + Sidekiq::ScheduledSet.new.clear + Sidekiq::Queue.new("integration_tests").clear + end + + def start_workers + fork do + sidekiq = Sidekiq::CLI.instance + logfile = Rails.root.join("log/sidekiq.log").to_s + pidfile = Rails.root.join("tmp/sidekiq.pid").to_s + sidekiq.parse([ "--require", Rails.root.to_s, + "--queue", "integration_tests", + "--logfile", logfile, + "--pidfile", pidfile, + "--environment", "test", + "--concurrency", "1", + "--timeout", "1", + "--daemon", + ]) + require 'celluloid' + require 'sidekiq/scheduled' + Sidekiq.poll_interval = 0.5 + Sidekiq::Scheduled.const_set :INITIAL_WAIT, 1 + sidekiq.run + end + sleep 1 + end + + def stop_workers + pidfile = Rails.root.join("tmp/sidekiq.pid").to_s + Process.kill 'TERM', File.open(pidfile).read.to_i + FileUtils.rm_f pidfile + rescue + end + + def can_run? + begin + Sidekiq.redis(&:info) + Sidekiq.logger = nil + rescue + return false + end + true + end +end diff --git a/activejob/test/support/integration/adapters/sneakers.rb b/activejob/test/support/integration/adapters/sneakers.rb new file mode 100644 index 0000000000..875803a2d8 --- /dev/null +++ b/activejob/test/support/integration/adapters/sneakers.rb @@ -0,0 +1,90 @@ +require 'sneakers/runner' +require 'sneakers/publisher' +require 'timeout' + +module Sneakers + class Publisher + def safe_ensure_connected + @mutex.synchronize do + ensure_connection! unless connected? + end + end + end +end + + +module SneakersJobsManager + def setup + ActiveJob::Base.queue_adapter = :sneakers + Sneakers.configure :heartbeat => 2, + :amqp => 'amqp://guest:guest@localhost:5672', + :vhost => '/', + :exchange => 'active_jobs_sneakers_int_test', + :exchange_type => :direct, + :daemonize => true, + :threads => 1, + :workers => 1, + :pid_path => Rails.root.join("tmp/sneakers.pid").to_s, + :log => Rails.root.join("log/sneakers.log").to_s + unless can_run? + puts "Cannot run integration tests for sneakers. To be able to run integration tests for sneakers you need to install and start rabbitmq.\n" + exit + end + end + + def clear_jobs + bunny_queue.purge + end + + def start_workers + @pid = fork do + queues = %w(integration_tests) + workers = queues.map do |q| + worker_klass = "ActiveJobWorker"+Digest::MD5.hexdigest(q) + Sneakers.const_set(worker_klass, Class.new(ActiveJob::QueueAdapters::SneakersAdapter::JobWrapper) do + from_queue q + end) + end + Sneakers::Runner.new(workers).run + end + begin + Timeout.timeout(10) do + while bunny_queue.status[:consumer_count] == 0 + sleep 0.5 + end + end + rescue Timeout::Error + stop_workers + raise "Failed to start sneakers worker" + end + end + + def stop_workers + Process.kill 'TERM', @pid + Process.kill 'TERM', File.open(Rails.root.join("tmp/sneakers.pid").to_s).read.to_i + rescue + end + + def can_run? + begin + bunny_publisher + rescue + return false + end + true + end + + protected + def bunny_publisher + @bunny_publisher ||= begin + p = ActiveJob::QueueAdapters::SneakersAdapter::JobWrapper.send(:publisher) + p.safe_ensure_connected + p + end + end + + def bunny_queue + @queue ||= bunny_publisher.exchange.channel.queue "integration_tests", durable: true + end + +end diff --git a/activejob/test/support/integration/adapters/sucker_punch.rb b/activejob/test/support/integration/adapters/sucker_punch.rb new file mode 100644 index 0000000000..9c0d66b469 --- /dev/null +++ b/activejob/test/support/integration/adapters/sucker_punch.rb @@ -0,0 +1,6 @@ +module SuckerPunchJobsManager + def setup + ActiveJob::Base.queue_adapter = :sucker_punch + SuckerPunch.logger = nil + end +end diff --git a/activejob/test/support/integration/dummy_app_template.rb b/activejob/test/support/integration/dummy_app_template.rb new file mode 100644 index 0000000000..65994d6a1c --- /dev/null +++ b/activejob/test/support/integration/dummy_app_template.rb @@ -0,0 +1,21 @@ +if ENV['AJADAPTER'] == 'delayed_job' + generate "delayed_job:active_record", "--quiet" + rake("db:migrate") +end + +initializer 'activejob.rb', <<-CODE +require "#{File.expand_path("../jobs_manager.rb", __FILE__)}" +JobsManager.current_manager.setup +CODE + +file 'app/jobs/test_job.rb', <<-CODE +class TestJob < ActiveJob::Base + queue_as :integration_tests + + def perform(x) + File.open(Rails.root.join("tmp/\#{x}"), "w+") do |f| + f.write x + end + end +end +CODE diff --git a/activejob/test/support/integration/helper.rb b/activejob/test/support/integration/helper.rb new file mode 100644 index 0000000000..9bd45e09e8 --- /dev/null +++ b/activejob/test/support/integration/helper.rb @@ -0,0 +1,30 @@ +puts "*** rake aj:integration:#{ENV['AJADAPTER']} ***\n" + +ENV["RAILS_ENV"] = "test" +ActiveJob::Base.queue_name_prefix = nil + +require 'rails/generators/rails/app/app_generator' + +dummy_app_path = Dir.mktmpdir + "/dummy" +dummy_app_template = File.expand_path("../dummy_app_template.rb", __FILE__) +args = Rails::Generators::ARGVScrubber.new(["new", dummy_app_path, "--skip-gemfile", "--skip-bundle", + "--skip-git", "--skip-spring", "-d", "sqlite3", "--skip-javascript", "--force", "--quiet", + "--template", dummy_app_template]).prepare! +Rails::Generators::AppGenerator.start args + +require "#{dummy_app_path}/config/environment.rb" + +ActiveRecord::Migrator.migrations_paths = [ Rails.root.join('db/migrate').to_s ] +require 'rails/test_help' + +Rails.backtrace_cleaner.remove_silencers! + +require_relative 'test_case_helpers' +ActiveSupport::TestCase.send(:include, TestCaseHelpers) + +JobsManager.current_manager.start_workers + +Minitest.after_run do + JobsManager.current_manager.stop_workers + JobsManager.current_manager.clear_jobs +end diff --git a/activejob/test/support/integration/jobs_manager.rb b/activejob/test/support/integration/jobs_manager.rb new file mode 100644 index 0000000000..4df34aaeb1 --- /dev/null +++ b/activejob/test/support/integration/jobs_manager.rb @@ -0,0 +1,27 @@ +class JobsManager + @@managers = {} + attr :adapter_name + + def self.current_manager + @@managers[ENV['AJADAPTER']] ||= new(ENV['AJADAPTER']) + end + + def initialize(adapter_name) + @adapter_name = adapter_name + require_relative "adapters/#{adapter_name}" + extend "#{adapter_name.camelize}JobsManager".constantize + end + + def setup + ActiveJob::Base.queue_adapter = nil + end + + def clear_jobs + end + + def start_workers + end + + def stop_workers + end +end diff --git a/activejob/test/support/integration/test_case_helpers.rb b/activejob/test/support/integration/test_case_helpers.rb new file mode 100644 index 0000000000..ee2f6aebea --- /dev/null +++ b/activejob/test/support/integration/test_case_helpers.rb @@ -0,0 +1,48 @@ +require 'active_support/concern' +require 'support/integration/jobs_manager' + +module TestCaseHelpers + extend ActiveSupport::Concern + + included do + self.use_transactional_fixtures = false + + setup do + clear_jobs + @id = "AJ-#{SecureRandom.uuid}" + end + + teardown do + clear_jobs + end + end + + protected + + def jobs_manager + JobsManager.current_manager + end + + def clear_jobs + jobs_manager.clear_jobs + end + + def adapter_is?(adapter) + ActiveJob::Base.queue_adapter.name.split("::").last.gsub(/Adapter$/, '').underscore==adapter.to_s + end + + def wait_for_jobs_to_finish_for(seconds=60) + begin + Timeout.timeout(seconds) do + while !job_executed do + sleep 0.25 + end + end + rescue Timeout::Error + end + end + + def job_executed + Dummy::Application.root.join("tmp/#{@id}").exist? + end +end diff --git a/activejob/test/support/queue_classic/inline.rb b/activejob/test/support/queue_classic/inline.rb index 5e9c295e01..5743d5bbb5 100644 --- a/activejob/test/support/queue_classic/inline.rb +++ b/activejob/test/support/queue_classic/inline.rb @@ -7,5 +7,17 @@ module QC receiver = eval(receiver_str) receiver.send(message, *args) end + + def enqueue_in(seconds, method, *args) + receiver_str, _, message = method.rpartition('.') + receiver = eval(receiver_str) + receiver.send(message, *args) + end + + def enqueue_at(not_before, method, *args) + receiver_str, _, message = method.rpartition('.') + receiver = eval(receiver_str) + receiver.send(message, *args) + end end end |