diff options
author | Jeremy Kemper <jeremy@bitsweat.net> | 2012-12-21 15:42:47 -0800 |
---|---|---|
committer | Jeremy Kemper <jeremy@bitsweat.net> | 2012-12-21 16:29:47 -0800 |
commit | f9da785d0b1b22317cfca25c15fb555e9016accb (patch) | |
tree | f54e2174e9388ebb51bc32ed9ba1aa7cc7d95a32 /activesupport | |
parent | 10c0a3bd113c41f44fc025d2d042da95e9d8ea1f (diff) | |
download | rails-f9da785d0b1b22317cfca25c15fb555e9016accb.tar.gz rails-f9da785d0b1b22317cfca25c15fb555e9016accb.tar.bz2 rails-f9da785d0b1b22317cfca25c15fb555e9016accb.zip |
Move background jobs to the 'jobs' branch until fully baked. Not shipping with Rails 4.0.
Diffstat (limited to 'activesupport')
-rw-r--r-- | activesupport/lib/active_support/queueing.rb | 105 | ||||
-rw-r--r-- | activesupport/test/queueing/synchronous_queue_test.rb | 27 | ||||
-rw-r--r-- | activesupport/test/queueing/test_queue_test.rb | 146 | ||||
-rw-r--r-- | activesupport/test/queueing/threaded_consumer_test.rb | 110 |
4 files changed, 0 insertions, 388 deletions
diff --git a/activesupport/lib/active_support/queueing.rb b/activesupport/lib/active_support/queueing.rb deleted file mode 100644 index a89a48d057..0000000000 --- a/activesupport/lib/active_support/queueing.rb +++ /dev/null @@ -1,105 +0,0 @@ -require 'delegate' -require 'thread' - -module ActiveSupport - # A Queue that simply inherits from STDLIB's Queue. When this - # queue is used, Rails automatically starts a job runner in a - # background thread. - class Queue < ::Queue - attr_writer :consumer - - def initialize(consumer_options = {}) - super() - @consumer_options = consumer_options - end - - def consumer - @consumer ||= ThreadedQueueConsumer.new(self, @consumer_options) - end - - # Drain the queue, running all jobs in a different thread. This method - # may not be available on production queues. - def drain - # run the jobs in a separate thread so assumptions of synchronous - # jobs are caught in test mode. - consumer.drain - end - end - - class SynchronousQueue < Queue - def push(job) - super.tap { drain } - end - alias << push - alias enq push - end - - # In test mode, the Rails queue is backed by an Array so that assertions - # can be made about its contents. The test queue provides a +jobs+ - # method to make assertions about the queue's contents and a +drain+ - # method to drain the queue and run the jobs. - # - # Jobs are run in a separate thread to catch mistakes where code - # assumes that the job is run in the same thread. - class TestQueue < Queue - # Get a list of the jobs off this queue. This method may not be - # available on production queues. - def jobs - @que.dup - end - - # Marshal and unmarshal job before pushing it onto the queue. This will - # raise an exception on any attempts in tests to push jobs that can't (or - # shouldn't) be marshalled. - def push(job) - super Marshal.load(Marshal.dump(job)) - end - end - - # The threaded consumer will run jobs in a background thread in - # development mode or in a VM where running jobs on a thread in - # production mode makes sense. - # - # When the process exits, the consumer pushes a nil onto the - # queue and joins the thread, which will ensure that all jobs - # are executed before the process finally dies. - class ThreadedQueueConsumer - attr_accessor :logger - - def initialize(queue, options = {}) - @queue = queue - @logger = options[:logger] - @fallback_logger = Logger.new($stderr) - end - - def start - @thread = Thread.new { consume } - self - end - - def shutdown - @queue.push nil - @thread.join - end - - def drain - @queue.pop.run until @queue.empty? - end - - def consume - while job = @queue.pop - run job - end - end - - def run(job) - job.run - rescue Exception => exception - handle_exception job, exception - end - - def handle_exception(job, exception) - (logger || @fallback_logger).error "Job Error: #{job.inspect}\n#{exception.message}\n#{exception.backtrace.join("\n")}" - end - end -end diff --git a/activesupport/test/queueing/synchronous_queue_test.rb b/activesupport/test/queueing/synchronous_queue_test.rb deleted file mode 100644 index 86c39d0f6c..0000000000 --- a/activesupport/test/queueing/synchronous_queue_test.rb +++ /dev/null @@ -1,27 +0,0 @@ -require 'abstract_unit' -require 'active_support/queueing' - -class SynchronousQueueTest < ActiveSupport::TestCase - class Job - attr_reader :ran - def run; @ran = true end - end - - class ExceptionRaisingJob - def run; raise end - end - - def setup - @queue = ActiveSupport::SynchronousQueue.new - end - - def test_runs_jobs_immediately - job = Job.new - @queue.push job - assert job.ran - - assert_raises RuntimeError do - @queue.push ExceptionRaisingJob.new - end - end -end diff --git a/activesupport/test/queueing/test_queue_test.rb b/activesupport/test/queueing/test_queue_test.rb deleted file mode 100644 index 451fb68d3e..0000000000 --- a/activesupport/test/queueing/test_queue_test.rb +++ /dev/null @@ -1,146 +0,0 @@ -require 'abstract_unit' -require 'active_support/queueing' - -class TestQueueTest < ActiveSupport::TestCase - def setup - @queue = ActiveSupport::TestQueue.new - end - - class ExceptionRaisingJob - def run - raise - end - end - - def test_drain_raises_exceptions_from_running_jobs - @queue.push ExceptionRaisingJob.new - assert_raises(RuntimeError) { @queue.drain } - end - - def test_jobs - @queue.push 1 - @queue.push 2 - assert_equal [1,2], @queue.jobs - end - - class EquivalentJob - def initialize - @initial_id = self.object_id - end - - def run - end - - def ==(other) - other.same_initial_id?(@initial_id) - end - - def same_initial_id?(other_id) - other_id == @initial_id - end - end - - def test_contents - job = EquivalentJob.new - assert @queue.empty? - @queue.push job - refute @queue.empty? - assert_equal job, @queue.pop - end - - class ProcessingJob - def self.clear_processed - @processed = [] - end - - def self.processed - @processed - end - - def initialize(object) - @object = object - end - - def run - self.class.processed << @object - end - end - - def test_order - ProcessingJob.clear_processed - job1 = ProcessingJob.new(1) - job2 = ProcessingJob.new(2) - - @queue.push job1 - @queue.push job2 - @queue.drain - - assert_equal [1,2], ProcessingJob.processed - end - - class ThreadTrackingJob - attr_reader :thread_id - - def run - @thread_id = Thread.current.object_id - end - - def ran? - @thread_id - end - end - - def test_drain - @queue.push ThreadTrackingJob.new - job = @queue.jobs.last - @queue.drain - - assert @queue.empty? - assert job.ran?, "The job runs synchronously when the queue is drained" - assert_equal job.thread_id, Thread.current.object_id - end - - class IdentifiableJob - def initialize(id) - @id = id - end - - def ==(other) - other.same_id?(@id) - end - - def same_id?(other_id) - other_id == @id - end - - def run - end - end - - def test_queue_can_be_observed - jobs = (1..10).map do |id| - IdentifiableJob.new(id) - end - - jobs.each do |job| - @queue.push job - end - - assert_equal jobs, @queue.jobs - end - - def test_adding_an_unmarshallable_job - anonymous_class_instance = Struct.new(:run).new - - assert_raises TypeError do - @queue.push anonymous_class_instance - end - end - - def test_attempting_to_add_a_reference_to_itself - job = {reference: @queue} - assert_raises TypeError do - @queue.push job - end - end -end diff --git a/activesupport/test/queueing/threaded_consumer_test.rb b/activesupport/test/queueing/threaded_consumer_test.rb deleted file mode 100644 index a3ca46a261..0000000000 --- a/activesupport/test/queueing/threaded_consumer_test.rb +++ /dev/null @@ -1,110 +0,0 @@ -require 'abstract_unit' -require 'active_support/queueing' -require "active_support/log_subscriber/test_helper" - -class TestThreadConsumer < ActiveSupport::TestCase - class Job - attr_reader :id - def initialize(id = 1, &block) - @id = id - @block = block - end - - def run - @block.call if @block - end - end - - def setup - @logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new - @queue = ActiveSupport::Queue.new(logger: @logger) - end - - def teardown - @queue.drain - end - - test "the jobs are executed" do - ran = false - job = Job.new { ran = true } - - @queue.push job - @queue.drain - - assert_equal true, ran - end - - test "the jobs are not executed synchronously" do - run, ran = Queue.new, Queue.new - job = Job.new { ran.push run.pop } - - @queue.consumer.start - @queue.push job - assert ran.empty? - - run.push true - assert_equal true, ran.pop - end - - test "shutting down the queue synchronously drains the jobs" do - ran = false - job = Job.new do - sleep 0.1 - ran = true - end - - @queue.consumer.start - @queue.push job - assert_equal false, ran - - @queue.consumer.shutdown - assert_equal true, ran - end - - test "log job that raises an exception" do - job = Job.new { raise "RuntimeError: Error!" } - - @queue.push job - consume_queue @queue - - assert_equal 1, @logger.logged(:error).size - assert_match "Job Error: #{job.inspect}\nRuntimeError: Error!", @logger.logged(:error).last - end - - test "logger defaults to stderr" do - begin - $stderr, old_stderr = StringIO.new, $stderr - queue = ActiveSupport::Queue.new - queue.push Job.new { raise "RuntimeError: Error!" } - consume_queue queue - assert_match 'Job Error', $stderr.string - ensure - $stderr = old_stderr - end - end - - test "test overriding exception handling" do - @queue.consumer.instance_eval do - def handle_exception(job, exception) - @last_error = exception.message - end - - def last_error - @last_error - end - end - - job = Job.new { raise "RuntimeError: Error!" } - - @queue.push job - consume_queue @queue - - assert_equal "RuntimeError: Error!", @queue.consumer.last_error - end - - private - def consume_queue(queue) - queue.push nil - queue.consumer.consume - end -end |