diff options
Diffstat (limited to 'activejob')
23 files changed, 477 insertions, 457 deletions
diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md index 8687af5eba..efe46ce5ab 100644 --- a/activejob/CHANGELOG.md +++ b/activejob/CHANGELOG.md @@ -1,3 +1,29 @@ +* Enable class reloading prior to job dispatch, and ensure Active Record + connections are returned to the pool when jobs are run in separate threads. + + *Matthew Draper* + +* Tune the async adapter for low-footprint dev/test usage. Use a single + thread pool for all queues and limit to 0 to #CPU total threads, down from + 2 to 10*#CPU per queue. + + *Jeremy Daer* + + +## Rails 5.0.0.beta3 (February 24, 2016) ## + +* Change the default adapter from inline to async. It's a better default as tests will then not mistakenly + come to rely on behavior happening synchronously. This is especially important with things like jobs kicked off + in Active Record lifecycle callbacks. + + *DHH* + + +## Rails 5.0.0.beta2 (February 01, 2016) ## + +* No changes. + + ## Rails 5.0.0.beta1 (December 18, 2015) ## * Fixed serializing `:at` option for `assert_enqueued_with` diff --git a/activejob/README.md b/activejob/README.md index 7268186c00..8becac7753 100644 --- a/activejob/README.md +++ b/activejob/README.md @@ -20,12 +20,7 @@ switch between them without having to rewrite your jobs. ## Usage -Set the queue adapter for Active Job: - -``` ruby -ActiveJob::Base.queue_adapter = :inline # default queue adapter -``` -Note: To learn how to use your preferred queueing backend see its adapter +To learn how to use your preferred queueing backend see its adapter documentation at [ActiveJob::QueueAdapters](http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html). diff --git a/activejob/Rakefile b/activejob/Rakefile index d9648a7f16..2a853b4b6b 100644 --- a/activejob/Rakefile +++ b/activejob/Rakefile @@ -6,6 +6,9 @@ ACTIVEJOB_ADAPTERS -= %w(queue_classic) if defined?(JRUBY_VERSION) task default: :test task test: 'test:default' +task :package +task "package:clean" + namespace :test do desc 'Run all adapter tests' task :default do diff --git a/activejob/activejob.gemspec b/activejob/activejob.gemspec index bc1671b508..e97bb40abf 100644 --- a/activejob/activejob.gemspec +++ b/activejob/activejob.gemspec @@ -13,7 +13,7 @@ Gem::Specification.new do |s| s.author = 'David Heinemeier Hansson' s.email = 'david@loudthinking.com' - s.homepage = 'http://www.rubyonrails.org' + s.homepage = 'http://rubyonrails.org' s.files = Dir['CHANGELOG.md', 'MIT-LICENSE', 'README.md', 'lib/**/*'] s.require_path = 'lib' diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb index f7e05f7cd3..4b9065cf50 100644 --- a/activejob/lib/active_job.rb +++ b/activejob/lib/active_job.rb @@ -32,7 +32,6 @@ module ActiveJob autoload :Base autoload :QueueAdapters autoload :ConfiguredJob - autoload :AsyncJob autoload :TestCase autoload :TestHelper end diff --git a/activejob/lib/active_job/async_job.rb b/activejob/lib/active_job/async_job.rb deleted file mode 100644 index ed7a6e8d9b..0000000000 --- a/activejob/lib/active_job/async_job.rb +++ /dev/null @@ -1,77 +0,0 @@ -require 'concurrent/map' -require 'concurrent/scheduled_task' -require 'concurrent/executor/thread_pool_executor' -require 'concurrent/utility/processor_counter' - -module ActiveJob - # == Active Job Async Job - # - # When enqueueing jobs with Async Job each job will be executed asynchronously - # on a +concurrent-ruby+ thread pool. All job data is retained in memory. - # Because job data is not saved to a persistent datastore there is no - # additional infrastructure needed and jobs process quickly. The lack of - # persistence, however, means that all unprocessed jobs will be lost on - # application restart. Therefore in-memory queue adapters are unsuitable for - # most production environments but are excellent for development and testing. - # - # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby]. - # - # To use Async Job set the queue_adapter config to +:async+. - # - # Rails.application.config.active_job.queue_adapter = :async - # - # Async Job supports job queues specified with +queue_as+. Queues are created - # automatically as needed and each has its own thread pool. - class AsyncJob - - DEFAULT_EXECUTOR_OPTIONS = { - min_threads: [2, Concurrent.processor_count].max, - max_threads: Concurrent.processor_count * 10, - auto_terminate: true, - idletime: 60, # 1 minute - max_queue: 0, # unlimited - fallback_policy: :caller_runs # shouldn't matter -- 0 max queue - }.freeze - - QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc: - hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool } - end - - class << self - # Forces jobs to process immediately when testing the Active Job gem. - # This should only be called from within unit tests. - def perform_immediately! #:nodoc: - @perform_immediately = true - end - - # Allows jobs to run asynchronously when testing the Active Job gem. - # This should only be called from within unit tests. - def perform_asynchronously! #:nodoc: - @perform_immediately = false - end - - def create_thread_pool #:nodoc: - if @perform_immediately - Concurrent::ImmediateExecutor.new - else - Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS) - end - end - - def enqueue(job_data, queue: 'default') #:nodoc: - QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) } - end - - def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc: - delay = timestamp - Time.current.to_f - if delay > 0 - Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job| - ActiveJob::Base.execute(job) - end - else - enqueue(job_data, queue: queue) - end - end - end - end -end diff --git a/activejob/lib/active_job/callbacks.rb b/activejob/lib/active_job/callbacks.rb index 2b6149e84e..a6591c6a05 100644 --- a/activejob/lib/active_job/callbacks.rb +++ b/activejob/lib/active_job/callbacks.rb @@ -17,6 +17,11 @@ module ActiveJob extend ActiveSupport::Concern include ActiveSupport::Callbacks + class << self + include ActiveSupport::Callbacks + define_callbacks :execute + end + included do define_callbacks :perform define_callbacks :enqueue diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb index 19b900a285..f7f882c998 100644 --- a/activejob/lib/active_job/core.rb +++ b/activejob/lib/active_job/core.rb @@ -79,7 +79,7 @@ module ActiveJob 'queue_name' => queue_name, 'priority' => priority, 'arguments' => serialize_arguments(arguments), - 'locale' => I18n.locale + 'locale' => I18n.locale.to_s } end @@ -108,7 +108,7 @@ module ActiveJob self.queue_name = job_data['queue_name'] self.priority = job_data['priority'] self.serialized_arguments = job_data['arguments'] - self.locale = job_data['locale'] || I18n.locale + self.locale = job_data['locale'] || I18n.locale.to_s end private diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb index 79d232da4a..7c4151fc90 100644 --- a/activejob/lib/active_job/execution.rb +++ b/activejob/lib/active_job/execution.rb @@ -17,8 +17,10 @@ module ActiveJob end def execute(job_data) #:nodoc: - job = deserialize(job_data) - job.perform_now + ActiveJob::Callbacks.run_callbacks(:execute) do + job = deserialize(job_data) + job.perform_now + end end end diff --git a/activejob/lib/active_job/gem_version.rb b/activejob/lib/active_job/gem_version.rb index 0bdeca76a8..be4fabf545 100644 --- a/activejob/lib/active_job/gem_version.rb +++ b/activejob/lib/active_job/gem_version.rb @@ -8,7 +8,7 @@ module ActiveJob MAJOR = 5 MINOR = 0 TINY = 0 - PRE = "beta1.1" + PRE = "beta3" STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") end diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb index 457015b741..72e4ebf935 100644 --- a/activejob/lib/active_job/queue_adapter.rb +++ b/activejob/lib/active_job/queue_adapter.rb @@ -4,25 +4,25 @@ require 'active_support/core_ext/string/inflections' module ActiveJob # The <tt>ActiveJob::QueueAdapter</tt> module is used to load the - # correct adapter. The default queue adapter is the +:inline+ queue. + # correct adapter. The default queue adapter is the +:async+ queue. module QueueAdapter #:nodoc: extend ActiveSupport::Concern included do class_attribute :_queue_adapter, instance_accessor: false, instance_predicate: false - self.queue_adapter = :inline + self.queue_adapter = :async end # Includes the setter method for changing the active queue adapter. module ClassMethods # Returns the backend queue provider. The default queue adapter - # is the +:inline+ queue. See QueueAdapters for more information. + # is the +:async+ queue. See QueueAdapters for more information. def queue_adapter _queue_adapter end # Specify the backend queue provider. The default queue adapter - # is the +:inline+ queue. See QueueAdapters for more + # is the +:async+ queue. See QueueAdapters for more # information. def queue_adapter=(name_or_adapter_or_class) self._queue_adapter = interpret_adapter(name_or_adapter_or_class) diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb index 3fc27f56e7..922bc4afce 100644 --- a/activejob/lib/active_job/queue_adapters/async_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb @@ -1,22 +1,113 @@ -require 'active_job/async_job' +require 'securerandom' +require 'concurrent/scheduled_task' +require 'concurrent/executor/thread_pool_executor' +require 'concurrent/utility/processor_counter' module ActiveJob module QueueAdapters # == Active Job Async adapter # - # When enqueueing jobs with the Async adapter the job will be executed - # asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html]. + # The Async adapter runs jobs with an in-process thread pool. # - # To use +AsyncJob+ set the queue_adapter config to +:async+. + # This is the default queue adapter. It's well-suited for dev/test since + # it doesn't need an external infrastructure, but it's a poor fit for + # production since it drops pending jobs on restart. # - # Rails.application.config.active_job.queue_adapter = :async + # To use this adapter, set queue adapter to +:async+: + # + # config.active_job.queue_adapter = :async + # + # To configure the adapter's thread pool, instantiate the adapter and + # pass your own config: + # + # config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \ + # min_threads: 1, + # max_threads: 2 * Concurrent.processor_count, + # idletime: 600.seconds + # + # The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute + # jobs. Since jobs share a single thread pool, long-running jobs will block + # short-lived jobs. Fine for dev/test; bad for production. class AsyncAdapter + # See {Concurrent::ThreadPoolExecutor}[http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadPoolExecutor.html] for executor options. + def initialize(**executor_options) + @scheduler = Scheduler.new(**executor_options) + end + def enqueue(job) #:nodoc: - ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name) + @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name end def enqueue_at(job, timestamp) #:nodoc: - ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name) + @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name + end + + # Gracefully stop processing jobs. Finishes in-progress work and handles + # any new jobs following the executor's fallback policy (`caller_runs`). + # Waits for termination by default. Pass `wait: false` to continue. + def shutdown(wait: true) #:nodoc: + @scheduler.shutdown wait: wait + end + + # Used for our test suite. + def immediate=(immediate) #:nodoc: + @scheduler.immediate = immediate + end + + # Note that we don't actually need to serialize the jobs since we're + # performing them in-process, but we do so anyway for parity with other + # adapters and deployment environments. Otherwise, serialization bugs + # may creep in undetected. + class JobWrapper #:nodoc: + def initialize(job) + job.provider_job_id = SecureRandom.uuid + @job_data = job.serialize + end + + def perform + Base.execute @job_data + end + end + + class Scheduler #:nodoc: + DEFAULT_EXECUTOR_OPTIONS = { + min_threads: 0, + max_threads: Concurrent.processor_count, + auto_terminate: true, + idletime: 60, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs # shouldn't matter -- 0 max queue + }.freeze + + attr_accessor :immediate + + def initialize(**options) + self.immediate = false + @immediate_executor = Concurrent::ImmediateExecutor.new + @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options)) + end + + def enqueue(job, queue_name:) + executor.post(job, &:perform) + end + + def enqueue_at(job, timestamp, queue_name:) + delay = timestamp - Time.current.to_f + if delay > 0 + Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform) + else + enqueue(job, queue_name: queue_name) + end + end + + def shutdown(wait: true) + @async_executor.shutdown + @async_executor.wait_for_termination if wait + end + + def executor + immediate ? @immediate_executor : @async_executor + end end end end diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb index 8ad5f4de07..0496f8449e 100644 --- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb @@ -2,7 +2,7 @@ module ActiveJob module QueueAdapters # == Active Job Inline adapter # - # When enqueueing jobs with the Inline adapter the job will be executed + # When enqueuing jobs with the Inline adapter the job will be executed # immediately. # # To use the Inline set the queue_adapter config to +:inline+. diff --git a/activejob/lib/active_job/railtie.rb b/activejob/lib/active_job/railtie.rb index 6538ac1b30..a47caa4a7e 100644 --- a/activejob/lib/active_job/railtie.rb +++ b/activejob/lib/active_job/railtie.rb @@ -12,12 +12,21 @@ module ActiveJob initializer "active_job.set_configs" do |app| options = app.config.active_job - options.queue_adapter ||= :inline + options.queue_adapter ||= :async ActiveSupport.on_load(:active_job) do options.each { |k,v| send("#{k}=", v) } end end + initializer "active_job.set_reloader_hook" do |app| + ActiveSupport.on_load(:active_job) do + ActiveJob::Callbacks.singleton_class.set_callback(:execute, :around, prepend: true) do |_, inner| + app.reloader.wrap do + inner.call + end + end + end + end end end diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb index 44ddfa5f69..e06b98736d 100644 --- a/activejob/lib/active_job/test_helper.rb +++ b/activejob/lib/active_job/test_helper.rb @@ -4,325 +4,321 @@ require 'active_support/core_ext/hash/keys' module ActiveJob # Provides helper methods for testing Active Job module TestHelper - extend ActiveSupport::Concern + delegate :enqueued_jobs, :enqueued_jobs=, + :performed_jobs, :performed_jobs=, + to: :queue_adapter - included do - def before_setup # :nodoc: - test_adapter = ActiveJob::QueueAdapters::TestAdapter.new + def before_setup # :nodoc: + test_adapter = ActiveJob::QueueAdapters::TestAdapter.new - @old_queue_adapters = (ActiveJob::Base.subclasses << ActiveJob::Base).select do |klass| - # only override explicitly set adapters, a quirk of `class_attribute` - klass.singleton_class.public_instance_methods(false).include?(:_queue_adapter) - end.map do |klass| - [klass, klass.queue_adapter].tap do - klass.queue_adapter = test_adapter - end + @old_queue_adapters = (ActiveJob::Base.subclasses << ActiveJob::Base).select do |klass| + # only override explicitly set adapters, a quirk of `class_attribute` + klass.singleton_class.public_instance_methods(false).include?(:_queue_adapter) + end.map do |klass| + [klass, klass.queue_adapter].tap do + klass.queue_adapter = test_adapter end - - clear_enqueued_jobs - clear_performed_jobs - super end - def after_teardown # :nodoc: - super - @old_queue_adapters.each do |(klass, adapter)| - klass.queue_adapter = adapter - end - end + clear_enqueued_jobs + clear_performed_jobs + super + end - # Asserts that the number of enqueued jobs matches the given number. - # - # def test_jobs - # assert_enqueued_jobs 0 - # HelloJob.perform_later('david') - # assert_enqueued_jobs 1 - # HelloJob.perform_later('abdelkader') - # assert_enqueued_jobs 2 - # end - # - # If a block is passed, that block should cause the specified number of - # jobs to be enqueued. - # - # def test_jobs_again - # assert_enqueued_jobs 1 do - # HelloJob.perform_later('cristian') - # end - # - # assert_enqueued_jobs 2 do - # HelloJob.perform_later('aaron') - # HelloJob.perform_later('rafael') - # end - # end - # - # The number of times a specific job is enqueued can be asserted. - # - # def test_logging_job - # assert_enqueued_jobs 2, only: LoggingJob do - # LoggingJob.perform_later - # HelloJob.perform_later('jeremy') - # end - # end - def assert_enqueued_jobs(number, only: nil) - if block_given? - original_count = enqueued_jobs_size(only: only) - yield - new_count = enqueued_jobs_size(only: only) - assert_equal number, new_count - original_count, "#{number} jobs expected, but #{new_count - original_count} were enqueued" - else - actual_count = enqueued_jobs_size(only: only) - assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued" - end + def after_teardown # :nodoc: + super + @old_queue_adapters.each do |(klass, adapter)| + klass.queue_adapter = adapter end + end - # Asserts that no jobs have been enqueued. - # - # def test_jobs - # assert_no_enqueued_jobs - # HelloJob.perform_later('jeremy') - # assert_enqueued_jobs 1 - # end - # - # If a block is passed, that block should not cause any job to be enqueued. - # - # def test_jobs_again - # assert_no_enqueued_jobs do - # # No job should be enqueued from this block - # end - # end - # - # It can be asserted that no jobs of a specific kind are enqueued: - # - # def test_no_logging - # assert_no_enqueued_jobs only: LoggingJob do - # HelloJob.perform_later('jeremy') - # end - # end - # - # Note: This assertion is simply a shortcut for: - # - # assert_enqueued_jobs 0, &block - def assert_no_enqueued_jobs(only: nil, &block) - assert_enqueued_jobs 0, only: only, &block + # Asserts that the number of enqueued jobs matches the given number. + # + # def test_jobs + # assert_enqueued_jobs 0 + # HelloJob.perform_later('david') + # assert_enqueued_jobs 1 + # HelloJob.perform_later('abdelkader') + # assert_enqueued_jobs 2 + # end + # + # If a block is passed, that block should cause the specified number of + # jobs to be enqueued. + # + # def test_jobs_again + # assert_enqueued_jobs 1 do + # HelloJob.perform_later('cristian') + # end + # + # assert_enqueued_jobs 2 do + # HelloJob.perform_later('aaron') + # HelloJob.perform_later('rafael') + # end + # end + # + # The number of times a specific job is enqueued can be asserted. + # + # def test_logging_job + # assert_enqueued_jobs 1, only: LoggingJob do + # LoggingJob.perform_later + # HelloJob.perform_later('jeremy') + # end + # end + def assert_enqueued_jobs(number, only: nil) + if block_given? + original_count = enqueued_jobs_size(only: only) + yield + new_count = enqueued_jobs_size(only: only) + assert_equal number, new_count - original_count, "#{number} jobs expected, but #{new_count - original_count} were enqueued" + else + actual_count = enqueued_jobs_size(only: only) + assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued" end + end - # Asserts that the number of performed jobs matches the given number. - # If no block is passed, <tt>perform_enqueued_jobs</tt> - # must be called around the job call. - # - # def test_jobs - # assert_performed_jobs 0 - # - # perform_enqueued_jobs do - # HelloJob.perform_later('xavier') - # end - # assert_performed_jobs 1 - # - # perform_enqueued_jobs do - # HelloJob.perform_later('yves') - # assert_performed_jobs 2 - # end - # end - # - # If a block is passed, that block should cause the specified number of - # jobs to be performed. - # - # def test_jobs_again - # assert_performed_jobs 1 do - # HelloJob.perform_later('robin') - # end - # - # assert_performed_jobs 2 do - # HelloJob.perform_later('carlos') - # HelloJob.perform_later('sean') - # end - # end - # - # The block form supports filtering. If the :only option is specified, - # then only the listed job(s) will be performed. - # - # def test_hello_job - # assert_performed_jobs 1, only: HelloJob do - # HelloJob.perform_later('jeremy') - # LoggingJob.perform_later - # end - # end - # - # An array may also be specified, to support testing multiple jobs. - # - # def test_hello_and_logging_jobs - # assert_nothing_raised do - # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do - # HelloJob.perform_later('jeremy') - # LoggingJob.perform_later('stewie') - # RescueJob.perform_later('david') - # end - # end - # end - def assert_performed_jobs(number, only: nil) - if block_given? - original_count = performed_jobs.size - perform_enqueued_jobs(only: only) { yield } - new_count = performed_jobs.size - assert_equal number, new_count - original_count, - "#{number} jobs expected, but #{new_count - original_count} were performed" - else - performed_jobs_size = performed_jobs.size - assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed" - end - end + # Asserts that no jobs have been enqueued. + # + # def test_jobs + # assert_no_enqueued_jobs + # HelloJob.perform_later('jeremy') + # assert_enqueued_jobs 1 + # end + # + # If a block is passed, that block should not cause any job to be enqueued. + # + # def test_jobs_again + # assert_no_enqueued_jobs do + # # No job should be enqueued from this block + # end + # end + # + # It can be asserted that no jobs of a specific kind are enqueued: + # + # def test_no_logging + # assert_no_enqueued_jobs only: LoggingJob do + # HelloJob.perform_later('jeremy') + # end + # end + # + # Note: This assertion is simply a shortcut for: + # + # assert_enqueued_jobs 0, &block + def assert_no_enqueued_jobs(only: nil, &block) + assert_enqueued_jobs 0, only: only, &block + end - # Asserts that no jobs have been performed. - # - # def test_jobs - # assert_no_performed_jobs - # - # perform_enqueued_jobs do - # HelloJob.perform_later('matthew') - # assert_performed_jobs 1 - # end - # end - # - # If a block is passed, that block should not cause any job to be performed. - # - # def test_jobs_again - # assert_no_performed_jobs do - # # No job should be performed from this block - # end - # end - # - # The block form supports filtering. If the :only option is specified, - # then only the listed job(s) will be performed. - # - # def test_hello_job - # assert_performed_jobs 1, only: HelloJob do - # HelloJob.perform_later('jeremy') - # LoggingJob.perform_later - # end - # end - # - # An array may also be specified, to support testing multiple jobs. - # - # def test_hello_and_logging_jobs - # assert_nothing_raised do - # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do - # HelloJob.perform_later('jeremy') - # LoggingJob.perform_later('stewie') - # RescueJob.perform_later('david') - # end - # end - # end - # - # Note: This assertion is simply a shortcut for: - # - # assert_performed_jobs 0, &block - def assert_no_performed_jobs(only: nil, &block) - assert_performed_jobs 0, only: only, &block + # Asserts that the number of performed jobs matches the given number. + # If no block is passed, <tt>perform_enqueued_jobs</tt> + # must be called around the job call. + # + # def test_jobs + # assert_performed_jobs 0 + # + # perform_enqueued_jobs do + # HelloJob.perform_later('xavier') + # end + # assert_performed_jobs 1 + # + # perform_enqueued_jobs do + # HelloJob.perform_later('yves') + # assert_performed_jobs 2 + # end + # end + # + # If a block is passed, that block should cause the specified number of + # jobs to be performed. + # + # def test_jobs_again + # assert_performed_jobs 1 do + # HelloJob.perform_later('robin') + # end + # + # assert_performed_jobs 2 do + # HelloJob.perform_later('carlos') + # HelloJob.perform_later('sean') + # end + # end + # + # The block form supports filtering. If the :only option is specified, + # then only the listed job(s) will be performed. + # + # def test_hello_job + # assert_performed_jobs 1, only: HelloJob do + # HelloJob.perform_later('jeremy') + # LoggingJob.perform_later + # end + # end + # + # An array may also be specified, to support testing multiple jobs. + # + # def test_hello_and_logging_jobs + # assert_nothing_raised do + # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do + # HelloJob.perform_later('jeremy') + # LoggingJob.perform_later('stewie') + # RescueJob.perform_later('david') + # end + # end + # end + def assert_performed_jobs(number, only: nil) + if block_given? + original_count = performed_jobs.size + perform_enqueued_jobs(only: only) { yield } + new_count = performed_jobs.size + assert_equal number, new_count - original_count, + "#{number} jobs expected, but #{new_count - original_count} were performed" + else + performed_jobs_size = performed_jobs.size + assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed" end + end - # Asserts that the job passed in the block has been enqueued with the given arguments. - # - # def test_assert_enqueued_with - # assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low') do - # MyJob.perform_later(1,2,3) - # end - # - # assert_enqueued_with(job: MyJob, at: Date.tomorrow.noon) do - # MyJob.set(wait_until: Date.tomorrow.noon).perform_later - # end - # end - def assert_enqueued_with(args = {}) - original_enqueued_jobs_count = enqueued_jobs.count - args.assert_valid_keys(:job, :args, :at, :queue) - serialized_args = serialize_args_for_assertion(args) - yield - in_block_jobs = enqueued_jobs.drop(original_enqueued_jobs_count) - matching_job = in_block_jobs.find do |job| - serialized_args.all? { |key, value| value == job[key] } - end - assert matching_job, "No enqueued job found with #{args}" - instantiate_job(matching_job) - end + # Asserts that no jobs have been performed. + # + # def test_jobs + # assert_no_performed_jobs + # + # perform_enqueued_jobs do + # HelloJob.perform_later('matthew') + # assert_performed_jobs 1 + # end + # end + # + # If a block is passed, that block should not cause any job to be performed. + # + # def test_jobs_again + # assert_no_performed_jobs do + # # No job should be performed from this block + # end + # end + # + # The block form supports filtering. If the :only option is specified, + # then only the listed job(s) will be performed. + # + # def test_hello_job + # assert_performed_jobs 1, only: HelloJob do + # HelloJob.perform_later('jeremy') + # LoggingJob.perform_later + # end + # end + # + # An array may also be specified, to support testing multiple jobs. + # + # def test_hello_and_logging_jobs + # assert_nothing_raised do + # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do + # HelloJob.perform_later('jeremy') + # LoggingJob.perform_later('stewie') + # RescueJob.perform_later('david') + # end + # end + # end + # + # Note: This assertion is simply a shortcut for: + # + # assert_performed_jobs 0, &block + def assert_no_performed_jobs(only: nil, &block) + assert_performed_jobs 0, only: only, &block + end - # Asserts that the job passed in the block has been performed with the given arguments. - # - # def test_assert_performed_with - # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') do - # MyJob.perform_later(1,2,3) - # end - # - # assert_performed_with(job: MyJob, at: Date.tomorrow.noon) do - # MyJob.set(wait_until: Date.tomorrow.noon).perform_later - # end - # end - def assert_performed_with(args = {}) - original_performed_jobs_count = performed_jobs.count - args.assert_valid_keys(:job, :args, :at, :queue) - serialized_args = serialize_args_for_assertion(args) - perform_enqueued_jobs { yield } - in_block_jobs = performed_jobs.drop(original_performed_jobs_count) - matching_job = in_block_jobs.find do |job| - serialized_args.all? { |key, value| value == job[key] } - end - assert matching_job, "No performed job found with #{args}" - instantiate_job(matching_job) + # Asserts that the job passed in the block has been enqueued with the given arguments. + # + # def test_assert_enqueued_with + # assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low') do + # MyJob.perform_later(1,2,3) + # end + # + # assert_enqueued_with(job: MyJob, at: Date.tomorrow.noon) do + # MyJob.set(wait_until: Date.tomorrow.noon).perform_later + # end + # end + def assert_enqueued_with(args = {}) + original_enqueued_jobs_count = enqueued_jobs.count + args.assert_valid_keys(:job, :args, :at, :queue) + serialized_args = serialize_args_for_assertion(args) + yield + in_block_jobs = enqueued_jobs.drop(original_enqueued_jobs_count) + matching_job = in_block_jobs.find do |job| + serialized_args.all? { |key, value| value == job[key] } end + assert matching_job, "No enqueued job found with #{args}" + instantiate_job(matching_job) + end - def perform_enqueued_jobs(only: nil) - old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs - old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs - old_filter = queue_adapter.filter - - begin - queue_adapter.perform_enqueued_jobs = true - queue_adapter.perform_enqueued_at_jobs = true - queue_adapter.filter = only - yield - ensure - queue_adapter.perform_enqueued_jobs = old_perform_enqueued_jobs - queue_adapter.perform_enqueued_at_jobs = old_perform_enqueued_at_jobs - queue_adapter.filter = old_filter - end + # Asserts that the job passed in the block has been performed with the given arguments. + # + # def test_assert_performed_with + # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') do + # MyJob.perform_later(1,2,3) + # end + # + # assert_performed_with(job: MyJob, at: Date.tomorrow.noon) do + # MyJob.set(wait_until: Date.tomorrow.noon).perform_later + # end + # end + def assert_performed_with(args = {}) + original_performed_jobs_count = performed_jobs.count + args.assert_valid_keys(:job, :args, :at, :queue) + serialized_args = serialize_args_for_assertion(args) + perform_enqueued_jobs { yield } + in_block_jobs = performed_jobs.drop(original_performed_jobs_count) + matching_job = in_block_jobs.find do |job| + serialized_args.all? { |key, value| value == job[key] } end + assert matching_job, "No performed job found with #{args}" + instantiate_job(matching_job) + end + + def perform_enqueued_jobs(only: nil) + old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs + old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs + old_filter = queue_adapter.filter - def queue_adapter - ActiveJob::Base.queue_adapter + begin + queue_adapter.perform_enqueued_jobs = true + queue_adapter.perform_enqueued_at_jobs = true + queue_adapter.filter = only + yield + ensure + queue_adapter.perform_enqueued_jobs = old_perform_enqueued_jobs + queue_adapter.perform_enqueued_at_jobs = old_perform_enqueued_at_jobs + queue_adapter.filter = old_filter end + end - delegate :enqueued_jobs, :enqueued_jobs=, - :performed_jobs, :performed_jobs=, - to: :queue_adapter + def queue_adapter + ActiveJob::Base.queue_adapter + end - private - def clear_enqueued_jobs # :nodoc: - enqueued_jobs.clear - end + private + def clear_enqueued_jobs # :nodoc: + enqueued_jobs.clear + end - def clear_performed_jobs # :nodoc: - performed_jobs.clear - end + def clear_performed_jobs # :nodoc: + performed_jobs.clear + end - def enqueued_jobs_size(only: nil) # :nodoc: - if only - enqueued_jobs.count { |job| Array(only).include?(job.fetch(:job)) } - else - enqueued_jobs.count - end + def enqueued_jobs_size(only: nil) # :nodoc: + if only + enqueued_jobs.count { |job| Array(only).include?(job.fetch(:job)) } + else + enqueued_jobs.count end + end - def serialize_args_for_assertion(args) # :nodoc: - args.dup.tap do |serialized_args| - serialized_args[:args] = ActiveJob::Arguments.serialize(serialized_args[:args]) if serialized_args[:args] - serialized_args[:at] = serialized_args[:at].to_f if serialized_args[:at] - end + def serialize_args_for_assertion(args) # :nodoc: + args.dup.tap do |serialized_args| + serialized_args[:args] = ActiveJob::Arguments.serialize(serialized_args[:args]) if serialized_args[:args] + serialized_args[:at] = serialized_args[:at].to_f if serialized_args[:at] end + end - def instantiate_job(payload) # :nodoc: - job = payload[:job].new(*payload[:args]) - job.scheduled_at = Time.at(payload[:at]) if payload.key?(:at) - job.queue_name = payload[:queue] - job - end - end + def instantiate_job(payload) # :nodoc: + job = payload[:job].new(*payload[:args]) + job.scheduled_at = Time.at(payload[:at]) if payload.key?(:at) + job.queue_name = payload[:queue] + job + end end end diff --git a/activejob/test/adapters/async.rb b/activejob/test/adapters/async.rb index 5fcfb89566..08eb9658cd 100644 --- a/activejob/test/adapters/async.rb +++ b/activejob/test/adapters/async.rb @@ -1,4 +1,2 @@ -require 'active_job/async_job' - ActiveJob::Base.queue_adapter = :async -ActiveJob::AsyncJob.perform_immediately! +ActiveJob::Base.queue_adapter.immediate = true diff --git a/activejob/test/cases/argument_serialization_test.rb b/activejob/test/cases/argument_serialization_test.rb index eb8ad185aa..59dc3d7f78 100644 --- a/activejob/test/cases/argument_serialization_test.rb +++ b/activejob/test/cases/argument_serialization_test.rb @@ -62,13 +62,14 @@ class ArgumentSerializationTest < ActiveSupport::TestCase assert_raises ActiveJob::SerializationError do ActiveJob::Arguments.serialize [ { :a => [{ 2 => 3 }] } ] end + end - assert_raises ActiveJob::SerializationError do - ActiveJob::Arguments.serialize [ '_aj_globalid' => 1 ] - end - - assert_raises ActiveJob::SerializationError do - ActiveJob::Arguments.serialize [ :_aj_globalid => 1 ] + test 'should not allow reserved hash keys' do + ['_aj_globalid', :_aj_globalid, '_aj_symbol_keys', :_aj_symbol_keys, + '_aj_hash_with_indifferent_access', :_aj_hash_with_indifferent_access].each do |key| + assert_raises ActiveJob::SerializationError do + ActiveJob::Arguments.serialize [key => 1] + end end end diff --git a/activejob/test/cases/async_job_test.rb b/activejob/test/cases/async_job_test.rb deleted file mode 100644 index 2642cfc608..0000000000 --- a/activejob/test/cases/async_job_test.rb +++ /dev/null @@ -1,42 +0,0 @@ -require 'helper' -require 'jobs/hello_job' -require 'jobs/queue_as_job' - -class AsyncJobTest < ActiveSupport::TestCase - def using_async_adapter? - ActiveJob::Base.queue_adapter.is_a? ActiveJob::QueueAdapters::AsyncAdapter - end - - setup do - ActiveJob::AsyncJob.perform_asynchronously! - end - - teardown do - ActiveJob::AsyncJob::QUEUES.clear - ActiveJob::AsyncJob.perform_immediately! - end - - test "#create_thread_pool returns a thread_pool" do - thread_pool = ActiveJob::AsyncJob.create_thread_pool - assert thread_pool.is_a? Concurrent::ExecutorService - assert_not thread_pool.is_a? Concurrent::ImmediateExecutor - end - - test "#create_thread_pool returns an ImmediateExecutor after #perform_immediately! is called" do - ActiveJob::AsyncJob.perform_immediately! - thread_pool = ActiveJob::AsyncJob.create_thread_pool - assert thread_pool.is_a? Concurrent::ImmediateExecutor - end - - test "enqueuing without specifying a queue uses the default queue" do - skip unless using_async_adapter? - HelloJob.perform_later - assert ActiveJob::AsyncJob::QUEUES.key? 'default' - end - - test "enqueuing to a queue that does not exist creates the queue" do - skip unless using_async_adapter? - QueueAsJob.perform_later - assert ActiveJob::AsyncJob::QUEUES.key? QueueAsJob::MY_QUEUE.to_s - end -end diff --git a/activejob/test/cases/job_serialization_test.rb b/activejob/test/cases/job_serialization_test.rb index 229517774e..fa94209889 100644 --- a/activejob/test/cases/job_serialization_test.rb +++ b/activejob/test/cases/job_serialization_test.rb @@ -2,6 +2,7 @@ require 'helper' require 'jobs/gid_job' require 'jobs/hello_job' require 'models/person' +require 'json' class JobSerializationTest < ActiveSupport::TestCase setup do @@ -15,18 +16,32 @@ class JobSerializationTest < ActiveSupport::TestCase end test 'serialize includes current locale' do - assert_equal :en, HelloJob.new.serialize['locale'] + assert_equal 'en', HelloJob.new.serialize['locale'] + end + + test 'serialize and deserialize are symmetric' do + # Round trip a job in memory only + h1 = HelloJob.new + h1.deserialize(h1.serialize) + + # Now verify it's identical to a JSON round trip. + # We don't want any non-native JSON elements in the job hash, + # like symbols. + payload = JSON.dump(h1.serialize) + h2 = HelloJob.new + h2.deserialize(JSON.load(payload)) + assert_equal h1.serialize, h2.serialize end test 'deserialize sets locale' do job = HelloJob.new - job.deserialize 'locale' => :es - assert_equal :es, job.locale + job.deserialize 'locale' => 'es' + assert_equal 'es', job.locale end test 'deserialize sets default locale' do job = HelloJob.new job.deserialize({}) - assert_equal :en, job.locale + assert_equal 'en', job.locale end end diff --git a/activejob/test/helper.rb b/activejob/test/helper.rb index 7e86415f48..54b6076f09 100644 --- a/activejob/test/helper.rb +++ b/activejob/test/helper.rb @@ -1,5 +1,3 @@ -require File.expand_path('../../../load_paths', __FILE__) - require 'active_job' require 'support/job_buffer' diff --git a/activejob/test/integration/queuing_test.rb b/activejob/test/integration/queuing_test.rb index d8425c9706..40f27500a5 100644 --- a/activejob/test/integration/queuing_test.rb +++ b/activejob/test/integration/queuing_test.rb @@ -57,13 +57,13 @@ class QueuingTest < ActiveSupport::TestCase end test 'should supply a provider_job_id when available for immediate jobs' do - skip unless adapter_is?(:delayed_job, :sidekiq, :qu, :que, :queue_classic) + skip unless adapter_is?(:async, :delayed_job, :sidekiq, :qu, :que, :queue_classic) test_job = TestJob.perform_later @id assert test_job.provider_job_id, 'Provider job id should be set by provider' end test 'should supply a provider_job_id when available for delayed jobs' do - skip unless adapter_is?(:delayed_job, :sidekiq, :que, :queue_classic) + skip unless adapter_is?(:async, :delayed_job, :sidekiq, :que, :queue_classic) delayed_test_job = TestJob.set(wait: 1.minute).perform_later @id assert delayed_test_job.provider_job_id, 'Provider job id should by set for delayed jobs by provider' end diff --git a/activejob/test/support/integration/adapters/async.rb b/activejob/test/support/integration/adapters/async.rb index 42beb12b1f..44ab98437a 100644 --- a/activejob/test/support/integration/adapters/async.rb +++ b/activejob/test/support/integration/adapters/async.rb @@ -1,9 +1,10 @@ module AsyncJobsManager def setup ActiveJob::Base.queue_adapter = :async + ActiveJob::Base.queue_adapter.immediate = false end def clear_jobs - ActiveJob::AsyncJob::QUEUES.clear + ActiveJob::Base.queue_adapter.shutdown end end diff --git a/activejob/test/support/integration/dummy_app_template.rb b/activejob/test/support/integration/dummy_app_template.rb index 262ca72327..a0ef38b0b2 100644 --- a/activejob/test/support/integration/dummy_app_template.rb +++ b/activejob/test/support/integration/dummy_app_template.rb @@ -2,7 +2,7 @@ if ENV['AJ_ADAPTER'] == 'delayed_job' generate "delayed_job:active_record", "--quiet" end -rake("db:migrate") +rails_command("db:migrate") initializer 'activejob.rb', <<-CODE require "#{File.expand_path("../jobs_manager.rb", __FILE__)}" |