From 8577687fcb9da20868a5ea50aea36427270d4485 Mon Sep 17 00:00:00 2001 From: Santiago Pastorino Date: Thu, 13 Sep 2012 15:09:15 -0700 Subject: Move queue classes to ActiveSupport --- railties/lib/rails.rb | 1 - railties/lib/rails/application.rb | 3 +- railties/lib/rails/application/configuration.rb | 5 +- railties/lib/rails/application/finisher.rb | 4 +- .../templates/config/environments/production.rb.tt | 2 +- .../app/templates/config/environments/test.rb.tt | 2 +- railties/lib/rails/queueing.rb | 115 --------------------- .../application/initializers/frameworks_test.rb | 6 +- railties/test/application/queue_test.rb | 16 +-- railties/test/queueing/container_test.rb | 30 ------ railties/test/queueing/test_queue_test.rb | 102 ------------------ railties/test/queueing/threaded_consumer_test.rb | 100 ------------------ 12 files changed, 20 insertions(+), 366 deletions(-) delete mode 100644 railties/lib/rails/queueing.rb delete mode 100644 railties/test/queueing/container_test.rb delete mode 100644 railties/test/queueing/test_queue_test.rb delete mode 100644 railties/test/queueing/threaded_consumer_test.rb (limited to 'railties') diff --git a/railties/lib/rails.rb b/railties/lib/rails.rb index 670477f91a..a15965a9da 100644 --- a/railties/lib/rails.rb +++ b/railties/lib/rails.rb @@ -22,7 +22,6 @@ end module Rails autoload :Info, 'rails/info' autoload :InfoController, 'rails/info_controller' - autoload :Queueing, 'rails/queueing' class << self def application diff --git a/railties/lib/rails/application.rb b/railties/lib/rails/application.rb index 80b8e4b9ba..aa5b986862 100644 --- a/railties/lib/rails/application.rb +++ b/railties/lib/rails/application.rb @@ -1,4 +1,5 @@ require 'fileutils' +require 'active_support/queueing' require 'rails/engine' module Rails @@ -188,7 +189,7 @@ module Rails end def queue #:nodoc: - @queue ||= Queueing::Container.new(build_queue) + @queue ||= ActiveSupport::QueueContainer.new(build_queue) end def build_queue #:nodoc: diff --git a/railties/lib/rails/application/configuration.rb b/railties/lib/rails/application/configuration.rb index 7a0bb21043..c4c1fcca40 100644 --- a/railties/lib/rails/application/configuration.rb +++ b/railties/lib/rails/application/configuration.rb @@ -1,5 +1,6 @@ require 'active_support/core_ext/kernel/reporting' require 'active_support/file_update_checker' +require 'active_support/queueing' require 'rails/engine/configuration' module Rails @@ -41,8 +42,8 @@ module Rails @exceptions_app = nil @autoflush_log = true @log_formatter = ActiveSupport::Logger::SimpleFormatter.new - @queue = Rails::Queueing::SynchronousQueue - @queue_consumer = Rails::Queueing::ThreadedConsumer + @queue = ActiveSupport::SynchronousQueue + @queue_consumer = ActiveSupport::ThreadedQueueConsumer @eager_load = nil @assets = ActiveSupport::OrderedOptions.new diff --git a/railties/lib/rails/application/finisher.rb b/railties/lib/rails/application/finisher.rb index 777f0d269e..f9a3c00946 100644 --- a/railties/lib/rails/application/finisher.rb +++ b/railties/lib/rails/application/finisher.rb @@ -97,8 +97,8 @@ module Rails end initializer :activate_queue_consumer do |app| - if config.queue == Rails::Queueing::Queue - app.queue_consumer = config.queue_consumer.start(app.queue) + if config.queue == ActiveSupport::Queue + app.queue_consumer = config.queue_consumer.start(app.queue, Rails.logger) at_exit { app.queue_consumer.shutdown } end end diff --git a/railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt b/railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt index fdf011a510..cb3e8b123e 100644 --- a/railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt +++ b/railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt @@ -82,5 +82,5 @@ # Default the production mode queue to an synchronous queue. You will probably # want to replace this with an out-of-process queueing solution. - # config.queue = Rails::Queueing::SynchronousQueue + # config.queue = ActiveSupport::SynchronousQueue end diff --git a/railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt b/railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt index 8ab27eb6e1..75897ba8cd 100644 --- a/railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt +++ b/railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt @@ -40,5 +40,5 @@ config.active_support.deprecation = :stderr # Use the testing queue. - config.queue = Rails::Queueing::TestQueue + config.queue = ActiveSupport::TestQueue end diff --git a/railties/lib/rails/queueing.rb b/railties/lib/rails/queueing.rb deleted file mode 100644 index 7cd755b0f7..0000000000 --- a/railties/lib/rails/queueing.rb +++ /dev/null @@ -1,115 +0,0 @@ -require "thread" -require 'delegate' - -module Rails - module Queueing - # A container for multiple queues. This class delegates to a default Queue - # so that Rails.queue.push and friends will Just Work. To use this class - # with multiple queues: - # - # # In your configuration: - # Rails.queue[:image_queue] = SomeQueue.new - # Rails.queue[:mail_queue] = SomeQueue.new - # - # # In your app code: - # Rails.queue[:mail_queue].push SomeJob.new - # - class Container < DelegateClass(::Queue) - def initialize(default_queue) - @queues = { :default => default_queue } - super(default_queue) - end - - def [](queue_name) - @queues[queue_name] - end - - def []=(queue_name, queue) - @queues[queue_name] = queue - end - end - - # A Queue that simply inherits from STDLIB's Queue. Everytime this - # queue is used, Rails automatically sets up a ThreadedConsumer - # to consume it. - class Queue < ::Queue - end - - class SynchronousQueue < ::Queue - def push(job) - job.run - end - alias << push - alias enq push - end - - # In test mode, the Rails queue is backed by an Array so that assertions - # can be made about its contents. The test queue provides a +jobs+ - # method to make assertions about the queue's contents and a +drain+ - # method to drain the queue and run the jobs. - # - # Jobs are run in a separate thread to catch mistakes where code - # assumes that the job is run in the same thread. - class TestQueue < ::Queue - # Get a list of the jobs off this queue. This method may not be - # available on production queues. - def jobs - @que.dup - end - - # Marshal and unmarshal job before pushing it onto the queue. This will - # raise an exception on any attempts in tests to push jobs that can't (or - # shouldn't) be marshalled. - def push(job) - super Marshal.load(Marshal.dump(job)) - end - - # Drain the queue, running all jobs in a different thread. This method - # may not be available on production queues. - def drain - # run the jobs in a separate thread so assumptions of synchronous - # jobs are caught in test mode. - Thread.new { pop.run until empty? }.join - end - end - - # The threaded consumer will run jobs in a background thread in - # development mode or in a VM where running jobs on a thread in - # production mode makes sense. - # - # When the process exits, the consumer pushes a nil onto the - # queue and joins the thread, which will ensure that all jobs - # are executed before the process finally dies. - class ThreadedConsumer - def self.start(queue) - new(queue).start - end - - def initialize(queue) - @queue = queue - end - - def start - @thread = Thread.new do - while job = @queue.pop - begin - job.run - rescue Exception => e - handle_exception e - end - end - end - self - end - - def shutdown - @queue.push nil - @thread.join - end - - def handle_exception(e) - Rails.logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}" - end - end - end -end diff --git a/railties/test/application/initializers/frameworks_test.rb b/railties/test/application/initializers/frameworks_test.rb index 1eb5fce384..5268257d62 100644 --- a/railties/test/application/initializers/frameworks_test.rb +++ b/railties/test/application/initializers/frameworks_test.rb @@ -52,19 +52,19 @@ module ApplicationTests test "uses the default queue for ActionMailer" do require "#{app_path}/config/environment" - assert_kind_of Rails::Queueing::Container, ActionMailer::Base.queue + assert_kind_of ActiveSupport::QueueContainer, ActionMailer::Base.queue end test "allows me to configure queue for ActionMailer" do app_file "config/environments/development.rb", <<-RUBY AppTemplate::Application.configure do - Rails.queue[:mailer] = Rails::Queueing::TestQueue.new + Rails.queue[:mailer] = ActiveSupport::TestQueue.new config.action_mailer.queue = Rails.queue[:mailer] end RUBY require "#{app_path}/config/environment" - assert_kind_of Rails::Queueing::TestQueue, ActionMailer::Base.queue + assert_kind_of ActiveSupport::TestQueue, ActionMailer::Base.queue end test "does not include url helpers as action methods" do diff --git a/railties/test/application/queue_test.rb b/railties/test/application/queue_test.rb index f4c11c5f4f..664001ca69 100644 --- a/railties/test/application/queue_test.rb +++ b/railties/test/application/queue_test.rb @@ -19,14 +19,14 @@ module ApplicationTests test "the queue is a TestQueue in test mode" do app("test") - assert_kind_of Rails::Queueing::TestQueue, Rails.application.queue[:default] - assert_kind_of Rails::Queueing::TestQueue, Rails.queue[:default] + assert_kind_of ActiveSupport::TestQueue, Rails.application.queue[:default] + assert_kind_of ActiveSupport::TestQueue, Rails.queue[:default] end test "the queue is a SynchronousQueue in development mode" do app("development") - assert_kind_of Rails::Queueing::SynchronousQueue, Rails.application.queue[:default] - assert_kind_of Rails::Queueing::SynchronousQueue, Rails.queue[:default] + assert_kind_of ActiveSupport::SynchronousQueue, Rails.application.queue[:default] + assert_kind_of ActiveSupport::SynchronousQueue, Rails.queue[:default] end class ThreadTrackingJob @@ -47,7 +47,7 @@ module ApplicationTests end end - test "in development mode, an enqueued job will be processed in the same thread" do + test "in development mode, an enqueued job will be processed in a separate thread" do app("development") job = ThreadTrackingJob.new @@ -55,7 +55,7 @@ module ApplicationTests sleep 0.1 assert job.ran?, "Expected job to be run" - refute job.ran_in_different_thread?, "Expected job to run in the same thread" + assert job.ran_in_different_thread?, "Expected job to run in the same thread" end test "in test mode, explicitly draining the queue will process it in a separate thread" do @@ -160,12 +160,12 @@ module ApplicationTests test "a custom consumer implementation can be provided" do add_to_env_config "production", <<-RUBY require "my_queue_consumer" - config.queue = Rails::Queueing::Queue + config.queue = ActiveSupport::Queue config.queue_consumer = MyQueueConsumer RUBY app_file "lib/my_queue_consumer.rb", <<-RUBY - class MyQueueConsumer < Rails::Queueing::ThreadedConsumer + class MyQueueConsumer < ActiveSupport::ThreadedQueueConsumer attr_reader :started def start diff --git a/railties/test/queueing/container_test.rb b/railties/test/queueing/container_test.rb deleted file mode 100644 index 69e59a3871..0000000000 --- a/railties/test/queueing/container_test.rb +++ /dev/null @@ -1,30 +0,0 @@ -require 'abstract_unit' -require 'rails/queueing' - -module Rails - module Queueing - class ContainerTest < ActiveSupport::TestCase - def test_delegates_to_default - q = Queue.new - container = Container.new q - job = Object.new - - container.push job - assert_equal job, q.pop - end - - def test_access_default - q = Queue.new - container = Container.new q - assert_equal q, container[:default] - end - - def test_assign_queue - container = Container.new Object.new - q = Object.new - container[:foo] = q - assert_equal q, container[:foo] - end - end - end -end diff --git a/railties/test/queueing/test_queue_test.rb b/railties/test/queueing/test_queue_test.rb deleted file mode 100644 index 2f0f507adb..0000000000 --- a/railties/test/queueing/test_queue_test.rb +++ /dev/null @@ -1,102 +0,0 @@ -require 'abstract_unit' -require 'rails/queueing' - -class TestQueueTest < ActiveSupport::TestCase - def setup - @queue = Rails::Queueing::TestQueue.new - end - - class ExceptionRaisingJob - def run - raise - end - end - - def test_drain_raises - @queue.push ExceptionRaisingJob.new - assert_raises(RuntimeError) { @queue.drain } - end - - def test_jobs - @queue.push 1 - @queue.push 2 - assert_equal [1,2], @queue.jobs - end - - class EquivalentJob - def initialize - @initial_id = self.object_id - end - - def run - end - - def ==(other) - other.same_initial_id?(@initial_id) - end - - def same_initial_id?(other_id) - other_id == @initial_id - end - end - - def test_contents - assert @queue.empty? - job = EquivalentJob.new - @queue.push job - refute @queue.empty? - assert_equal job, @queue.pop - end - - class ProcessingJob - def self.clear_processed - @processed = [] - end - - def self.processed - @processed - end - - def initialize(object) - @object = object - end - - def run - self.class.processed << @object - end - end - - def test_order - ProcessingJob.clear_processed - job1 = ProcessingJob.new(1) - job2 = ProcessingJob.new(2) - - @queue.push job1 - @queue.push job2 - @queue.drain - - assert_equal [1,2], ProcessingJob.processed - end - - class ThreadTrackingJob - attr_reader :thread_id - - def run - @thread_id = Thread.current.object_id - end - - def ran? - @thread_id - end - end - - def test_drain - @queue.push ThreadTrackingJob.new - job = @queue.jobs.last - @queue.drain - - assert @queue.empty? - assert job.ran?, "The job runs synchronously when the queue is drained" - assert_not_equal job.thread_id, Thread.current.object_id - end -end diff --git a/railties/test/queueing/threaded_consumer_test.rb b/railties/test/queueing/threaded_consumer_test.rb deleted file mode 100644 index c34a966d6e..0000000000 --- a/railties/test/queueing/threaded_consumer_test.rb +++ /dev/null @@ -1,100 +0,0 @@ -require 'abstract_unit' -require 'rails/queueing' - -class TestThreadConsumer < ActiveSupport::TestCase - class Job - attr_reader :id - def initialize(id, &block) - @id = id - @block = block - end - - def run - @block.call if @block - end - end - - def setup - @queue = Rails::Queueing::Queue.new - @consumer = Rails::Queueing::ThreadedConsumer.start(@queue) - end - - def teardown - @queue.push nil - end - - test "the jobs are executed" do - ran = false - - job = Job.new(1) do - ran = true - end - - @queue.push job - sleep 0.1 - assert_equal true, ran - end - - test "the jobs are not executed synchronously" do - ran = false - - job = Job.new(1) do - sleep 0.1 - ran = true - end - - @queue.push job - assert_equal false, ran - end - - test "shutting down the queue synchronously drains the jobs" do - ran = false - - job = Job.new(1) do - sleep 0.1 - ran = true - end - - @queue.push job - assert_equal false, ran - - @consumer.shutdown - - assert_equal true, ran - end - - test "log job that raises an exception" do - require "active_support/log_subscriber/test_helper" - logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new - Rails.logger = logger - - job = Job.new(1) do - raise "RuntimeError: Error!" - end - - @queue.push job - sleep 0.1 - - assert_equal 1, logger.logged(:error).size - assert_match(/Job Error: RuntimeError: Error!/, logger.logged(:error).last) - end - - test "test overriding exception handling" do - @consumer.shutdown - @consumer = Class.new(Rails::Queueing::ThreadedConsumer) do - attr_reader :last_error - def handle_exception(e) - @last_error = e.message - end - end.start(@queue) - - job = Job.new(1) do - raise "RuntimeError: Error!" - end - - @queue.push job - sleep 0.1 - - assert_equal "RuntimeError: Error!", @consumer.last_error - end -end -- cgit v1.2.3