From 602000be90e9935f4f4ee5acc096725d7b7c33e5 Mon Sep 17 00:00:00 2001 From: Yehuda Katz Date: Thu, 26 Apr 2012 21:43:12 -0700 Subject: Missing git add :trollface: --- railties/lib/rails/queueing.rb | 65 +++++++++++++ railties/test/application/queue_test.rb | 114 +++++++++++++++++++++++ railties/test/queueing/test_queue_test.rb | 44 +++++++++ railties/test/queueing/threaded_consumer_test.rb | 65 +++++++++++++ 4 files changed, 288 insertions(+) create mode 100644 railties/lib/rails/queueing.rb create mode 100644 railties/test/application/queue_test.rb create mode 100644 railties/test/queueing/test_queue_test.rb create mode 100644 railties/test/queueing/threaded_consumer_test.rb (limited to 'railties') diff --git a/railties/lib/rails/queueing.rb b/railties/lib/rails/queueing.rb new file mode 100644 index 0000000000..b77940f821 --- /dev/null +++ b/railties/lib/rails/queueing.rb @@ -0,0 +1,65 @@ +module Rails + module Queueing + # In test mode, the Rails queue is backed by an Array so that assertions + # can be made about its contents. The test queue provides a +contents+ + # method to make assertions about the queue's contents and a +drain+ + # method to drain the queue and run the jobs. + # + # Jobs are run in a separate thread to catch mistakes where code + # assumes that the job is run in the same thread. + class TestQueue + attr_reader :contents + + def initialize + @contents = [] + end + + def drain + # run the jobs in a separate thread so assumptions of synchronous + # jobs are caught in test mode. + t = Thread.new do + while job = @contents.pop + job.run + end + end + t.join + end + + # implement the Queue API + def push(object) + @contents << object + end + end + + # The threaded consumer will run jobs in a background thread in + # development mode or in a VM where running jobs on a thread in + # production mode makes sense. + # + # When the process exits, the consumer pushes a nil onto the + # queue and joins the thread, which will ensure that all jobs + # are executed before the process finally dies. + class ThreadedConsumer + def self.start(queue) + new(queue).start + end + + def initialize(queue) + @queue = queue + end + + def start + @thread = Thread.new do + while job = @queue.pop + job.run + end + end + self + end + + def shutdown + @queue.push nil + @thread.join + end + end + end +end diff --git a/railties/test/application/queue_test.rb b/railties/test/application/queue_test.rb new file mode 100644 index 0000000000..667565b031 --- /dev/null +++ b/railties/test/application/queue_test.rb @@ -0,0 +1,114 @@ +require 'isolation/abstract_unit' +require 'rack/test' + +module ApplicationTests + class GeneratorsTest < ActiveSupport::TestCase + include ActiveSupport::Testing::Isolation + + def setup + build_app + boot_rails + end + + def teardown + teardown_app + end + + def app_const + @app_const ||= Class.new(Rails::Application) + end + + test "the queue is a TestQueue in test mode" do + app("test") + assert_kind_of Rails::Queueing::TestQueue, Rails.application.queue + assert_kind_of Rails::Queueing::TestQueue, Rails.queue + end + + test "the queue is a Queue in development mode" do + app("development") + assert_kind_of Queue, Rails.application.queue + assert_kind_of Queue, Rails.queue + end + + test "in development mode, an enqueued job will be processed in a separate thread" do + app("development") + current = Thread.current + + job = Struct.new(:origin, :target).new(Thread.current) + def job.run + self.target = Thread.current + end + + Rails.queue.push job + sleep 0.1 + + assert job.target, "The job was run" + assert_not_equal job.origin, job.target + end + + test "in test mode, explicitly draining the queue will process it in a separate thread" do + app("test") + current = Thread.current + + job = Struct.new(:origin, :target).new(Thread.current) + def job.run + self.target = Thread.current + end + + Rails.queue.push job + Rails.queue.drain + + assert job.target, "The job was run" + assert_not_equal job.origin, job.target + end + + test "in test mode, the queue can be observed" do + app("test") + + job = Class.new(Struct.new(:id)) do + def run + end + end + + jobs = (1..10).map do |id| + job.new(id) + end + + jobs.each do |job| + Rails.queue.push job + end + + assert_equal jobs, Rails.queue.contents + end + + test "a custom queue implementation can be provided" do + add_to_env_config "production", <<-RUBY + require "my_queue" + config.queue = MyQueue + RUBY + + app_file "lib/my_queue.rb", <<-RUBY + class MyQueue + def push(job) + job.run + end + end + RUBY + + app("production") + + assert_kind_of MyQueue, Rails.queue + + job = Class.new(Struct.new(:id, :ran)) do + def run + self.ran = true + end + end + + job1 = job.new(1) + Rails.queue.push job1 + + assert_equal true, job1.ran + end + end +end diff --git a/railties/test/queueing/test_queue_test.rb b/railties/test/queueing/test_queue_test.rb new file mode 100644 index 0000000000..de30e8cffd --- /dev/null +++ b/railties/test/queueing/test_queue_test.rb @@ -0,0 +1,44 @@ +require 'abstract_unit' +require 'rails/queueing' + +class TestQueueTest < ActiveSupport::TestCase + class Job + attr_reader :id + def initialize(id, &block) + @id = id + @block = block + end + + def run + @block.call if @block + end + end + + def setup + @queue = Rails::Queueing::TestQueue.new + end + + def test_contents + assert_equal [], @queue.contents + job = Job.new(1) + @queue.push job + assert_equal [job], @queue.contents + end + + def test_drain + t = nil + ran = false + + job = Job.new(1) do + ran = true + t = Thread.current + end + + @queue.push job + @queue.drain + + assert_equal [], @queue.contents + assert ran, "The job runs synchronously when the queue is drained" + assert_not_equal t, Thread.current + end +end diff --git a/railties/test/queueing/threaded_consumer_test.rb b/railties/test/queueing/threaded_consumer_test.rb new file mode 100644 index 0000000000..d00a67d511 --- /dev/null +++ b/railties/test/queueing/threaded_consumer_test.rb @@ -0,0 +1,65 @@ +require 'abstract_unit' +require 'rails/queueing' + +class TestThreadConsumer < ActiveSupport::TestCase + class Job + attr_reader :id + def initialize(id, &block) + @id = id + @block = block + end + + def run + @block.call if @block + end + end + + def setup + @queue = Queue.new + @consumer = Rails::Queueing::ThreadedConsumer.start(@queue) + end + + def teardown + @queue.push nil + end + + test "the jobs are executed" do + ran = false + + job = Job.new(1) do + ran = true + end + + @queue.push job + sleep 0.1 + assert_equal true, ran + end + + test "the jobs are not executed synchronously" do + ran = false + + job = Job.new(1) do + sleep 0.1 + ran = true + end + + @queue.push job + assert_equal false, ran + end + + test "shutting down the queue synchronously drains the jobs" do + ran = false + + job = Job.new(1) do + sleep 0.1 + ran = true + end + + @queue.push job + assert_equal false, ran + + @consumer.shutdown + + assert_equal true, ran + end +end -- cgit v1.2.3