blob: 96518a4a582e353683188d5bae0f4389882a6a26 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# frozen_string_literal: true
require "drb"
require "drb/unix" unless Gem.win_platform?
require "active_support/core_ext/module/attribute_accessors"
module ActiveSupport
module Testing
class Parallelization # :nodoc:
# Job queue that the parent process serves over DRb. Workers pop jobs from it
# and hand their results back through #record.
class Server
  # DRb passes objects including this module by reference instead of by copy.
  include DRb::DRbUndumped

  def initialize
    @queue = Queue.new
  end

  # Forwards +result+ to +reporter+ from inside the reporter's +synchronize+
  # block and returns whatever that block returns.
  #
  # Raises DRb::DRbConnError when +result+ is a DRb::DRbUnknown, the
  # placeholder DRb produces for a payload it could not load.
  def record(reporter, result)
    if result.is_a?(DRb::DRbUnknown)
      raise DRb::DRbConnError
    end

    reporter.synchronize { reporter.record(result) }
  end

  # Enqueues +o+. For a truthy +o+ the element at index 2 is first replaced,
  # in place, by a DRbObject built from it; a +nil+ entry is enqueued as is.
  def <<(o)
    if o
      reference = DRbObject.new(o[2])
      o[2] = reference
    end

    @queue.push(o)
  end

  # Number of entries currently waiting in the queue.
  def length
    @queue.size
  end

  # Removes and returns the next entry.
  def pop
    @queue.pop
  end
end
# Callbacks collected by .after_fork_hook; #after_fork calls each of them
# with the worker it was given.
@@after_fork_hooks = []

# Registers the given block as an after-fork callback.
def self.after_fork_hook(&hook)
  @@after_fork_hooks.push(hook)
end

# Defines the +after_fork_hooks+ reader for the list above.
cattr_reader :after_fork_hooks
# Callbacks collected by .run_cleanup_hook; #run_cleanup calls each of them
# with the worker it was given.
@@run_cleanup_hooks = []

# Registers the given block as a cleanup callback.
def self.run_cleanup_hook(&hook)
  @@run_cleanup_hooks.push(hook)
end

# Defines the +run_cleanup_hooks+ reader for the list above.
cattr_reader :run_cleanup_hooks
# Sets up the shared job queue and publishes it through a DRb service.
#
# +queue_size+ is the number of worker processes #start will fork.
def initialize(queue_size)
  @queue_size = queue_size
  @queue = Server.new
  @pool = []

  # Remember the URI of the started service so that forked workers can
  # connect back to the queue.
  service = DRb.start_service("drbunix:", @queue)
  @url = service.uri
end
# Calls every registered after-fork hook with +worker+, in the order the
# hooks are stored.
def after_fork(worker)
  hooks = self.class.after_fork_hooks
  hooks.each { |hook| hook.call(worker) }
end
# Calls every registered cleanup hook with +worker+, in the order the hooks
# are stored.
def run_cleanup(worker)
  hooks = self.class.run_cleanup_hooks
  hooks.each { |hook| hook.call(worker) }
end
# Forks +@queue_size+ worker processes and stores their pids in +@pool+.
#
# Each worker runs the after-fork hooks, connects to the queue published at
# +@url+ and keeps running the jobs it pops until it receives a falsy one.
# The cleanup hooks run when the worker leaves the fork block, whether it
# finished normally or was interrupted by an error.
def start
  @pool = @queue_size.times.map do |worker|
    title = "Rails test worker #{worker}"

    fork do
      Process.setproctitle("#{title} - (starting)")

      # The parent started a DRb service in #initialize; the worker only acts
      # as a client of that queue, so it shuts down its own copy.
      DRb.stop_service

      # A failing hook must not kill the worker. The error is kept and added
      # to every result this worker reports.
      begin
        after_fork(worker)
      rescue => setup_exception; end

      queue = DRbObject.new_with_uri(@url)

      while job = queue.pop
        klass = job[0]
        method = job[1]
        reporter = job[2]

        Process.setproctitle("#{title} - #{klass}##{method}")

        result = klass.with_info_handler reporter do
          Minitest.run_one_method(klass, method)
        end

        add_setup_exception(result, setup_exception) if setup_exception

        begin
          queue.record(reporter, result)
        rescue DRb::DRbConnError
          # Server#record raises DRbConnError when the result reached it as a
          # DRbUnknown, i.e. it could not be loaded on the other side. Retry
          # with every failure rebuilt around a DRbRemoteError, which keeps
          # only the message, class name and backtrace of the original.
          #
          # The failures are replaced rather than mutated through
          # +exception=+: that setter exists only on older
          # Minitest::UnexpectedError versions (newer ones expose +error+) and
          # never on plain assertion failures, so calling it could crash the
          # worker with NoMethodError instead of reporting the result.
          result.failures.map! do |failure|
            original = failure.respond_to?(:error) ? failure.error : failure.exception
            Minitest::UnexpectedError.new(DRb::DRbRemoteError.new(original))
          end
          queue.record(reporter, result)
        end

        Process.setproctitle("#{title} - (idle)")
      end
    ensure
      Process.setproctitle("#{title} - (stopping)")

      run_cleanup(worker)
    end
  end
end
# Hands +work+ to the shared queue and returns whatever the queue's own
# +<<+ returns.
def <<(work)
  @queue.<<(work)
end
# Tells the workers to stop, waits for all of them and verifies that no work
# was left behind.
#
# Raises RuntimeError when the queue still holds entries after every worker
# process has exited.
def shutdown
  # One nil per worker: the loop in #start ends when it pops a falsy job.
  @queue_size.times do
    @queue << nil
  end

  @pool.each do |pid|
    Process.waitpid(pid)
  end

  return unless @queue.length > 0

  raise "Queue not empty, but all workers have finished. This probably means that a worker crashed and #{@queue.length} tests were missed."
end
private
# Puts +setup_exception+, wrapped in a Minitest::UnexpectedError, at the
# front of +result.failures+.
def add_setup_exception(result, setup_exception)
  wrapped = Minitest::UnexpectedError.new(setup_exception)
  result.failures.unshift(wrapped)
end
end
end
end
|