aboutsummaryrefslogtreecommitdiffstats
path: root/activerecord/test/cases/connection_pool_test.rb
blob: 2bdabaee62bd81326334d1bdf4b08fb43c50cb68 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
require "cases/helper"
require 'concurrent/atomic/count_down_latch'

module ActiveRecord
  module ConnectionAdapters
    class ConnectionPoolTest < ActiveRecord::TestCase
      attr_reader :pool

      def setup
        super

        # Keep a duplicate pool so we do not bother others
        @pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec

        if in_memory_db?
          # Separate connections to an in-memory database create an entirely new database,
          # with an empty schema etc, so we just stub out this schema on the fly.
          @pool.with_connection do |connection|
            connection.create_table :posts do |t|
              t.integer :cololumn
            end
          end
        end
      end

      teardown do
        @pool.disconnect!
      end

      def active_connections(pool)
        pool.connections.find_all(&:in_use?)
      end

      def test_checkout_after_close
        connection = pool.connection
        assert connection.in_use?

        connection.close
        assert !connection.in_use?

        assert pool.connection.in_use?
      end

      def test_released_connection_moves_between_threads
        thread_conn = nil

        Thread.new {
          pool.with_connection do |conn|
            thread_conn = conn
          end
        }.join

        assert thread_conn

        Thread.new {
          pool.with_connection do |conn|
            assert_equal thread_conn, conn
          end
        }.join
      end

      def test_with_connection
        assert_equal 0, active_connections(pool).size

        main_thread = pool.connection
        assert_equal 1, active_connections(pool).size

        Thread.new {
          pool.with_connection do |conn|
            assert conn
            assert_equal 2, active_connections(pool).size
          end
          assert_equal 1, active_connections(pool).size
        }.join

        main_thread.close
        assert_equal 0, active_connections(pool).size
      end

      def test_active_connection_in_use
        assert !pool.active_connection?
        main_thread = pool.connection

        assert pool.active_connection?

        main_thread.close

        assert !pool.active_connection?
      end

      def test_full_pool_exception
        @pool.size.times { @pool.checkout }
        assert_raises(ConnectionTimeoutError) do
          @pool.checkout
        end
      end

      def test_full_pool_blocks
        cs = @pool.size.times.map { @pool.checkout }
        t = Thread.new { @pool.checkout }

        # make sure our thread is in the timeout section
        Thread.pass until @pool.num_waiting_in_queue == 1

        connection = cs.first
        connection.close
        assert_equal connection, t.join.value
      end

      def test_removing_releases_latch
        cs = @pool.size.times.map { @pool.checkout }
        t = Thread.new { @pool.checkout }

        # make sure our thread is in the timeout section
        Thread.pass until @pool.num_waiting_in_queue == 1

        connection = cs.first
        @pool.remove connection
        assert_respond_to t.join.value, :execute
        connection.close
      end

      def test_reap_and_active
        @pool.checkout
        @pool.checkout
        @pool.checkout

        connections = @pool.connections.dup

        @pool.reap

        assert_equal connections.length, @pool.connections.length
      end

      def test_reap_inactive
        ready = Concurrent::CountDownLatch.new
        @pool.checkout
        child = Thread.new do
          @pool.checkout
          @pool.checkout
          ready.count_down
          Thread.stop
        end
        ready.wait

        assert_equal 3, active_connections(@pool).size

        child.terminate
        child.join
        @pool.reap

        assert_equal 1, active_connections(@pool).size
      ensure
        @pool.connections.each { |conn| conn.close if conn.in_use? }
      end

      def test_remove_connection
        conn = @pool.checkout
        assert conn.in_use?

        length = @pool.connections.length
        @pool.remove conn
        assert conn.in_use?
        assert_equal(length - 1, @pool.connections.length)
      ensure
        conn.close
      end

      def test_remove_connection_for_thread
        conn = @pool.connection
        @pool.remove conn
        assert_not_equal(conn, @pool.connection)
      ensure
        conn.close if conn
      end

      def test_active_connection?
        assert !@pool.active_connection?
        assert @pool.connection
        assert @pool.active_connection?
        @pool.release_connection
        assert !@pool.active_connection?
      end

      def test_checkout_behaviour
        pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
        connection = pool.connection
        assert_not_nil connection
        threads = []
        4.times do |i|
          threads << Thread.new(i) do
            connection = pool.connection
            assert_not_nil connection
            connection.close
          end
        end

        threads.each(&:join)

        Thread.new do
          assert pool.connection
          pool.connection.close
        end.join
      end

      # The connection pool is "fair" if threads waiting for
      # connections receive them in the order in which they began
      # waiting.  This ensures that we don't timeout one HTTP request
      # even while well under capacity in a multi-threaded environment
      # such as a Java servlet container.
      #
      # We don't need strict fairness: if two connections become
      # available at the same time, it's fine if two threads that were
      # waiting acquire the connections out of order.
      #
      # Thus this test prepares waiting threads and then trickles in
      # available connections slowly, ensuring the wakeup order is
      # correct in this case.
      def test_checkout_fairness
        @pool.instance_variable_set(:@size, 10)
        expected = (1..@pool.size).to_a.freeze
        # check out all connections so our threads start out waiting
        conns = expected.map { @pool.checkout }
        mutex = Mutex.new
        order = []
        errors = []

        threads = expected.map do |i|
          t = Thread.new {
            begin
              @pool.checkout # never checked back in
              mutex.synchronize { order << i }
            rescue => e
              mutex.synchronize { errors << e }
            end
          }
          Thread.pass until @pool.num_waiting_in_queue == i
          t
        end

        # this should wake up the waiting threads one by one in order
        conns.each { |conn| @pool.checkin(conn); sleep 0.1 }

        threads.each(&:join)

        raise errors.first if errors.any?

        assert_equal(expected, order)
      end

      # As mentioned in #test_checkout_fairness, we don't care about
      # strict fairness.  This test creates two groups of threads:
      # group1 whose members all start waiting before any thread in
      # group2.  Enough connections are checked in to wakeup all
      # group1 threads, and the fact that only group1 and no group2
      # threads acquired a connection is enforced.
      def test_checkout_fairness_by_group
        @pool.instance_variable_set(:@size, 10)
        # take all the connections
        conns = (1..10).map { @pool.checkout }
        mutex = Mutex.new
        successes = []    # threads that successfully got a connection
        errors = []

        make_thread = proc do |i|
          t = Thread.new {
            begin
              @pool.checkout # never checked back in
              mutex.synchronize { successes << i }
            rescue => e
              mutex.synchronize { errors << e }
            end
          }
          Thread.pass until @pool.num_waiting_in_queue == i
          t
        end

        # all group1 threads start waiting before any in group2
        group1 = (1..5).map(&make_thread)
        group2 = (6..10).map(&make_thread)

        # checkin n connections back to the pool
        checkin = proc do |n|
          n.times do
            c = conns.pop
            @pool.checkin(c)
          end
        end

        checkin.call(group1.size)         # should wake up all group1

        loop do
          sleep 0.1
          break if mutex.synchronize { (successes.size + errors.size) == group1.size }
        end

        winners = mutex.synchronize { successes.dup }
        checkin.call(group2.size)         # should wake up everyone remaining

        group1.each(&:join)
        group2.each(&:join)

        assert_equal((1..group1.size).to_a, winners.sort)

        if errors.any?
          raise errors.first
        end
      end

      def test_automatic_reconnect=
        pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
        assert pool.automatic_reconnect
        assert pool.connection

        pool.disconnect!
        assert pool.connection

        pool.disconnect!
        pool.automatic_reconnect = false

        assert_raises(ConnectionNotEstablished) do
          pool.connection
        end

        assert_raises(ConnectionNotEstablished) do
          pool.with_connection
        end
      end

      def test_pool_sets_connection_visitor
        assert @pool.connection.visitor.is_a?(Arel::Visitors::ToSql)
      end

      # make sure exceptions are thrown when establish_connection
      # is called with an anonymous class
      def test_anonymous_class_exception
        anonymous = Class.new(ActiveRecord::Base)

        assert_raises(RuntimeError) do
          anonymous.establish_connection
        end
      end

      def test_connection_notification_is_called
        payloads = []
        subscription = ActiveSupport::Notifications.subscribe('!connection.active_record') do |name, started, finished, unique_id, payload|
          payloads << payload
        end
        ActiveRecord::Base.establish_connection :arunit
        assert_equal [:class_name, :config, :connection_id], payloads[0].keys.sort
        assert_equal 'primary', payloads[0][:class_name]
      ensure
        ActiveSupport::Notifications.unsubscribe(subscription) if subscription
      end

      def test_pool_sets_connection_schema_cache
        connection = pool.checkout
        schema_cache = SchemaCache.new connection
        schema_cache.add(:posts)
        pool.schema_cache = schema_cache

        pool.with_connection do |conn|
          assert_not_same pool.schema_cache, conn.schema_cache
          assert_equal pool.schema_cache.size, conn.schema_cache.size
          assert_same pool.schema_cache.columns(:posts), conn.schema_cache.columns(:posts)
        end

        pool.checkin connection
      end

      def test_concurrent_connection_establishment
        assert_operator @pool.connections.size, :<=, 1

        all_threads_in_new_connection = Concurrent::CountDownLatch.new(@pool.size - @pool.connections.size)
        all_go                        = Concurrent::CountDownLatch.new

        @pool.singleton_class.class_eval do
          define_method(:new_connection) do
            all_threads_in_new_connection.count_down
            all_go.wait
            super()
          end
        end

        connecting_threads = []
        @pool.size.times do
          connecting_threads << Thread.new { @pool.checkout }
        end

        begin
          Timeout.timeout(5) do
            # the kernel of the whole test is here, everything else is just scaffolding,
            # this latch will not be released unless conn. pool allows for concurrent
            # connection creation
            all_threads_in_new_connection.wait
          end
        rescue Timeout::Error
          flunk 'pool unable to establish connections concurrently or implementation has ' <<
                'changed, this test then needs to patch a different :new_connection method'
        ensure
          # clean up the threads
          all_go.count_down
          connecting_threads.map(&:join)
        end
      end

      def test_non_bang_disconnect_and_clear_reloadable_connections_throw_exception_if_threads_dont_return_their_conns
        @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
        [:disconnect, :clear_reloadable_connections].each do |group_action_method|
          @pool.with_connection do |connection|
            assert_raises(ExclusiveConnectionTimeoutError) do
              Thread.new { @pool.send(group_action_method) }.join
            end
          end
        end
      end

      def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads_to_return_their_conns
        [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
          begin
            thread = timed_join_result = nil
            @pool.with_connection do |connection|
              thread = Thread.new { @pool.send(group_action_method) }

              # give the other `thread` some time to get stuck in `group_action_method`
              timed_join_result = thread.join(0.3)
              # thread.join # => `nil` means the other thread hasn't finished running and is still waiting for us to
              # release our connection
              assert_nil timed_join_result

              # assert that since this is within default timeout our connection hasn't been forcefully taken away from us
              assert @pool.active_connection?
            end
          ensure
            thread.join if thread && !timed_join_result # clean up the other thread
          end
        end
      end

      def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_aquire_all_connections_proceed_anyway
        @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
        [:disconnect!, :clear_reloadable_connections!].each do |group_action_method|
          @pool.with_connection do |connection|
            Thread.new { @pool.send(group_action_method) }.join
            # assert connection has been forcefully taken away from us
            assert_not @pool.active_connection?

            # make a new connection for with_connection to clean up
            @pool.connection
          end
        end
      end

      def test_disconnect_and_clear_reloadable_connections_are_able_to_preempt_other_waiting_threads
        with_single_connection_pool do |pool|
          [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
            conn               = pool.connection # drain the only available connection
            second_thread_done = Concurrent::CountDownLatch.new

            # create a first_thread and let it get into the FIFO queue first
            first_thread = Thread.new do
              pool.with_connection { second_thread_done.wait }
            end

            # wait for first_thread to get in queue
            Thread.pass until pool.num_waiting_in_queue == 1

            # create a different, later thread, that will attempt to do a "group action",
            # but because of the group action semantics it should be able to preempt the
            # first_thread when a connection is made available
            second_thread = Thread.new do
              pool.send(group_action_method)
              second_thread_done.count_down
            end

            # wait for second_thread to get in queue
            Thread.pass until pool.num_waiting_in_queue == 2

            # return the only available connection
            pool.checkin(conn)

            # if the second_thread is not able to preempt the first_thread,
            # they will temporarily (until either of them timeouts with ConnectionTimeoutError)
            # deadlock and a join(2) timeout will be reached
            failed = true unless second_thread.join(2)

            #--- post test clean up start
            second_thread_done.count_down if failed

            # after `pool.disconnect()` the first thread will be left stuck in queue, no need to wait for
            # it to timeout with ConnectionTimeoutError
            if (group_action_method == :disconnect || group_action_method == :disconnect!) && pool.num_waiting_in_queue > 0
              pool.with_connection {} # create a new connection in case there are threads still stuck in a queue
            end

            first_thread.join
            second_thread.join
            #--- post test clean up end

            flunk "#{group_action_method} is not able to preempt other waiting threads" if failed
          end
        end
      end

      def test_clear_reloadable_connections_creates_new_connections_for_waiting_threads_if_necessary
        with_single_connection_pool do |pool|
          conn = pool.connection # drain the only available connection
          def conn.requires_reloading? # make sure it gets removed from the pool by clear_reloadable_connections
            true
          end

          stuck_thread = Thread.new do
            pool.with_connection {}
          end

          # wait for stuck_thread to get in queue
          Thread.pass until pool.num_waiting_in_queue == 1

          pool.clear_reloadable_connections

          unless stuck_thread.join(2)
            flunk 'clear_reloadable_connections must not let other connection waiting threads get stuck in queue'
          end

          assert_equal 0, pool.num_waiting_in_queue
        end
      end

      private
      def with_single_connection_pool
        one_conn_spec = ActiveRecord::Base.connection_pool.spec.dup
        one_conn_spec.config[:pool] = 1 # this is safe to do, because .dupped ConnectionSpecification also auto-dups its config
        yield(pool = ConnectionPool.new(one_conn_spec))
      ensure
        pool.disconnect! if pool
      end
    end
  end
end