aboutsummaryrefslogtreecommitdiffstats
path: root/activerecord/test/cases/connection_pool_test.rb
blob: ccbb6e16cdcc63a4171bf48612707bee7b557c91 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
# frozen_string_literal: true

require "cases/helper"
require "concurrent/atomic/count_down_latch"

module ActiveRecord
  module ConnectionAdapters
    class ConnectionPoolTest < ActiveRecord::TestCase
      attr_reader :pool

      # Builds a private pool from the base pool's spec before each test and,
      # for in-memory databases, creates the +posts+ table through that pool.
      def setup
        super

        # Use a pool of our own so other tests' connections are left alone.
        @pool = ConnectionPool.new(ActiveRecord::Base.connection_pool.spec)
        return unless in_memory_db?

        # Each new connection to an in-memory database gets a brand new, empty
        # database, so the schema we rely on is created right here.
        @pool.with_connection do |conn|
          conn.create_table(:posts) { |t| t.integer :cololumn }
        end
      end

      # Close every connection held by the per-test pool.
      teardown { @pool.disconnect! }

      # Returns the connections of +pool+ that currently report being in use.
      def active_connections(pool)
        pool.connections.select { |conn| conn.in_use? }
      end

      # A connection returned with #close is no longer in use, and the pool
      # can hand out an in-use connection again afterwards.
      def test_checkout_after_close
        conn = pool.connection
        assert_predicate conn, :in_use?

        conn.close
        assert_not_predicate conn, :in_use?

        # asking the pool again must yield a connection that is in use
        assert_predicate pool.connection, :in_use?
      end

      # A connection released by one thread is the one handed to the next
      # thread that asks the pool for a connection.
      def test_released_connection_moves_between_threads
        first_conn = nil

        Thread.new do
          pool.with_connection { |conn| first_conn = conn }
        end.join

        assert first_conn

        Thread.new do
          pool.with_connection { |conn| assert_equal first_conn, conn }
        end.join
      end

      # Tracks the number of in-use connections while the main thread and a
      # worker thread (via #with_connection) acquire and release connections.
      def test_with_connection
        assert_equal 0, active_connections(pool).size

        main_conn = pool.connection
        assert_equal 1, active_connections(pool).size

        worker = Thread.new do
          pool.with_connection do |conn|
            assert conn
            assert_equal 2, active_connections(pool).size
          end
          # leaving the block gives the worker's connection back
          assert_equal 1, active_connections(pool).size
        end
        worker.join

        main_conn.close
        assert_equal 0, active_connections(pool).size
      end

      # #active_connection? is true only while the calling thread holds a
      # connection obtained through #connection.
      def test_active_connection_in_use
        assert_not_predicate pool, :active_connection?

        conn = pool.connection
        assert_predicate pool, :active_connection?

        conn.close
        assert_not_predicate pool, :active_connection?
      end

      # Checking out from an exhausted pool raises ConnectionTimeoutError.
      def test_full_pool_exception
        # a tiny timeout keeps the suite from waiting out the whole default one
        @pool.checkout_timeout = 0.001
        @pool.size.times { assert @pool.checkout }

        assert_raises(ConnectionTimeoutError) { @pool.checkout }
      end

      # A checkout against a full pool waits, and receives the connection that
      # is subsequently closed by its holder.
      def test_full_pool_blocks
        held = Array.new(@pool.size) { @pool.checkout }
        waiter = Thread.new { @pool.checkout }

        # spin until the waiter is registered in the pool's queue
        Thread.pass until @pool.num_waiting_in_queue == 1

        released = held.first
        released.close
        assert_equal released, waiter.join.value
      end

      # Two threads need each other's resource: one is inside the dependency
      # interlock's +running+ block and asks for a connection, the other holds
      # a connection and asks for the interlock's +loading+ block. The test
      # passes only if both blocks end up executing.
      def test_full_pool_blocking_shares_load_interlock
        # force the pool's @size to 1 so both threads compete for a single slot
        @pool.instance_variable_set(:@size, 1)

        # the latches make each thread wait until the other holds its resource
        load_interlock_latch = Concurrent::CountDownLatch.new
        connection_latch = Concurrent::CountDownLatch.new

        able_to_get_connection = false
        able_to_load = false

        # enters +running+, signals that, then waits for the other thread to
        # hold a connection before requesting one itself
        thread_with_load_interlock = Thread.new do
          ActiveSupport::Dependencies.interlock.running do
            load_interlock_latch.count_down
            connection_latch.wait

            @pool.with_connection do
              able_to_get_connection = true
            end
          end
        end

        # takes a connection, signals that, then waits for the other thread to
        # be inside +running+ before requesting +loading+
        thread_with_last_connection = Thread.new do
          @pool.with_connection do
            connection_latch.count_down
            load_interlock_latch.wait

            ActiveSupport::Dependencies.interlock.loading do
              able_to_load = true
            end
          end
        end

        thread_with_load_interlock.join
        thread_with_last_connection.join

        assert able_to_get_connection
        assert able_to_load
      end

      # Removing a connection from a full pool lets a waiting checkout finish
      # with an object that responds to #execute.
      def test_removing_releases_latch
        held = Array.new(@pool.size) { @pool.checkout }
        waiter = Thread.new { @pool.checkout }

        # spin until the waiter is registered in the pool's queue
        Thread.pass until @pool.num_waiting_in_queue == 1

        removed = held.first
        @pool.remove(removed)
        assert_respond_to waiter.join.value, :execute
        removed.close
      end

      # Reaping leaves the connection count unchanged when every connection
      # was checked out by the current thread.
      def test_reap_and_active
        3.times { @pool.checkout }

        count_before_reap = @pool.connections.length

        @pool.reap

        assert_equal count_before_reap, @pool.connections.length
      end

      # Connections checked out by a thread that has since been terminated are
      # no longer counted as in use once the pool is reaped.
      def test_reap_inactive
        child_ready = Concurrent::CountDownLatch.new
        @pool.checkout
        child = Thread.new do
          2.times { @pool.checkout }
          child_ready.count_down
          # park the thread while it still holds both connections
          Thread.stop
        end
        child_ready.wait

        assert_equal 3, active_connections(@pool).size

        child.terminate
        child.join
        @pool.reap

        assert_equal 1, active_connections(@pool).size
      ensure
        # hand back whatever is still checked out
        @pool.connections.each do |conn|
          next unless conn.in_use?

          conn.close
        end
      end

      # With +idle_timeout+ given as the string "0.02" in the spec config,
      # #flush keeps a connection idle for 0.01s and removes it once it has
      # been idle for 0.02s.
      def test_idle_timeout_configuration
        @pool.disconnect!
        # Work on a copy of the spec: merging into the base pool's own spec
        # would leak this idle_timeout into every pool built from it later.
        # A dupped spec also dups its config, so the original stays untouched.
        spec = ActiveRecord::Base.connection_pool.spec.dup
        spec.config.merge!(idle_timeout: "0.02")
        @pool = ConnectionPool.new(spec)
        idle_conn = @pool.checkout
        @pool.checkin(idle_conn)

        # pretend the connection has been idle for less than the timeout
        idle_conn.instance_variable_set(
          :@idle_since,
          Concurrent.monotonic_time - 0.01
        )

        @pool.flush
        assert_equal 1, @pool.connections.length

        # now pretend it has been idle for the full timeout
        idle_conn.instance_variable_set(
          :@idle_since,
          Concurrent.monotonic_time - 0.02
        )

        @pool.flush
        assert_equal 0, @pool.connections.length
      end

      # With a negative +idle_timeout+ in the spec config, #flush leaves an
      # idle connection in the pool.
      def test_disable_flush
        @pool.disconnect!
        # Work on a copy of the spec: merging into the base pool's own spec
        # would leak the negative idle_timeout into every pool built from it
        # later. A dupped spec also dups its config.
        spec = ActiveRecord::Base.connection_pool.spec.dup
        spec.config.merge!(idle_timeout: -5)
        @pool = ConnectionPool.new(spec)
        idle_conn = @pool.checkout
        @pool.checkin(idle_conn)

        # pretend the connection has been idle for a second
        idle_conn.instance_variable_set(
          :@idle_since,
          Concurrent.monotonic_time - 1
        )

        @pool.flush
        assert_equal 1, @pool.connections.length
      end

      # #flush(30) removes only the checked-in connection whose idle time
      # exceeds 30 seconds; a recently checked-in connection and a connection
      # still checked out both stay in the pool.
      def test_flush
        idle_conn = @pool.checkout
        recent_conn = @pool.checkout
        active_conn = @pool.checkout

        @pool.checkin idle_conn
        @pool.checkin recent_conn

        assert_equal 3, @pool.connections.length

        # pretend this connection has been idle for a long time
        idle_conn.instance_variable_set(
          :@idle_since,
          Concurrent.monotonic_time - 1000
        )

        @pool.flush(30)

        assert_equal 2, @pool.connections.length

        assert_equal [recent_conn, active_conn].sort_by(&:__id__), @pool.connections.sort_by(&:__id__)
      ensure
        # active_conn is nil if one of the checkouts above raised; skip the
        # checkin then so the cleanup does not hide the original failure
        @pool.checkin active_conn if active_conn
      end

      # #flush! leaves only the connection still checked out by this thread:
      # both checked-in connections and the one checked out by a thread that
      # has already finished are removed.
      def test_flush_bang
        idle_conn = @pool.checkout
        recent_conn = @pool.checkout
        active_conn = @pool.checkout
        # checked out by a thread that is finished by the time join returns
        _dead_conn = Thread.new { @pool.checkout }.join

        @pool.checkin idle_conn
        @pool.checkin recent_conn

        assert_equal 4, @pool.connections.length

        # make this connection report a long idle time
        def idle_conn.seconds_idle
          1000
        end

        @pool.flush!

        assert_equal 1, @pool.connections.length

        assert_equal [active_conn].sort_by(&:__id__), @pool.connections.sort_by(&:__id__)
      ensure
        # active_conn is nil if one of the checkouts above raised; skip the
        # checkin then so the cleanup does not hide the original failure
        @pool.checkin active_conn if active_conn
      end

      # Removing a checked-out connection shrinks the pool by one while the
      # connection itself still reports being in use.
      def test_remove_connection
        conn = @pool.checkout
        assert_predicate conn, :in_use?

        length = @pool.connections.length
        @pool.remove conn
        assert_predicate conn, :in_use?
        assert_equal(length - 1, @pool.connections.length)
      ensure
        # conn is nil when the checkout itself failed; guard so the cleanup
        # does not replace that error with a NoMethodError
        conn.close if conn
      end

      # After the current thread's connection is removed from the pool,
      # #connection returns a different one.
      def test_remove_connection_for_thread
        removed = @pool.connection
        @pool.remove(removed)
        assert_not_equal(removed, @pool.connection)
      ensure
        removed.close if removed
      end

      # #active_connection? flips to true on #connection and back to false on
      # #release_connection.
      def test_active_connection?
        # nothing checked out yet
        assert_not_predicate @pool, :active_connection?

        assert @pool.connection
        assert_predicate @pool, :active_connection?

        @pool.release_connection
        assert_not_predicate @pool, :active_connection?
      end

      # The main thread and several short-lived threads can each obtain a
      # connection from a freshly created pool.
      def test_checkout_behaviour
        pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
        main_connection = pool.connection
        assert_not_nil main_connection

        threads = Array.new(4) do
          Thread.new do
            thread_connection = pool.connection
            assert_not_nil thread_connection
            thread_connection.close
          end
        end

        threads.each(&:join)

        Thread.new do
          assert pool.connection
          pool.connection.close
        end.join
      ensure
        # this pool is local to the test, so the teardown (which only
        # disconnects @pool) would otherwise leave its connections open
        pool.disconnect! if pool
      end

      # The connection checked in last is the first one checked out again.
      def test_checkout_order_is_lifo
        first = @pool.checkout
        second = @pool.checkout
        [first, second].each { |conn| @pool.checkin(conn) }

        assert_equal [second, first], Array.new(2) { @pool.checkout }
      end

      # The connection pool is "fair" if threads waiting for
      # connections receive them in the order in which they began
      # waiting.  This ensures that we don't time out one HTTP request
      # even while well under capacity in a multi-threaded environment
      # such as a Java servlet container.
      #
      # We don't need strict fairness: if two connections become
      # available at the same time, it's fine if two threads that were
      # waiting acquire the connections out of order.
      #
      # Thus this test prepares waiting threads and then trickles in
      # available connections slowly, ensuring the wakeup order is
      # correct in this case.
      def test_checkout_fairness
        # set the pool's @size to 10 directly so the number of waiters is known
        @pool.instance_variable_set(:@size, 10)
        expected = (1..@pool.size).to_a.freeze
        # check out all connections so our threads start out waiting
        conns = expected.map { @pool.checkout }
        mutex = Mutex.new
        order = []   # thread numbers, in the order they obtained a connection
        errors = []  # exceptions raised inside the waiting threads

        threads = expected.map do |i|
          t = Thread.new {
            begin
              @pool.checkout # never checked back in
              mutex.synchronize { order << i }
            rescue => e
              mutex.synchronize { errors << e }
            end
          }
          # do not start the next thread until this one is counted as waiting,
          # so the order in which the threads began waiting is known
          Thread.pass until @pool.num_waiting_in_queue == i
          t
        end

        # this should wake up the waiting threads one by one in order
        conns.each { |conn| @pool.checkin(conn); sleep 0.1 }

        threads.each(&:join)

        # surface the first failure from a worker thread in the main thread
        raise errors.first if errors.any?

        assert_equal(expected, order)
      end

      # As mentioned in #test_checkout_fairness, we don't care about
      # strict fairness.  This test creates two groups of threads:
      # group1 whose members all start waiting before any thread in
      # group2.  Enough connections are checked in to wake up all
      # group1 threads, and the fact that only group1 and no group2
      # threads acquired a connection is enforced.
      def test_checkout_fairness_by_group
        # set the pool's @size to 10 directly so the number of waiters is known
        @pool.instance_variable_set(:@size, 10)
        # take all the connections
        conns = (1..10).map { @pool.checkout }
        mutex = Mutex.new
        successes = []    # threads that successfully got a connection
        errors = []       # exceptions raised inside the waiting threads

        # starts waiter number i and returns only once the pool reports
        # i threads waiting in its queue
        make_thread = proc do |i|
          t = Thread.new {
            begin
              @pool.checkout # never checked back in
              mutex.synchronize { successes << i }
            rescue => e
              mutex.synchronize { errors << e }
            end
          }
          Thread.pass until @pool.num_waiting_in_queue == i
          t
        end

        # all group1 threads start waiting before any in group2
        group1 = (1..5).map(&make_thread)
        group2 = (6..10).map(&make_thread)

        # checkin n connections back to the pool
        checkin = proc do |n|
          n.times do
            c = conns.pop
            @pool.checkin(c)
          end
        end

        checkin.call(group1.size)         # should wake up all group1

        # poll until as many threads as group1 has members have finished,
        # whether they succeeded or failed
        loop do
          sleep 0.1
          break if mutex.synchronize { (successes.size + errors.size) == group1.size }
        end

        # snapshot who has a connection before group2 is given any
        winners = mutex.synchronize { successes.dup }
        checkin.call(group2.size)         # should wake up everyone remaining

        group1.each(&:join)
        group2.each(&:join)

        # only group1's numbers (1..5) may appear among the first winners
        assert_equal((1..group1.size).to_a, winners.sort)

        if errors.any?
          raise errors.first
        end
      end

      # With automatic_reconnect enabled, a pool hands out a connection again
      # after #disconnect!.
      def test_automatic_reconnect_restores_after_disconnect
        pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
        assert pool.automatic_reconnect
        assert pool.connection

        pool.disconnect!
        assert pool.connection
      ensure
        # this pool is local to the test, so the teardown (which only
        # disconnects @pool) would otherwise leave its connection open
        pool.disconnect! if pool
      end

      # With automatic_reconnect switched off, a disconnected pool raises
      # ConnectionNotEstablished from both #connection and #with_connection.
      def test_automatic_reconnect_can_be_disabled
        pool = ConnectionPool.new(ActiveRecord::Base.connection_pool.spec)
        pool.disconnect!
        pool.automatic_reconnect = false

        assert_raises(ConnectionNotEstablished) { pool.connection }

        assert_raises(ConnectionNotEstablished) { pool.with_connection }
      end

      # A connection obtained from the pool has an Arel ToSql visitor.
      def test_pool_sets_connection_visitor
        visitor = @pool.connection.visitor
        assert visitor.is_a?(Arel::Visitors::ToSql)
      end

      # Make sure an exception is raised when establish_connection
      # is called on an anonymous class.
      def test_anonymous_class_exception
        # a model class that was never assigned to a constant
        nameless_model = Class.new(ActiveRecord::Base)

        assert_raises(RuntimeError) { nameless_model.establish_connection }
      end

      # Empty model used by test_connection_notification_is_called to
      # establish a connection of its own.
      class ConnectionTestModel < ActiveRecord::Base; end

      # Establishing a connection publishes a "!connection.active_record"
      # notification whose payload carries the config, connection id and
      # spec name.
      def test_connection_notification_is_called
        received = []
        # the block keeps all five parameters; only the payload is recorded
        subscription = ActiveSupport::Notifications.subscribe("!connection.active_record") do |_name, _started, _finished, _unique_id, payload|
          received << payload
        end
        ConnectionTestModel.establish_connection :arunit

        first_payload = received[0]
        assert_equal [:config, :connection_id, :spec_name], first_payload.keys.sort
        assert_equal "ActiveRecord::ConnectionAdapters::ConnectionPoolTest::ConnectionTestModel", first_payload[:spec_name]
      ensure
        ActiveSupport::Notifications.unsubscribe(subscription) if subscription
      end

      # A schema cache assigned to the pool is visible through connections
      # obtained from the pool: same size, same +posts+ columns object.
      def test_pool_sets_connection_schema_cache
        checked_out = pool.checkout
        cache = SchemaCache.new(checked_out)
        cache.add(:posts)
        pool.schema_cache = cache

        pool.with_connection do |conn|
          assert_equal pool.schema_cache.size, conn.schema_cache.size
          assert_same pool.schema_cache.columns(:posts), conn.schema_cache.columns(:posts)
        end

        pool.checkin(checked_out)
      end

      # Several threads must be able to sit inside the pool's new_connection
      # at the same time; the override below parks each caller until released.
      def test_concurrent_connection_establishment
        assert_operator @pool.connections.size, :<=, 1

        connections_to_create = @pool.size - @pool.connections.size
        all_threads_in_new_connection = Concurrent::CountDownLatch.new(connections_to_create)
        all_go = Concurrent::CountDownLatch.new

        # wrap new_connection on this pool instance only: report arrival,
        # wait for the go signal, then defer to the real implementation
        @pool.singleton_class.class_eval do
          define_method(:new_connection) do
            all_threads_in_new_connection.count_down
            all_go.wait
            super()
          end
        end

        connecting_threads = Array.new(@pool.size) do
          Thread.new { @pool.checkout }
        end

        begin
          # this is the heart of the test, the rest is scaffolding: the latch
          # only opens if the pool lets connections be created concurrently
          Timeout.timeout(5) { all_threads_in_new_connection.wait }
        rescue Timeout::Error
          flunk "pool unable to establish connections concurrently or implementation has " \
                "changed, this test then needs to patch a different :new_connection method"
        ensure
          # release the parked threads and wait for them to finish
          all_go.count_down
          connecting_threads.each(&:join)
        end
      end

      # The non-bang group actions raise ExclusiveConnectionTimeoutError when
      # another thread keeps hold of a connection.
      def test_non_bang_disconnect_and_clear_reloadable_connections_throw_exception_if_threads_dont_return_their_conns
        # silence per-thread exception reports for the duration of the test
        original_report_on_exception = Thread.report_on_exception
        Thread.report_on_exception = false
        # a tiny timeout keeps the suite from waiting out the whole default one
        @pool.checkout_timeout = 0.001

        %i[disconnect clear_reloadable_connections].each do |group_action_method|
          @pool.with_connection do
            assert_raises(ExclusiveConnectionTimeoutError) do
              Thread.new { @pool.send(group_action_method) }.join
            end
          end
        end
      ensure
        Thread.report_on_exception = original_report_on_exception
      end

      # Every group action, bang or not, waits while this thread still holds
      # its connection instead of taking it away immediately.
      def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads_to_return_their_conns
        %i[disconnect disconnect! clear_reloadable_connections clear_reloadable_connections!].each do |group_action_method|
          action_thread = nil
          join_outcome = nil
          @pool.with_connection do
            action_thread = Thread.new { @pool.send(group_action_method) }

            # let the action thread run for a while; a nil result from the
            # timed join means it is still blocked waiting for our connection
            join_outcome = action_thread.join(0.3)
            assert_nil join_outcome

            # we are within the default timeout, so our connection must not
            # have been forcefully taken away from us
            assert_predicate @pool, :active_connection?
          end
        ensure
          # wait for the action thread unless the timed join already saw it end
          action_thread.join if action_thread && !join_outcome
        end
      end

      # The bang group actions go ahead even when they cannot acquire every
      # connection, taking this thread's connection away in the process.
      def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_acquire_all_connections_proceed_anyway
        # a tiny timeout keeps the suite from waiting out the whole default one
        @pool.checkout_timeout = 0.001

        %i[disconnect! clear_reloadable_connections!].each do |group_action_method|
          @pool.with_connection do
            Thread.new { @pool.send(group_action_method) }.join

            # our connection has been forcefully taken away from us
            assert_not_predicate @pool, :active_connection?

            # give with_connection a fresh connection to clean up on exit
            @pool.connection
          end
        end
      end

      # For each group action: with the pool's single connection taken, one
      # thread queues for a connection, a second thread then starts the group
      # action, and the connection is returned. The group action must finish
      # within the join(2) timeout although it queued after the first thread.
      def test_disconnect_and_clear_reloadable_connections_are_able_to_preempt_other_waiting_threads
        with_single_connection_pool do |pool|
          [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
            conn               = pool.connection # drain the only available connection
            second_thread_done = Concurrent::Event.new

            begin
              # create a first_thread and let it get into the FIFO queue first;
              # once it has a connection it holds it until second_thread is done
              first_thread = Thread.new do
                pool.with_connection { second_thread_done.wait }
              end

              # wait for first_thread to get in queue
              Thread.pass until pool.num_waiting_in_queue == 1

              # create a different, later thread, that will attempt to do a "group action",
              # but because of the group action semantics it should be able to preempt the
              # first_thread when a connection is made available
              second_thread = Thread.new do
                pool.send(group_action_method)
                second_thread_done.set
              end

              # wait for second_thread to get in queue
              Thread.pass until pool.num_waiting_in_queue == 2

              # return the only available connection
              pool.checkin(conn)

              # if the second_thread is not able to preempt the first_thread,
              # they will temporarily (until either of them times out with ConnectionTimeoutError)
              # deadlock and a join(2) timeout will be reached
              assert second_thread.join(2), "#{group_action_method} is not able to preempt other waiting threads"

            ensure
              # post test clean up
              failed = !second_thread_done.set?

              if failed
                # set the event ourselves so first_thread stops waiting on it
                second_thread_done.set

                first_thread.join(2)
                second_thread.join(2)
              end

              # a timed join returns nil on timeout, which triggers the raise
              first_thread.join(10) || raise("first_thread got stuck")
              second_thread.join(10) || raise("second_thread got stuck")
            end
          end
        end
      end

      # A thread queued for a connection must not stay stuck after
      # clear_reloadable_connections, even when the only connection is one
      # that reports it requires reloading.
      def test_clear_reloadable_connections_creates_new_connections_for_waiting_threads_if_necessary
        with_single_connection_pool do |pool|
          conn = pool.connection # drain the only available connection
          # make sure it gets removed from the pool by clear_reloadable_connections
          conn.define_singleton_method(:requires_reloading?) { true }

          stuck_thread = Thread.new { pool.with_connection { } }

          # wait for stuck_thread to get in queue
          Thread.pass until pool.num_waiting_in_queue == 1

          pool.clear_reloadable_connections

          thread_finished = stuck_thread.join(2)
          flunk "clear_reloadable_connections must not let other connection waiting threads get stuck in queue" unless thread_finished

          assert_equal 0, pool.num_waiting_in_queue
        end
      end

      # #stat reports the single connection as busy while it is checked out,
      # idle once returned, and dead after the thread holding it was killed.
      def test_connection_pool_stat
        with_single_connection_pool do |pool|
          # the figures that stay the same across all three snapshots
          constant_stats = { size: 1, connections: 1, waiting: 0, checkout_timeout: 5 }

          pool.with_connection do
            assert_equal(constant_stats.merge(busy: 1, dead: 0, idle: 0), pool.stat)
          end

          assert_equal(constant_stats.merge(busy: 0, dead: 0, idle: 1), pool.stat)

          # check the connection out from a thread that then kills itself
          doomed = Thread.new do
            pool.checkout
            Thread.current.kill
          end
          doomed.join

          assert_equal(constant_stats.merge(busy: 0, dead: 1, idle: 0), pool.stat)
        end
      end

      # Walking the array returned by #connections keeps working when another
      # thread removes a connection from the pool part-way through.
      def test_public_connections_access_threadsafe
        @pool.checkout
        conn2 = @pool.checkout

        snapshot = @pool.connections
        last_seen = nil

        # Without assuming too much about implementation details, change the
        # pool from another thread in the middle of the walk.
        snapshot.each_index do |idx|
          Thread.new { @pool.remove(conn2) }.join if snapshot[idx] == conn2
          last_seen = snapshot[idx]
        end

        assert_not_nil last_seen
      end

      private
        # Yields a new pool limited to a single connection and disconnects it
        # once the block is done.
        def with_single_connection_pool
          one_conn_spec = ActiveRecord::Base.connection_pool.spec.dup
          # safe to change: a dupped ConnectionSpecification also dups its config
          one_conn_spec.config[:pool] = 1
          pool = ConnectionPool.new(one_conn_spec)
          yield pool
        ensure
          pool.disconnect! if pool
        end
    end
  end
end