aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support/testing/parallelization.rb
blob: 96518a4a582e353683188d5bae0f4389882a6a26 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# frozen_string_literal: true

require "drb"
require "drb/unix" unless Gem.win_platform?
require "active_support/core_ext/module/attribute_accessors"

module ActiveSupport
  module Testing
    class Parallelization # :nodoc:
      class Server
        include DRb::DRbUndumped

        def initialize
          @queue = Queue.new
        end

        def record(reporter, result)
          raise DRb::DRbConnError if result.is_a?(DRb::DRbUnknown)

          reporter.synchronize do
            reporter.record(result)
          end
        end

        def <<(o)
          o[2] = DRbObject.new(o[2]) if o
          @queue << o
        end

        def length
          @queue.length
        end

        def pop; @queue.pop; end
      end

      @@after_fork_hooks = []

      def self.after_fork_hook(&blk)
        @@after_fork_hooks << blk
      end

      cattr_reader :after_fork_hooks

      @@run_cleanup_hooks = []

      def self.run_cleanup_hook(&blk)
        @@run_cleanup_hooks << blk
      end

      cattr_reader :run_cleanup_hooks

      def initialize(queue_size)
        @queue_size = queue_size
        @queue      = Server.new
        @pool       = []

        @url = DRb.start_service("drbunix:", @queue).uri
      end

      def after_fork(worker)
        self.class.after_fork_hooks.each do |cb|
          cb.call(worker)
        end
      end

      def run_cleanup(worker)
        self.class.run_cleanup_hooks.each do |cb|
          cb.call(worker)
        end
      end

      def start
        @pool = @queue_size.times.map do |worker|
          title = "Rails test worker #{worker}"

          fork do
            Process.setproctitle("#{title} - (starting)")

            DRb.stop_service

            begin
              after_fork(worker)
            rescue => setup_exception; end

            queue = DRbObject.new_with_uri(@url)

            while job = queue.pop
              klass    = job[0]
              method   = job[1]
              reporter = job[2]

              Process.setproctitle("#{title} - #{klass}##{method}")

              result = klass.with_info_handler reporter do
                Minitest.run_one_method(klass, method)
              end

              add_setup_exception(result, setup_exception) if setup_exception

              begin
                queue.record(reporter, result)
              rescue DRb::DRbConnError
                result.failures.each do |failure|
                  failure.exception = DRb::DRbRemoteError.new(failure.exception)
                end
                queue.record(reporter, result)
              end

              Process.setproctitle("#{title} - (idle)")
            end
          ensure
            Process.setproctitle("#{title} - (stopping)")

            run_cleanup(worker)
          end
        end
      end

      def <<(work)
        @queue << work
      end

      def shutdown
        @queue_size.times { @queue << nil }
        @pool.each { |pid| Process.waitpid pid }

        if @queue.length > 0
          raise "Queue not empty, but all workers have finished. This probably means that a worker crashed and #{@queue.length} tests were missed."
        end
      end

      private
        def add_setup_exception(result, setup_exception)
          result.failures.prepend Minitest::UnexpectedError.new(setup_exception)
        end
    end
  end
end