diff options
Diffstat (limited to 'railties')
-rw-r--r-- | railties/lib/rails.rb | 20 | ||||
-rw-r--r-- | railties/lib/rails/application.rb | 6 | ||||
-rw-r--r-- | railties/lib/rails/application/configuration.rb | 3 | ||||
-rw-r--r-- | railties/lib/rails/application/finisher.rb | 7 | ||||
-rw-r--r-- | railties/lib/rails/generators/rails/app/templates/config/environments/development.rb.tt | 3 | ||||
-rw-r--r-- | railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt | 4 | ||||
-rw-r--r-- | railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt | 3 | ||||
-rw-r--r-- | railties/lib/rails/queueing.rb | 65 | ||||
-rw-r--r-- | railties/test/application/queue_test.rb | 114 | ||||
-rw-r--r-- | railties/test/isolation/abstract_unit.rb | 2 | ||||
-rw-r--r-- | railties/test/queueing/test_queue_test.rb | 44 | ||||
-rw-r--r-- | railties/test/queueing/threaded_consumer_test.rb | 65 |
12 files changed, 333 insertions, 3 deletions
diff --git a/railties/lib/rails.rb b/railties/lib/rails.rb index 945063e55c..59c3c56e59 100644 --- a/railties/lib/rails.rb +++ b/railties/lib/rails.rb @@ -22,6 +22,7 @@ end module Rails autoload :Info, 'rails/info' autoload :InfoController, 'rails/info_controller' + autoload :Queueing, 'rails/queueing' class << self def application @@ -37,6 +38,25 @@ module Rails application.config end + # Rails.queue is the application's queue. You can push a job onto + # the queue by: + # + # Rails.queue.push job + # + # A job is an object that responds to +run+. Queue consumers will + # pop jobs off of the queue and invoke the queue's +run+ method. + # + # Note that depending on your queue implementation, jobs may not + # be executed in the same process as they were created in, and + # are never executed in the same thread as they were created in. + # + # If necessary, a queue implementation may need to serialize your + # job for distribution to another process. The documentation of + # your queue will specify the requirements for that serialization. + def queue + application.queue + end + def initialize! application.initialize! end diff --git a/railties/lib/rails/application.rb b/railties/lib/rails/application.rb index b063e7681d..6ab448910e 100644 --- a/railties/lib/rails/application.rb +++ b/railties/lib/rails/application.rb @@ -66,7 +66,7 @@ module Rails end end - attr_accessor :assets, :sandbox + attr_accessor :assets, :sandbox, :queue alias_method :sandbox?, :sandbox attr_reader :reloaders @@ -199,6 +199,10 @@ module Rails @config ||= Application::Configuration.new(find_root_with_flag("config.ru", Dir.pwd)) end + def queue #:nodoc: + @queue ||= config.queue.new + end + def to_app self end diff --git a/railties/lib/rails/application/configuration.rb b/railties/lib/rails/application/configuration.rb index 37297f0496..a36dad0e98 100644 --- a/railties/lib/rails/application/configuration.rb +++ b/railties/lib/rails/application/configuration.rb @@ -11,7 +11,7 @@ module Rails :force_ssl, :helpers_paths, :logger, :log_formatter, :log_tags, :preload_frameworks, :railties_order, :relative_url_root, :secret_token, :serve_static_assets, :ssl_options, :static_cache_control, :session_options, - :time_zone, :reload_classes_only_on_change, :use_schema_cache_dump + :time_zone, :reload_classes_only_on_change, :use_schema_cache_dump, :queue attr_writer :log_level attr_reader :encoding @@ -43,6 +43,7 @@ module Rails @autoflush_log = true @log_formatter = ActiveSupport::Logger::SimpleFormatter.new @use_schema_cache_dump = true + @queue = Queue @assets = ActiveSupport::OrderedOptions.new @assets.enabled = false diff --git a/railties/lib/rails/application/finisher.rb b/railties/lib/rails/application/finisher.rb index 002c6026e4..6475b381f4 100644 --- a/railties/lib/rails/application/finisher.rb +++ b/railties/lib/rails/application/finisher.rb @@ -93,6 +93,13 @@ module Rails ActiveSupport::Dependencies.unhook! end end + + initializer :activate_queue_consumer do |app| + if config.queue == Queue + consumer = Rails::Queueing::ThreadedConsumer.start(app.queue) + at_exit { consumer.shutdown } + end + end end end end diff --git a/railties/lib/rails/generators/rails/app/templates/config/environments/development.rb.tt b/railties/lib/rails/generators/rails/app/templates/config/environments/development.rb.tt index eb4dfa7c89..c486ae590e 100644 --- a/railties/lib/rails/generators/rails/app/templates/config/environments/development.rb.tt +++ b/railties/lib/rails/generators/rails/app/templates/config/environments/development.rb.tt @@ -35,4 +35,7 @@ # Expands the lines which load the assets. config.assets.debug = true <%- end -%> + + # In development, use an in-memory queue for queueing + config.queue = Queue end diff --git a/railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt b/railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt index 1c980e5ce6..854e6e95cd 100644 --- a/railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt +++ b/railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt @@ -76,4 +76,8 @@ # Use default logging formatter so that PID and timestamp are not suppressed config.log_formatter = ::Logger::Formatter.new + + # Default the production mode queue to an in-memory queue. You will probably + # want to replace this with an out-of-process queueing solution + config.queue = Queue end diff --git a/railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt b/railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt index b725dd19f6..b27b88a3c6 100644 --- a/railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt +++ b/railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt @@ -33,4 +33,7 @@ # Print deprecation notices to the stderr. config.active_support.deprecation = :stderr + + # Use the testing queue + config.queue = Rails::Queueing::TestQueue end diff --git a/railties/lib/rails/queueing.rb b/railties/lib/rails/queueing.rb new file mode 100644 index 0000000000..b77940f821 --- /dev/null +++ b/railties/lib/rails/queueing.rb @@ -0,0 +1,65 @@ +module Rails + module Queueing + # In test mode, the Rails queue is backed by an Array so that assertions + # can be made about its contents. The test queue provides a +contents+ + # method to make assertions about the queue's contents and a +drain+ + # method to drain the queue and run the jobs. + # + # Jobs are run in a separate thread to catch mistakes where code + # assumes that the job is run in the same thread. + class TestQueue + attr_reader :contents + + def initialize + @contents = [] + end + + def drain + # run the jobs in a separate thread so assumptions of synchronous + # jobs are caught in test mode. + t = Thread.new do + while job = @contents.pop + job.run + end + end + t.join + end + + # implement the Queue API + def push(object) + @contents << object + end + end + + # The threaded consumer will run jobs in a background thread in + # development mode or in a VM where running jobs on a thread in + # production mode makes sense. + # + # When the process exits, the consumer pushes a nil onto the + # queue and joins the thread, which will ensure that all jobs + # are executed before the process finally dies. + class ThreadedConsumer + def self.start(queue) + new(queue).start + end + + def initialize(queue) + @queue = queue + end + + def start + @thread = Thread.new do + while job = @queue.pop + job.run + end + end + self + end + + def shutdown + @queue.push nil + @thread.join + end + end + end +end diff --git a/railties/test/application/queue_test.rb b/railties/test/application/queue_test.rb new file mode 100644 index 0000000000..667565b031 --- /dev/null +++ b/railties/test/application/queue_test.rb @@ -0,0 +1,114 @@ +require 'isolation/abstract_unit' +require 'rack/test' + +module ApplicationTests + class GeneratorsTest < ActiveSupport::TestCase + include ActiveSupport::Testing::Isolation + + def setup + build_app + boot_rails + end + + def teardown + teardown_app + end + + def app_const + @app_const ||= Class.new(Rails::Application) + end + + test "the queue is a TestQueue in test mode" do + app("test") + assert_kind_of Rails::Queueing::TestQueue, Rails.application.queue + assert_kind_of Rails::Queueing::TestQueue, Rails.queue + end + + test "the queue is a Queue in development mode" do + app("development") + assert_kind_of Queue, Rails.application.queue + assert_kind_of Queue, Rails.queue + end + + test "in development mode, an enqueued job will be processed in a separate thread" do + app("development") + current = Thread.current + + job = Struct.new(:origin, :target).new(Thread.current) + def job.run + self.target = Thread.current + end + + Rails.queue.push job + sleep 0.1 + + assert job.target, "The job was run" + assert_not_equal job.origin, job.target + end + + test "in test mode, explicitly draining the queue will process it in a separate thread" do + app("test") + current = Thread.current + + job = Struct.new(:origin, :target).new(Thread.current) + def job.run + self.target = Thread.current + end + + Rails.queue.push job + Rails.queue.drain + + assert job.target, "The job was run" + assert_not_equal job.origin, job.target + end + + test "in test mode, the queue can be observed" do + app("test") + + job = Class.new(Struct.new(:id)) do + def run + end + end + + jobs = (1..10).map do |id| + job.new(id) + end + + jobs.each do |job| + Rails.queue.push job + end + + assert_equal jobs, Rails.queue.contents + end + + test "a custom queue implementation can be provided" do + add_to_env_config "production", <<-RUBY + require "my_queue" + config.queue = MyQueue + RUBY + + app_file "lib/my_queue.rb", <<-RUBY + class MyQueue + def push(job) + job.run + end + end + RUBY + + app("production") + + assert_kind_of MyQueue, Rails.queue + + job = Class.new(Struct.new(:id, :ran)) do + def run + self.ran = true + end + end + + job1 = job.new(1) + Rails.queue.push job1 + + assert_equal true, job1.ran + end + end +end diff --git a/railties/test/isolation/abstract_unit.rb b/railties/test/isolation/abstract_unit.rb index b28cc6e04d..7957186ba2 100644 --- a/railties/test/isolation/abstract_unit.rb +++ b/railties/test/isolation/abstract_unit.rb @@ -8,7 +8,7 @@ # Rails booted up. require 'fileutils' -require 'rubygems' +require 'bundler/setup' require 'minitest/autorun' require 'active_support/test_case' diff --git a/railties/test/queueing/test_queue_test.rb b/railties/test/queueing/test_queue_test.rb new file mode 100644 index 0000000000..de30e8cffd --- /dev/null +++ b/railties/test/queueing/test_queue_test.rb @@ -0,0 +1,44 @@ +require 'abstract_unit' +require 'rails/queueing' + +class TestQueueTest < ActiveSupport::TestCase + class Job + attr_reader :id + def initialize(id, &block) + @id = id + @block = block + end + + def run + @block.call if @block + end + end + + def setup + @queue = Rails::Queueing::TestQueue.new + end + + def test_contents + assert_equal [], @queue.contents + job = Job.new(1) + @queue.push job + assert_equal [job], @queue.contents + end + + def test_drain + t = nil + ran = false + + job = Job.new(1) do + ran = true + t = Thread.current + end + + @queue.push job + @queue.drain + + assert_equal [], @queue.contents + assert ran, "The job runs synchronously when the queue is drained" + assert_not_equal t, Thread.current + end +end diff --git a/railties/test/queueing/threaded_consumer_test.rb b/railties/test/queueing/threaded_consumer_test.rb new file mode 100644 index 0000000000..d00a67d511 --- /dev/null +++ b/railties/test/queueing/threaded_consumer_test.rb @@ -0,0 +1,65 @@ +require 'abstract_unit' +require 'rails/queueing' + +class TestThreadConsumer < ActiveSupport::TestCase + class Job + attr_reader :id + def initialize(id, &block) + @id = id + @block = block + end + + def run + @block.call if @block + end + end + + def setup + @queue = Queue.new + @consumer = Rails::Queueing::ThreadedConsumer.start(@queue) + end + + def teardown + @queue.push nil + end + + test "the jobs are executed" do + ran = false + + job = Job.new(1) do + ran = true + end + + @queue.push job + sleep 0.1 + assert_equal true, ran + end + + test "the jobs are not executed synchronously" do + ran = false + + job = Job.new(1) do + sleep 0.1 + ran = true + end + + @queue.push job + assert_equal false, ran + end + + test "shutting down the queue synchronously drains the jobs" do + ran = false + + job = Job.new(1) do + sleep 0.1 + ran = true + end + + @queue.push job + assert_equal false, ran + + @consumer.shutdown + + assert_equal true, ran + end +end |