diff options
17 files changed, 180 insertions, 182 deletions
diff --git a/actionmailer/test/abstract_unit.rb b/actionmailer/test/abstract_unit.rb index 1167387def..0b418c4ea1 100644 --- a/actionmailer/test/abstract_unit.rb +++ b/actionmailer/test/abstract_unit.rb @@ -11,7 +11,7 @@ end require 'minitest/autorun' require 'action_mailer' require 'action_mailer/test_case' -require 'rails/queueing' +require 'active_support/queueing' silence_warnings do # These external dependencies have warnings :/ @@ -27,7 +27,7 @@ ActionView::Template.register_template_handler :bak, lambda { |template| "Lame b FIXTURE_LOAD_PATH = File.expand_path('fixtures', File.dirname(__FILE__)) ActionMailer::Base.view_paths = FIXTURE_LOAD_PATH -ActionMailer::Base.queue = Rails::Queueing::SynchronousQueue.new +ActionMailer::Base.queue = ActiveSupport::SynchronousQueue.new class MockSMTP def self.deliveries diff --git a/actionmailer/test/base_test.rb b/actionmailer/test/base_test.rb index 862b954f9e..6a06cec041 100644 --- a/actionmailer/test/base_test.rb +++ b/actionmailer/test/base_test.rb @@ -3,13 +3,13 @@ require 'abstract_unit' require 'set' require 'action_dispatch' +require 'active_support/queueing' require 'active_support/time' require 'mailers/base_mailer' require 'mailers/proc_mailer' require 'mailers/asset_mailer' require 'mailers/async_mailer' -require 'rails/queueing' class BaseTest < ActiveSupport::TestCase def teardown @@ -433,7 +433,7 @@ class BaseTest < ActiveSupport::TestCase end test "delivering message asynchronously" do - testing_queue = Rails::Queueing::TestQueue.new + testing_queue = ActiveSupport::TestQueue.new AsyncMailer.delivery_method = :test AsyncMailer.deliveries.clear stub_queue(AsyncMailer, testing_queue).welcome.deliver diff --git a/activesupport/lib/active_support/queueing.rb b/activesupport/lib/active_support/queueing.rb new file mode 100644 index 0000000000..f397e1c0c5 --- /dev/null +++ b/activesupport/lib/active_support/queueing.rb @@ -0,0 +1,116 @@ +require 'delegate' +require 'thread' + +module ActiveSupport + # A Queue that simply inherits from STDLIB's Queue. Everytime this + # queue is used, Rails automatically sets up a ThreadedConsumer + # to consume it. + class Queue < ::Queue + end + + class SynchronousQueue < ::Queue + def push(job) + result = nil + Thread.new { result = job.run }.join + result + end + alias << push + alias enq push + end + + # In test mode, the Rails queue is backed by an Array so that assertions + # can be made about its contents. The test queue provides a +jobs+ + # method to make assertions about the queue's contents and a +drain+ + # method to drain the queue and run the jobs. + # + # Jobs are run in a separate thread to catch mistakes where code + # assumes that the job is run in the same thread. + class TestQueue < ::Queue + # Get a list of the jobs off this queue. This method may not be + # available on production queues. + def jobs + @que.dup + end + + # Marshal and unmarshal job before pushing it onto the queue. This will + # raise an exception on any attempts in tests to push jobs that can't (or + # shouldn't) be marshalled. + def push(job) + super Marshal.load(Marshal.dump(job)) + end + + # Drain the queue, running all jobs in a different thread. This method + # may not be available on production queues. + def drain + # run the jobs in a separate thread so assumptions of synchronous + # jobs are caught in test mode. + Thread.new { pop.run until empty? }.join + end + end + + # A container for multiple queues. This class delegates to a default Queue + # so that <tt>Rails.queue.push</tt> and friends will Just Work. To use this class + # with multiple queues: + # + # # In your configuration: + # Rails.queue[:image_queue] = SomeQueue.new + # Rails.queue[:mail_queue] = SomeQueue.new + # + # # In your app code: + # Rails.queue[:mail_queue].push SomeJob.new + # + class QueueContainer < DelegateClass(::Queue) + def initialize(default_queue) + @queues = { :default => default_queue } + super(default_queue) + end + + def [](queue_name) + @queues[queue_name] + end + + def []=(queue_name, queue) + @queues[queue_name] = queue + end + end + + # The threaded consumer will run jobs in a background thread in + # development mode or in a VM where running jobs on a thread in + # production mode makes sense. + # + # When the process exits, the consumer pushes a nil onto the + # queue and joins the thread, which will ensure that all jobs + # are executed before the process finally dies. + class ThreadedQueueConsumer + def self.start(queue, logger=nil) + new(queue, logger).start + end + + def initialize(queue, logger=nil) + @queue = queue + @logger = logger + end + + def start + @thread = Thread.new do + while job = @queue.pop + begin + job.run + rescue Exception => e + handle_exception e + end + end + end + self + end + + def shutdown + @queue.push nil + @thread.join + end + + def handle_exception(e) + @logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}" if @logger + end + end +end diff --git a/activesupport/test/queueing/container_test.rb b/activesupport/test/queueing/container_test.rb new file mode 100644 index 0000000000..7afc11e7a9 --- /dev/null +++ b/activesupport/test/queueing/container_test.rb @@ -0,0 +1,28 @@ +require 'abstract_unit' +require 'active_support/queueing' + +module ActiveSupport + class ContainerTest < ActiveSupport::TestCase + def test_delegates_to_default + q = Queue.new + container = QueueContainer.new q + job = Object.new + + container.push job + assert_equal job, q.pop + end + + def test_access_default + q = Queue.new + container = QueueContainer.new q + assert_equal q, container[:default] + end + + def test_assign_queue + container = QueueContainer.new Object.new + q = Object.new + container[:foo] = q + assert_equal q, container[:foo] + end + end +end diff --git a/railties/test/queueing/test_queue_test.rb b/activesupport/test/queueing/test_queue_test.rb index 2f0f507adb..4c08314366 100644 --- a/railties/test/queueing/test_queue_test.rb +++ b/activesupport/test/queueing/test_queue_test.rb @@ -1,9 +1,9 @@ require 'abstract_unit' -require 'rails/queueing' +require 'active_support/queueing' class TestQueueTest < ActiveSupport::TestCase def setup - @queue = Rails::Queueing::TestQueue.new + @queue = ActiveSupport::TestQueue.new end class ExceptionRaisingJob diff --git a/railties/test/queueing/threaded_consumer_test.rb b/activesupport/test/queueing/threaded_consumer_test.rb index c34a966d6e..20a1cc4e8e 100644 --- a/railties/test/queueing/threaded_consumer_test.rb +++ b/activesupport/test/queueing/threaded_consumer_test.rb @@ -1,5 +1,6 @@ require 'abstract_unit' -require 'rails/queueing' +require 'active_support/queueing' +require "active_support/log_subscriber/test_helper" class TestThreadConsumer < ActiveSupport::TestCase class Job @@ -15,8 +16,9 @@ class TestThreadConsumer < ActiveSupport::TestCase end def setup - @queue = Rails::Queueing::Queue.new - @consumer = Rails::Queueing::ThreadedConsumer.start(@queue) + @queue = ActiveSupport::Queue.new + @logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new + @consumer = ActiveSupport::ThreadedQueueConsumer.start(@queue, @logger) end def teardown @@ -64,10 +66,6 @@ class TestThreadConsumer < ActiveSupport::TestCase end test "log job that raises an exception" do - require "active_support/log_subscriber/test_helper" - logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new - Rails.logger = logger - job = Job.new(1) do raise "RuntimeError: Error!" end @@ -75,13 +73,13 @@ class TestThreadConsumer < ActiveSupport::TestCase @queue.push job sleep 0.1 - assert_equal 1, logger.logged(:error).size - assert_match(/Job Error: RuntimeError: Error!/, logger.logged(:error).last) + assert_equal 1, @logger.logged(:error).size + assert_match(/Job Error: RuntimeError: Error!/, @logger.logged(:error).last) end test "test overriding exception handling" do @consumer.shutdown - @consumer = Class.new(Rails::Queueing::ThreadedConsumer) do + @consumer = Class.new(ActiveSupport::ThreadedQueueConsumer) do attr_reader :last_error def handle_exception(e) @last_error = e.message diff --git a/guides/source/configuring.textile b/guides/source/configuring.textile index 73d6102eac..9c35738bf8 100644 --- a/guides/source/configuring.textile +++ b/guides/source/configuring.textile @@ -111,9 +111,9 @@ end * +config.middleware+ allows you to configure the application's middleware. This is covered in depth in the "Configuring Middleware":#configuring-middleware section below. -* +config.queue+ configures a different queue implementation for the application. Defaults to +Rails::Queueing::Queue+. Note that, if the default queue is changed, the default +queue_consumer+ is not going to be initialized, it is up to the new queue implementation to handle starting and shutting down its own consumer(s). +* +config.queue+ configures a different queue implementation for the application. Defaults to +ActiveSupport::SynchronousQueue+. Note that, if the default queue is changed, the default +queue_consumer+ is not going to be initialized, it is up to the new queue implementation to handle starting and shutting down its own consumer(s). -* +config.queue_consumer+ configures a different consumer implementation for the default queue. Defaults to +Rails::Queueing::ThreadedConsumer+. +* +config.queue_consumer+ configures a different consumer implementation for the default queue. Defaults to +ActiveSupport::ThreadedQueueConsumer+. * +config.reload_classes_only_on_change+ enables or disables reloading of classes only when tracked files change. By default tracks everything on autoload paths and is set to true. If +config.cache_classes+ is true, this option is ignored. diff --git a/railties/lib/rails.rb b/railties/lib/rails.rb index 670477f91a..a15965a9da 100644 --- a/railties/lib/rails.rb +++ b/railties/lib/rails.rb @@ -22,7 +22,6 @@ end module Rails autoload :Info, 'rails/info' autoload :InfoController, 'rails/info_controller' - autoload :Queueing, 'rails/queueing' class << self def application diff --git a/railties/lib/rails/application.rb b/railties/lib/rails/application.rb index 80b8e4b9ba..aa5b986862 100644 --- a/railties/lib/rails/application.rb +++ b/railties/lib/rails/application.rb @@ -1,4 +1,5 @@ require 'fileutils' +require 'active_support/queueing' require 'rails/engine' module Rails @@ -188,7 +189,7 @@ module Rails end def queue #:nodoc: - @queue ||= Queueing::Container.new(build_queue) + @queue ||= ActiveSupport::QueueContainer.new(build_queue) end def build_queue #:nodoc: diff --git a/railties/lib/rails/application/configuration.rb b/railties/lib/rails/application/configuration.rb index 7a0bb21043..c4c1fcca40 100644 --- a/railties/lib/rails/application/configuration.rb +++ b/railties/lib/rails/application/configuration.rb @@ -1,5 +1,6 @@ require 'active_support/core_ext/kernel/reporting' require 'active_support/file_update_checker' +require 'active_support/queueing' require 'rails/engine/configuration' module Rails @@ -41,8 +42,8 @@ module Rails @exceptions_app = nil @autoflush_log = true @log_formatter = ActiveSupport::Logger::SimpleFormatter.new - @queue = Rails::Queueing::SynchronousQueue - @queue_consumer = Rails::Queueing::ThreadedConsumer + @queue = ActiveSupport::SynchronousQueue + @queue_consumer = ActiveSupport::ThreadedQueueConsumer @eager_load = nil @assets = ActiveSupport::OrderedOptions.new diff --git a/railties/lib/rails/application/finisher.rb b/railties/lib/rails/application/finisher.rb index 777f0d269e..f9a3c00946 100644 --- a/railties/lib/rails/application/finisher.rb +++ b/railties/lib/rails/application/finisher.rb @@ -97,8 +97,8 @@ module Rails end initializer :activate_queue_consumer do |app| - if config.queue == Rails::Queueing::Queue - app.queue_consumer = config.queue_consumer.start(app.queue) + if config.queue == ActiveSupport::Queue + app.queue_consumer = config.queue_consumer.start(app.queue, Rails.logger) at_exit { app.queue_consumer.shutdown } end end diff --git a/railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt b/railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt index fdf011a510..cb3e8b123e 100644 --- a/railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt +++ b/railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt @@ -82,5 +82,5 @@ # Default the production mode queue to an synchronous queue. You will probably # want to replace this with an out-of-process queueing solution. - # config.queue = Rails::Queueing::SynchronousQueue + # config.queue = ActiveSupport::SynchronousQueue end diff --git a/railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt b/railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt index 8ab27eb6e1..75897ba8cd 100644 --- a/railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt +++ b/railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt @@ -40,5 +40,5 @@ config.active_support.deprecation = :stderr # Use the testing queue. - config.queue = Rails::Queueing::TestQueue + config.queue = ActiveSupport::TestQueue end diff --git a/railties/lib/rails/queueing.rb b/railties/lib/rails/queueing.rb deleted file mode 100644 index 7cd755b0f7..0000000000 --- a/railties/lib/rails/queueing.rb +++ /dev/null @@ -1,115 +0,0 @@ -require "thread" -require 'delegate' - -module Rails - module Queueing - # A container for multiple queues. This class delegates to a default Queue - # so that <tt>Rails.queue.push</tt> and friends will Just Work. To use this class - # with multiple queues: - # - # # In your configuration: - # Rails.queue[:image_queue] = SomeQueue.new - # Rails.queue[:mail_queue] = SomeQueue.new - # - # # In your app code: - # Rails.queue[:mail_queue].push SomeJob.new - # - class Container < DelegateClass(::Queue) - def initialize(default_queue) - @queues = { :default => default_queue } - super(default_queue) - end - - def [](queue_name) - @queues[queue_name] - end - - def []=(queue_name, queue) - @queues[queue_name] = queue - end - end - - # A Queue that simply inherits from STDLIB's Queue. Everytime this - # queue is used, Rails automatically sets up a ThreadedConsumer - # to consume it. - class Queue < ::Queue - end - - class SynchronousQueue < ::Queue - def push(job) - job.run - end - alias << push - alias enq push - end - - # In test mode, the Rails queue is backed by an Array so that assertions - # can be made about its contents. The test queue provides a +jobs+ - # method to make assertions about the queue's contents and a +drain+ - # method to drain the queue and run the jobs. - # - # Jobs are run in a separate thread to catch mistakes where code - # assumes that the job is run in the same thread. - class TestQueue < ::Queue - # Get a list of the jobs off this queue. This method may not be - # available on production queues. - def jobs - @que.dup - end - - # Marshal and unmarshal job before pushing it onto the queue. This will - # raise an exception on any attempts in tests to push jobs that can't (or - # shouldn't) be marshalled. - def push(job) - super Marshal.load(Marshal.dump(job)) - end - - # Drain the queue, running all jobs in a different thread. This method - # may not be available on production queues. - def drain - # run the jobs in a separate thread so assumptions of synchronous - # jobs are caught in test mode. - Thread.new { pop.run until empty? }.join - end - end - - # The threaded consumer will run jobs in a background thread in - # development mode or in a VM where running jobs on a thread in - # production mode makes sense. - # - # When the process exits, the consumer pushes a nil onto the - # queue and joins the thread, which will ensure that all jobs - # are executed before the process finally dies. - class ThreadedConsumer - def self.start(queue) - new(queue).start - end - - def initialize(queue) - @queue = queue - end - - def start - @thread = Thread.new do - while job = @queue.pop - begin - job.run - rescue Exception => e - handle_exception e - end - end - end - self - end - - def shutdown - @queue.push nil - @thread.join - end - - def handle_exception(e) - Rails.logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}" - end - end - end -end diff --git a/railties/test/application/initializers/frameworks_test.rb b/railties/test/application/initializers/frameworks_test.rb index 1eb5fce384..5268257d62 100644 --- a/railties/test/application/initializers/frameworks_test.rb +++ b/railties/test/application/initializers/frameworks_test.rb @@ -52,19 +52,19 @@ module ApplicationTests test "uses the default queue for ActionMailer" do require "#{app_path}/config/environment" - assert_kind_of Rails::Queueing::Container, ActionMailer::Base.queue + assert_kind_of ActiveSupport::QueueContainer, ActionMailer::Base.queue end test "allows me to configure queue for ActionMailer" do app_file "config/environments/development.rb", <<-RUBY AppTemplate::Application.configure do - Rails.queue[:mailer] = Rails::Queueing::TestQueue.new + Rails.queue[:mailer] = ActiveSupport::TestQueue.new config.action_mailer.queue = Rails.queue[:mailer] end RUBY require "#{app_path}/config/environment" - assert_kind_of Rails::Queueing::TestQueue, ActionMailer::Base.queue + assert_kind_of ActiveSupport::TestQueue, ActionMailer::Base.queue end test "does not include url helpers as action methods" do diff --git a/railties/test/application/queue_test.rb b/railties/test/application/queue_test.rb index f4c11c5f4f..664001ca69 100644 --- a/railties/test/application/queue_test.rb +++ b/railties/test/application/queue_test.rb @@ -19,14 +19,14 @@ module ApplicationTests test "the queue is a TestQueue in test mode" do app("test") - assert_kind_of Rails::Queueing::TestQueue, Rails.application.queue[:default] - assert_kind_of Rails::Queueing::TestQueue, Rails.queue[:default] + assert_kind_of ActiveSupport::TestQueue, Rails.application.queue[:default] + assert_kind_of ActiveSupport::TestQueue, Rails.queue[:default] end test "the queue is a SynchronousQueue in development mode" do app("development") - assert_kind_of Rails::Queueing::SynchronousQueue, Rails.application.queue[:default] - assert_kind_of Rails::Queueing::SynchronousQueue, Rails.queue[:default] + assert_kind_of ActiveSupport::SynchronousQueue, Rails.application.queue[:default] + assert_kind_of ActiveSupport::SynchronousQueue, Rails.queue[:default] end class ThreadTrackingJob @@ -47,7 +47,7 @@ module ApplicationTests end end - test "in development mode, an enqueued job will be processed in the same thread" do + test "in development mode, an enqueued job will be processed in a separate thread" do app("development") job = ThreadTrackingJob.new @@ -55,7 +55,7 @@ module ApplicationTests sleep 0.1 assert job.ran?, "Expected job to be run" - refute job.ran_in_different_thread?, "Expected job to run in the same thread" + assert job.ran_in_different_thread?, "Expected job to run in the same thread" end test "in test mode, explicitly draining the queue will process it in a separate thread" do @@ -160,12 +160,12 @@ module ApplicationTests test "a custom consumer implementation can be provided" do add_to_env_config "production", <<-RUBY require "my_queue_consumer" - config.queue = Rails::Queueing::Queue + config.queue = ActiveSupport::Queue config.queue_consumer = MyQueueConsumer RUBY app_file "lib/my_queue_consumer.rb", <<-RUBY - class MyQueueConsumer < Rails::Queueing::ThreadedConsumer + class MyQueueConsumer < ActiveSupport::ThreadedQueueConsumer attr_reader :started def start diff --git a/railties/test/queueing/container_test.rb b/railties/test/queueing/container_test.rb deleted file mode 100644 index 69e59a3871..0000000000 --- a/railties/test/queueing/container_test.rb +++ /dev/null @@ -1,30 +0,0 @@ -require 'abstract_unit' -require 'rails/queueing' - -module Rails - module Queueing - class ContainerTest < ActiveSupport::TestCase - def test_delegates_to_default - q = Queue.new - container = Container.new q - job = Object.new - - container.push job - assert_equal job, q.pop - end - - def test_access_default - q = Queue.new - container = Container.new q - assert_equal q, container[:default] - end - - def test_assign_queue - container = Container.new Object.new - q = Object.new - container[:foo] = q - assert_equal q, container[:foo] - end - end - end -end |