path: root/activesupport
diff options
authorJeremy Kemper <jeremy@bitsweat.net>2012-12-21 15:42:47 -0800
committerJeremy Kemper <jeremy@bitsweat.net>2012-12-21 16:29:47 -0800
commitf9da785d0b1b22317cfca25c15fb555e9016accb (patch)
treef54e2174e9388ebb51bc32ed9ba1aa7cc7d95a32 /activesupport
parent10c0a3bd113c41f44fc025d2d042da95e9d8ea1f (diff)
Move background jobs to the 'jobs' branch until fully baked. Not shipping with Rails 4.0.
Diffstat (limited to 'activesupport')
4 files changed, 0 insertions, 388 deletions
diff --git a/activesupport/lib/active_support/queueing.rb b/activesupport/lib/active_support/queueing.rb
deleted file mode 100644
index a89a48d057..0000000000
--- a/activesupport/lib/active_support/queueing.rb
+++ /dev/null
@@ -1,105 +0,0 @@
-require 'delegate'
-require 'thread'
-module ActiveSupport
- # A Queue that simply inherits from STDLIB's Queue. When this
- # queue is used, Rails automatically starts a job runner in a
- # background thread.
- class Queue < ::Queue
- attr_writer :consumer
- def initialize(consumer_options = {})
- super()
- @consumer_options = consumer_options
- end
- def consumer
- @consumer ||= ThreadedQueueConsumer.new(self, @consumer_options)
- end
- # Drain the queue, running all jobs in a different thread. This method
- # may not be available on production queues.
- def drain
- # run the jobs in a separate thread so assumptions of synchronous
- # jobs are caught in test mode.
- consumer.drain
- end
- end
- class SynchronousQueue < Queue
- def push(job)
- super.tap { drain }
- end
- alias << push
- alias enq push
- end
- # In test mode, the Rails queue is backed by an Array so that assertions
- # can be made about its contents. The test queue provides a +jobs+
- # method to make assertions about the queue's contents and a +drain+
- # method to drain the queue and run the jobs.
- #
- # Jobs are run in a separate thread to catch mistakes where code
- # assumes that the job is run in the same thread.
- class TestQueue < Queue
- # Get a list of the jobs off this queue. This method may not be
- # available on production queues.
- def jobs
- @que.dup
- end
- # Marshal and unmarshal job before pushing it onto the queue. This will
- # raise an exception on any attempts in tests to push jobs that can't (or
- # shouldn't) be marshalled.
- def push(job)
- super Marshal.load(Marshal.dump(job))
- end
- end
- # The threaded consumer will run jobs in a background thread in
- # development mode or in a VM where running jobs on a thread in
- # production mode makes sense.
- #
- # When the process exits, the consumer pushes a nil onto the
- # queue and joins the thread, which will ensure that all jobs
- # are executed before the process finally dies.
- class ThreadedQueueConsumer
- attr_accessor :logger
- def initialize(queue, options = {})
- @queue = queue
- @logger = options[:logger]
- @fallback_logger = Logger.new($stderr)
- end
- def start
- @thread = Thread.new { consume }
- self
- end
- def shutdown
- @queue.push nil
- @thread.join
- end
- def drain
- @queue.pop.run until @queue.empty?
- end
- def consume
- while job = @queue.pop
- run job
- end
- end
- def run(job)
- job.run
- rescue Exception => exception
- handle_exception job, exception
- end
- def handle_exception(job, exception)
- (logger || @fallback_logger).error "Job Error: #{job.inspect}\n#{exception.message}\n#{exception.backtrace.join("\n")}"
- end
- end
diff --git a/activesupport/test/queueing/synchronous_queue_test.rb b/activesupport/test/queueing/synchronous_queue_test.rb
deleted file mode 100644
index 86c39d0f6c..0000000000
--- a/activesupport/test/queueing/synchronous_queue_test.rb
+++ /dev/null
@@ -1,27 +0,0 @@
-require 'abstract_unit'
-require 'active_support/queueing'
-class SynchronousQueueTest < ActiveSupport::TestCase
- class Job
- attr_reader :ran
- def run; @ran = true end
- end
- class ExceptionRaisingJob
- def run; raise end
- end
- def setup
- @queue = ActiveSupport::SynchronousQueue.new
- end
- def test_runs_jobs_immediately
- job = Job.new
- @queue.push job
- assert job.ran
- assert_raises RuntimeError do
- @queue.push ExceptionRaisingJob.new
- end
- end
diff --git a/activesupport/test/queueing/test_queue_test.rb b/activesupport/test/queueing/test_queue_test.rb
deleted file mode 100644
index 451fb68d3e..0000000000
--- a/activesupport/test/queueing/test_queue_test.rb
+++ /dev/null
@@ -1,146 +0,0 @@
-require 'abstract_unit'
-require 'active_support/queueing'
-class TestQueueTest < ActiveSupport::TestCase
- def setup
- @queue = ActiveSupport::TestQueue.new
- end
- class ExceptionRaisingJob
- def run
- raise
- end
- end
- def test_drain_raises_exceptions_from_running_jobs
- @queue.push ExceptionRaisingJob.new
- assert_raises(RuntimeError) { @queue.drain }
- end
- def test_jobs
- @queue.push 1
- @queue.push 2
- assert_equal [1,2], @queue.jobs
- end
- class EquivalentJob
- def initialize
- @initial_id = self.object_id
- end
- def run
- end
- def ==(other)
- other.same_initial_id?(@initial_id)
- end
- def same_initial_id?(other_id)
- other_id == @initial_id
- end
- end
- def test_contents
- job = EquivalentJob.new
- assert @queue.empty?
- @queue.push job
- refute @queue.empty?
- assert_equal job, @queue.pop
- end
- class ProcessingJob
- def self.clear_processed
- @processed = []
- end
- def self.processed
- @processed
- end
- def initialize(object)
- @object = object
- end
- def run
- self.class.processed << @object
- end
- end
- def test_order
- ProcessingJob.clear_processed
- job1 = ProcessingJob.new(1)
- job2 = ProcessingJob.new(2)
- @queue.push job1
- @queue.push job2
- @queue.drain
- assert_equal [1,2], ProcessingJob.processed
- end
- class ThreadTrackingJob
- attr_reader :thread_id
- def run
- @thread_id = Thread.current.object_id
- end
- def ran?
- @thread_id
- end
- end
- def test_drain
- @queue.push ThreadTrackingJob.new
- job = @queue.jobs.last
- @queue.drain
- assert @queue.empty?
- assert job.ran?, "The job runs synchronously when the queue is drained"
- assert_equal job.thread_id, Thread.current.object_id
- end
- class IdentifiableJob
- def initialize(id)
- @id = id
- end
- def ==(other)
- other.same_id?(@id)
- end
- def same_id?(other_id)
- other_id == @id
- end
- def run
- end
- end
- def test_queue_can_be_observed
- jobs = (1..10).map do |id|
- IdentifiableJob.new(id)
- end
- jobs.each do |job|
- @queue.push job
- end
- assert_equal jobs, @queue.jobs
- end
- def test_adding_an_unmarshallable_job
- anonymous_class_instance = Struct.new(:run).new
- assert_raises TypeError do
- @queue.push anonymous_class_instance
- end
- end
- def test_attempting_to_add_a_reference_to_itself
- job = {reference: @queue}
- assert_raises TypeError do
- @queue.push job
- end
- end
diff --git a/activesupport/test/queueing/threaded_consumer_test.rb b/activesupport/test/queueing/threaded_consumer_test.rb
deleted file mode 100644
index a3ca46a261..0000000000
--- a/activesupport/test/queueing/threaded_consumer_test.rb
+++ /dev/null
@@ -1,110 +0,0 @@
-require 'abstract_unit'
-require 'active_support/queueing'
-require "active_support/log_subscriber/test_helper"
-class TestThreadConsumer < ActiveSupport::TestCase
- class Job
- attr_reader :id
- def initialize(id = 1, &block)
- @id = id
- @block = block
- end
- def run
- @block.call if @block
- end
- end
- def setup
- @logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new
- @queue = ActiveSupport::Queue.new(logger: @logger)
- end
- def teardown
- @queue.drain
- end
- test "the jobs are executed" do
- ran = false
- job = Job.new { ran = true }
- @queue.push job
- @queue.drain
- assert_equal true, ran
- end
- test "the jobs are not executed synchronously" do
- run, ran = Queue.new, Queue.new
- job = Job.new { ran.push run.pop }
- @queue.consumer.start
- @queue.push job
- assert ran.empty?
- run.push true
- assert_equal true, ran.pop
- end
- test "shutting down the queue synchronously drains the jobs" do
- ran = false
- job = Job.new do
- sleep 0.1
- ran = true
- end
- @queue.consumer.start
- @queue.push job
- assert_equal false, ran
- @queue.consumer.shutdown
- assert_equal true, ran
- end
- test "log job that raises an exception" do
- job = Job.new { raise "RuntimeError: Error!" }
- @queue.push job
- consume_queue @queue
- assert_equal 1, @logger.logged(:error).size
- assert_match "Job Error: #{job.inspect}\nRuntimeError: Error!", @logger.logged(:error).last
- end
- test "logger defaults to stderr" do
- begin
- $stderr, old_stderr = StringIO.new, $stderr
- queue = ActiveSupport::Queue.new
- queue.push Job.new { raise "RuntimeError: Error!" }
- consume_queue queue
- assert_match 'Job Error', $stderr.string
- ensure
- $stderr = old_stderr
- end
- end
- test "test overriding exception handling" do
- @queue.consumer.instance_eval do
- def handle_exception(job, exception)
- @last_error = exception.message
- end
- def last_error
- @last_error
- end
- end
- job = Job.new { raise "RuntimeError: Error!" }
- @queue.push job
- consume_queue @queue
- assert_equal "RuntimeError: Error!", @queue.consumer.last_error
- end
- private
- def consume_queue(queue)
- queue.push nil
- queue.consumer.consume
- end