blob: 59c8486f41c0517812dfc2febe2cbd60361cb82a (
plain) (
tree)
|
|
# frozen_string_literal: true
require "drb"
require "drb/unix"
module ActiveSupport
module Testing
class Parallelization # :nodoc:
class Server
include DRb::DRbUndumped
def initialize
@queue = Queue.new
end
def record(reporter, result)
reporter.synchronize do
reporter.record(result)
end
end
def <<(o)
@queue << o
end
def pop; @queue.pop; end
end
@after_fork_hooks = []
def self.after_fork_hook(&blk)
@after_fork_hooks << blk
end
def self.after_fork_hooks
@after_fork_hooks
end
@run_cleanup_hooks = []
def self.run_cleanup_hook(&blk)
@run_cleanup_hooks << blk
end
def self.run_cleanup_hooks
@run_cleanup_hooks
end
def initialize(queue_size)
@queue_size = queue_size
@queue = Server.new
@pool = []
@url = DRb.start_service("drbunix:", @queue).uri
end
def after_fork(worker)
self.class.after_fork_hooks.each do |cb|
cb.call(worker)
end
end
def run_cleanup(worker)
self.class.run_cleanup_hooks.each do |cb|
cb.call(worker)
end
end
def start
@pool = @queue_size.times.map do |worker|
fork do
DRb.stop_service
after_fork(worker)
queue = DRbObject.new_with_uri(@url)
while job = queue.pop
klass = job[0]
method = job[1]
reporter = job[2]
result = Minitest.run_one_method(klass, method)
queue.record(reporter, result)
end
run_cleanup(worker)
end
end
end
def <<(work)
@queue << work
end
def shutdown
@queue_size.times { @queue << nil }
@pool.each { |pid| Process.waitpid pid }
end
end
end
end
|