aboutsummaryrefslogtreecommitdiffstats
path: root/railties
diff options
context:
space:
mode:
authorYehuda Katz <wycats@gmail.com>2012-04-26 21:43:12 -0700
committerYehuda Katz <wycats@gmail.com>2012-04-26 21:43:12 -0700
commit602000be90e9935f4f4ee5acc096725d7b7c33e5 (patch)
tree5c0c7938927be67b56f5c50a08e482a84992ea95 /railties
parentadff4a706a5d7ad18ef05303461e1a0d848bd662 (diff)
downloadrails-602000be90e9935f4f4ee5acc096725d7b7c33e5.tar.gz
rails-602000be90e9935f4f4ee5acc096725d7b7c33e5.tar.bz2
rails-602000be90e9935f4f4ee5acc096725d7b7c33e5.zip
Missing git add :trollface:
Diffstat (limited to 'railties')
-rw-r--r--railties/lib/rails/queueing.rb65
-rw-r--r--railties/test/application/queue_test.rb114
-rw-r--r--railties/test/queueing/test_queue_test.rb44
-rw-r--r--railties/test/queueing/threaded_consumer_test.rb65
4 files changed, 288 insertions, 0 deletions
diff --git a/railties/lib/rails/queueing.rb b/railties/lib/rails/queueing.rb
new file mode 100644
index 0000000000..b77940f821
--- /dev/null
+++ b/railties/lib/rails/queueing.rb
@@ -0,0 +1,65 @@
+module Rails
+ module Queueing
+ # In test mode, the Rails queue is backed by an Array so that assertions
+ # can be made about its contents. The test queue provides a +contents+
+ # method to make assertions about the queue's contents and a +drain+
+ # method to drain the queue and run the jobs.
+ #
+ # Jobs are run in a separate thread to catch mistakes where code
+ # assumes that the job is run in the same thread.
+ class TestQueue
+ attr_reader :contents
+
+ def initialize
+ @contents = []
+ end
+
+ def drain
+ # run the jobs in a separate thread so assumptions of synchronous
+ # jobs are caught in test mode.
+ t = Thread.new do
+ while job = @contents.pop
+ job.run
+ end
+ end
+ t.join
+ end
+
+ # implement the Queue API
+ def push(object)
+ @contents << object
+ end
+ end
+
+ # The threaded consumer will run jobs in a background thread in
+ # development mode or in a VM where running jobs on a thread in
+ # production mode makes sense.
+ #
+ # When the process exits, the consumer pushes a nil onto the
+ # queue and joins the thread, which will ensure that all jobs
+ # are executed before the process finally dies.
+ class ThreadedConsumer
+ def self.start(queue)
+ new(queue).start
+ end
+
+ def initialize(queue)
+ @queue = queue
+ end
+
+ def start
+ @thread = Thread.new do
+ while job = @queue.pop
+ job.run
+ end
+ end
+ self
+ end
+
+ def shutdown
+ @queue.push nil
+ @thread.join
+ end
+ end
+ end
+end
diff --git a/railties/test/application/queue_test.rb b/railties/test/application/queue_test.rb
new file mode 100644
index 0000000000..667565b031
--- /dev/null
+++ b/railties/test/application/queue_test.rb
@@ -0,0 +1,114 @@
+require 'isolation/abstract_unit'
+require 'rack/test'
+
+module ApplicationTests
+ class GeneratorsTest < ActiveSupport::TestCase
+ include ActiveSupport::Testing::Isolation
+
+ def setup
+ build_app
+ boot_rails
+ end
+
+ def teardown
+ teardown_app
+ end
+
+ def app_const
+ @app_const ||= Class.new(Rails::Application)
+ end
+
+ test "the queue is a TestQueue in test mode" do
+ app("test")
+ assert_kind_of Rails::Queueing::TestQueue, Rails.application.queue
+ assert_kind_of Rails::Queueing::TestQueue, Rails.queue
+ end
+
+ test "the queue is a Queue in development mode" do
+ app("development")
+ assert_kind_of Queue, Rails.application.queue
+ assert_kind_of Queue, Rails.queue
+ end
+
+ test "in development mode, an enqueued job will be processed in a separate thread" do
+ app("development")
+ current = Thread.current
+
+ job = Struct.new(:origin, :target).new(Thread.current)
+ def job.run
+ self.target = Thread.current
+ end
+
+ Rails.queue.push job
+ sleep 0.1
+
+ assert job.target, "The job was run"
+ assert_not_equal job.origin, job.target
+ end
+
+ test "in test mode, explicitly draining the queue will process it in a separate thread" do
+ app("test")
+ current = Thread.current
+
+ job = Struct.new(:origin, :target).new(Thread.current)
+ def job.run
+ self.target = Thread.current
+ end
+
+ Rails.queue.push job
+ Rails.queue.drain
+
+ assert job.target, "The job was run"
+ assert_not_equal job.origin, job.target
+ end
+
+ test "in test mode, the queue can be observed" do
+ app("test")
+
+ job = Class.new(Struct.new(:id)) do
+ def run
+ end
+ end
+
+ jobs = (1..10).map do |id|
+ job.new(id)
+ end
+
+ jobs.each do |job|
+ Rails.queue.push job
+ end
+
+ assert_equal jobs, Rails.queue.contents
+ end
+
+ test "a custom queue implementation can be provided" do
+ add_to_env_config "production", <<-RUBY
+ require "my_queue"
+ config.queue = MyQueue
+ RUBY
+
+ app_file "lib/my_queue.rb", <<-RUBY
+ class MyQueue
+ def push(job)
+ job.run
+ end
+ end
+ RUBY
+
+ app("production")
+
+ assert_kind_of MyQueue, Rails.queue
+
+ job = Class.new(Struct.new(:id, :ran)) do
+ def run
+ self.ran = true
+ end
+ end
+
+ job1 = job.new(1)
+ Rails.queue.push job1
+
+ assert_equal true, job1.ran
+ end
+ end
+end
diff --git a/railties/test/queueing/test_queue_test.rb b/railties/test/queueing/test_queue_test.rb
new file mode 100644
index 0000000000..de30e8cffd
--- /dev/null
+++ b/railties/test/queueing/test_queue_test.rb
@@ -0,0 +1,44 @@
+require 'abstract_unit'
+require 'rails/queueing'
+
+class TestQueueTest < ActiveSupport::TestCase
+ class Job
+ attr_reader :id
+ def initialize(id, &block)
+ @id = id
+ @block = block
+ end
+
+ def run
+ @block.call if @block
+ end
+ end
+
+ def setup
+ @queue = Rails::Queueing::TestQueue.new
+ end
+
+ def test_contents
+ assert_equal [], @queue.contents
+ job = Job.new(1)
+ @queue.push job
+ assert_equal [job], @queue.contents
+ end
+
+ def test_drain
+ t = nil
+ ran = false
+
+ job = Job.new(1) do
+ ran = true
+ t = Thread.current
+ end
+
+ @queue.push job
+ @queue.drain
+
+ assert_equal [], @queue.contents
+ assert ran, "The job runs synchronously when the queue is drained"
+ assert_not_equal t, Thread.current
+ end
+end
diff --git a/railties/test/queueing/threaded_consumer_test.rb b/railties/test/queueing/threaded_consumer_test.rb
new file mode 100644
index 0000000000..d00a67d511
--- /dev/null
+++ b/railties/test/queueing/threaded_consumer_test.rb
@@ -0,0 +1,65 @@
+require 'abstract_unit'
+require 'rails/queueing'
+
+class TestThreadConsumer < ActiveSupport::TestCase
+ class Job
+ attr_reader :id
+ def initialize(id, &block)
+ @id = id
+ @block = block
+ end
+
+ def run
+ @block.call if @block
+ end
+ end
+
+ def setup
+ @queue = Queue.new
+ @consumer = Rails::Queueing::ThreadedConsumer.start(@queue)
+ end
+
+ def teardown
+ @queue.push nil
+ end
+
+ test "the jobs are executed" do
+ ran = false
+
+ job = Job.new(1) do
+ ran = true
+ end
+
+ @queue.push job
+ sleep 0.1
+ assert_equal true, ran
+ end
+
+ test "the jobs are not executed synchronously" do
+ ran = false
+
+ job = Job.new(1) do
+ sleep 0.1
+ ran = true
+ end
+
+ @queue.push job
+ assert_equal false, ran
+ end
+
+ test "shutting down the queue synchronously drains the jobs" do
+ ran = false
+
+ job = Job.new(1) do
+ sleep 0.1
+ ran = true
+ end
+
+ @queue.push job
+ assert_equal false, ran
+
+ @consumer.shutdown
+
+ assert_equal true, ran
+ end
+end