aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support/testing/parallelization.rb
diff options
context:
space:
mode:
Diffstat (limited to 'activesupport/lib/active_support/testing/parallelization.rb')
-rw-r--r--activesupport/lib/active_support/testing/parallelization.rb109
1 files changed, 109 insertions, 0 deletions
diff --git a/activesupport/lib/active_support/testing/parallelization.rb b/activesupport/lib/active_support/testing/parallelization.rb
new file mode 100644
index 0000000000..63440069b1
--- /dev/null
+++ b/activesupport/lib/active_support/testing/parallelization.rb
@@ -0,0 +1,109 @@
+# frozen_string_literal: true
+
+require "drb"
+require "drb/unix" unless Gem.win_platform?
+require "active_support/core_ext/module/attribute_accessors"
+
+module ActiveSupport
+ module Testing
+ class Parallelization # :nodoc:
+ class Server
+ include DRb::DRbUndumped
+
+ def initialize
+ @queue = Queue.new
+ end
+
+ def record(reporter, result)
+ raise DRb::DRbConnError if result.is_a?(DRb::DRbUnknown)
+
+ reporter.synchronize do
+ reporter.record(result)
+ end
+ end
+
+ def <<(o)
+ o[2] = DRbObject.new(o[2]) if o
+ @queue << o
+ end
+
+ def pop; @queue.pop; end
+ end
+
+ @@after_fork_hooks = []
+
+ def self.after_fork_hook(&blk)
+ @@after_fork_hooks << blk
+ end
+
+ cattr_reader :after_fork_hooks
+
+ @@run_cleanup_hooks = []
+
+ def self.run_cleanup_hook(&blk)
+ @@run_cleanup_hooks << blk
+ end
+
+ cattr_reader :run_cleanup_hooks
+
+ def initialize(queue_size)
+ @queue_size = queue_size
+ @queue = Server.new
+ @pool = []
+
+ @url = DRb.start_service("drbunix:", @queue).uri
+ end
+
+ def after_fork(worker)
+ self.class.after_fork_hooks.each do |cb|
+ cb.call(worker)
+ end
+ end
+
+ def run_cleanup(worker)
+ self.class.run_cleanup_hooks.each do |cb|
+ cb.call(worker)
+ end
+ end
+
+ def start
+ @pool = @queue_size.times.map do |worker|
+ fork do
+ DRb.stop_service
+
+ after_fork(worker)
+
+ queue = DRbObject.new_with_uri(@url)
+
+ while job = queue.pop
+ klass = job[0]
+ method = job[1]
+ reporter = job[2]
+ result = Minitest.run_one_method(klass, method)
+
+ begin
+ queue.record(reporter, result)
+ rescue DRb::DRbConnError
+ result.failures.each do |failure|
+ failure.exception = DRb::DRbRemoteError.new(failure.exception)
+ end
+ queue.record(reporter, result)
+ end
+ end
+ ensure
+ run_cleanup(worker)
+ end
+ end
+ end
+
+ def <<(work)
+ @queue << work
+ end
+
+ def shutdown
+ @queue_size.times { @queue << nil }
+ @pool.each { |pid| Process.waitpid pid }
+ end
+ end
+ end
+end