aboutsummaryrefslogblamecommitdiffstats
path: root/activesupport/test/share_lock_test.rb
blob: 42fd5eefc1c0504d426a640d632d841f3710a6ae (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
                             
 


                                               





                                                     







                                            

                                                                           
                                                               






































































                                                                                                    




















                                                                                   



                                                                                     
                                                     


                                                            
                             
                                                                                



                                                            
                             
                                                                        




                                                                                                                                  
 


                                                                        


                                           
                                                
 



                                                                   
                                           
                                                               
             




                                                            
 

















                                                                                                      

                                                    
                                





                             

                                 
 

                                               
 

                                                  




                                                                                           
                              

                                                                          


                 
           
 

                                                                  
 
                                               
 








                                                                    

























                                                                                   




























                                                                                   
















                                                                                   

























                                                 



























                                                 


































                                                         
                                                                
                                            
                                                          
                                                          
 





                                       
 







                                                       



               
       
 
                                            
                                    


                                                                       














































                                                              














                                                                                                
               

             










                                                                  




         
 

                              
 
             
 




                                                                        
 



                                                                            
 


                                                                        
       
 

                                                        
 



                                               
 



                         
 


                                                                    
 



                                             
         
 




                                             
         
 




                                                                      
         
 




                                                                             

         
 
                            
 


                                                         
 




                                   
         
 
                     
 




                                  
   
# frozen_string_literal: true

require "abstract_unit"
require "concurrent/atomic/count_down_latch"
require "active_support/concurrency/share_lock"

class ShareLockTest < ActiveSupport::TestCase
  def setup
    @lock = ActiveSupport::Concurrency::ShareLock.new
  end

  def test_reentrancy
    thread = Thread.new do
      @lock.sharing   { @lock.sharing   {} }
      @lock.exclusive { @lock.exclusive {} }
    end
    assert_threads_not_stuck thread
  end

  def test_sharing_doesnt_block
    with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_latch|
      assert_threads_not_stuck(Thread.new { @lock.sharing {} })
    end
  end

  def test_sharing_blocks_exclusive
    with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
      @lock.exclusive(no_wait: true) { flunk } # polling should fail
      exclusive_thread = Thread.new { @lock.exclusive {} }
      assert_threads_stuck_but_releasable_by_latch exclusive_thread, sharing_thread_release_latch
    end
  end

  def test_exclusive_blocks_sharing
    with_thread_waiting_in_lock_section(:exclusive) do |exclusive_thread_release_latch|
      sharing_thread = Thread.new { @lock.sharing {} }
      assert_threads_stuck_but_releasable_by_latch sharing_thread, exclusive_thread_release_latch
    end
  end

  def test_multiple_exlusives_are_able_to_progress
    with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
      exclusive_threads = (1..2).map do
        Thread.new do
          @lock.exclusive {}
        end
      end

      assert_threads_stuck_but_releasable_by_latch exclusive_threads, sharing_thread_release_latch
    end
  end

  def test_sharing_is_upgradeable_to_exclusive
    upgrading_thread = Thread.new do
      @lock.sharing do
        @lock.exclusive {}
      end
    end
    assert_threads_not_stuck upgrading_thread
  end

  def test_exclusive_upgrade_waits_for_other_sharers_to_leave
    with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
      in_sharing = Concurrent::CountDownLatch.new

      upgrading_thread = Thread.new do
        @lock.sharing do
          in_sharing.count_down
          @lock.exclusive {}
        end
      end

      in_sharing.wait
      assert_threads_stuck_but_releasable_by_latch upgrading_thread, sharing_thread_release_latch
    end
  end

  def test_exclusive_matching_purpose
    [true, false].each do |use_upgrading|
      with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
        exclusive_threads = (1..2).map do
          Thread.new do
            @lock.send(use_upgrading ? :sharing : :tap) do
              @lock.exclusive(purpose: :load, compatible: [:load, :unload]) {}
            end
          end
        end

        assert_threads_stuck_but_releasable_by_latch exclusive_threads, sharing_thread_release_latch
      end
    end
  end

  def test_killed_thread_loses_lock
    with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
      thread = Thread.new do
        @lock.sharing do
          @lock.exclusive {}
        end
      end

      assert_threads_stuck thread
      thread.kill

      sharing_thread_release_latch.count_down

      thread = Thread.new do
        @lock.exclusive {}
      end

      assert_threads_not_stuck thread
    end
  end

  def test_exclusive_conflicting_purpose
    [true, false].each do |use_upgrading|
      with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
        begin
          together = Concurrent::CyclicBarrier.new(2)
          conflicting_exclusive_threads = [
            Thread.new do
              @lock.send(use_upgrading ? :sharing : :tap) do
                together.wait
                @lock.exclusive(purpose: :red, compatible: [:green, :purple]) {}
              end
            end,
            Thread.new do
              @lock.send(use_upgrading ? :sharing : :tap) do
                together.wait
                @lock.exclusive(purpose: :blue, compatible: [:green]) {}
              end
            end
          ]

          assert_threads_stuck conflicting_exclusive_threads # wait for threads to get into their respective `exclusive {}` blocks

          # This thread will be stuck as long as any other thread is in
          # a sharing block. While it's blocked, it holds no lock, so it
          # doesn't interfere with any other attempts.
          no_purpose_thread = Thread.new do
            @lock.exclusive {}
          end
          assert_threads_stuck no_purpose_thread

          # This thread is compatible with both of the "primary"
          # attempts above. It's initially stuck on the outer share
          # lock, but as soon as that's released, it can run --
          # regardless of whether those threads hold share locks.
          compatible_thread = Thread.new do
            @lock.exclusive(purpose: :green, compatible: []) {}
          end
          assert_threads_stuck compatible_thread

          assert_threads_stuck conflicting_exclusive_threads

          sharing_thread_release_latch.count_down

          assert_threads_not_stuck compatible_thread # compatible thread is now able to squeak through

          if use_upgrading
            # The "primary" threads both each hold a share lock, and are
            # mutually incompatible; they're still stuck.
            assert_threads_stuck conflicting_exclusive_threads

            # The thread without a specified purpose is also stuck; it's
            # not compatible with anything.
            assert_threads_stuck no_purpose_thread
          else
            # As the primaries didn't hold a share lock, as soon as the
            # outer one was released, all the exclusive locks are free
            # to be acquired in turn.

            assert_threads_not_stuck conflicting_exclusive_threads
            assert_threads_not_stuck no_purpose_thread
          end
        ensure
          conflicting_exclusive_threads.each(&:kill)
          no_purpose_thread.kill
        end
      end
    end
  end

  def test_exclusive_ordering
    scratch_pad       = []
    scratch_pad_mutex = Mutex.new

    load_params   = [:load,   [:load]]
    unload_params = [:unload, [:unload, :load]]

    all_sharing = Concurrent::CyclicBarrier.new(4)

    [load_params, load_params, unload_params, unload_params].permutation do |thread_params|
      with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
        threads = thread_params.map do |purpose, compatible|
          Thread.new do
            @lock.sharing do
              all_sharing.wait
              @lock.exclusive(purpose: purpose, compatible: compatible) do
                scratch_pad_mutex.synchronize { scratch_pad << purpose }
              end
            end
          end
        end

        sleep(0.01)
        scratch_pad_mutex.synchronize { assert_empty scratch_pad }

        sharing_thread_release_latch.count_down

        assert_threads_not_stuck threads
        scratch_pad_mutex.synchronize do
          assert_equal [:load, :load, :unload, :unload], scratch_pad
          scratch_pad.clear
        end
      end
    end
  end

  def test_new_share_attempts_block_on_waiting_exclusive
    with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
      release_exclusive = Concurrent::CountDownLatch.new

      waiting_exclusive = Thread.new do
        @lock.sharing do
          @lock.exclusive do
            release_exclusive.wait
          end
        end
      end
      assert_threads_stuck waiting_exclusive

      late_share_attempt = Thread.new do
        @lock.sharing {}
      end
      assert_threads_stuck late_share_attempt

      sharing_thread_release_latch.count_down
      assert_threads_stuck late_share_attempt

      release_exclusive.count_down
      assert_threads_not_stuck late_share_attempt
    end
  end

  def test_share_remains_reentrant_ignoring_a_waiting_exclusive
    with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
      ready = Concurrent::CyclicBarrier.new(2)
      attempt_reentrancy = Concurrent::CountDownLatch.new

      sharer = Thread.new do
        @lock.sharing do
          ready.wait
          attempt_reentrancy.wait
          @lock.sharing {}
        end
      end

      exclusive = Thread.new do
        @lock.sharing do
          ready.wait
          @lock.exclusive {}
        end
      end

      assert_threads_stuck exclusive

      attempt_reentrancy.count_down

      assert_threads_not_stuck sharer
      assert_threads_stuck exclusive
    end
  end

  def test_compatible_exclusives_cooperate_to_both_proceed
    ready = Concurrent::CyclicBarrier.new(2)
    done = Concurrent::CyclicBarrier.new(2)

    threads = 2.times.map do
      Thread.new do
        @lock.sharing do
          ready.wait
          @lock.exclusive(purpose: :x, compatible: [:x], after_compatible: [:x]) {}
          done.wait
        end
      end
    end

    assert_threads_not_stuck threads
  end

  def test_manual_yield
    ready = Concurrent::CyclicBarrier.new(2)
    done = Concurrent::CyclicBarrier.new(2)

    threads = [
      Thread.new do
        @lock.sharing do
          ready.wait
          @lock.exclusive(purpose: :x) {}
          done.wait
        end
      end,

      Thread.new do
        @lock.sharing do
          ready.wait
          @lock.yield_shares(compatible: [:x]) do
            done.wait
          end
        end
      end,
    ]

    assert_threads_not_stuck threads
  end

  def test_manual_incompatible_yield
    ready = Concurrent::CyclicBarrier.new(2)
    done = Concurrent::CyclicBarrier.new(2)

    threads = [
      Thread.new do
        @lock.sharing do
          ready.wait
          @lock.exclusive(purpose: :x) {}
          done.wait
        end
      end,

      Thread.new do
        @lock.sharing do
          ready.wait
          @lock.yield_shares(compatible: [:y]) do
            done.wait
          end
        end
      end,
    ]

    assert_threads_stuck threads
  ensure
    threads.each(&:kill) if threads
  end

  def test_manual_recursive_yield
    ready = Concurrent::CyclicBarrier.new(2)
    done = Concurrent::CyclicBarrier.new(2)
    do_nesting = Concurrent::CountDownLatch.new

    threads = [
      Thread.new do
        @lock.sharing do
          ready.wait
          @lock.exclusive(purpose: :x) {}
          done.wait
        end
      end,

      Thread.new do
        @lock.sharing do
          @lock.yield_shares(compatible: [:x]) do
            @lock.sharing do
              ready.wait
              do_nesting.wait
              @lock.yield_shares(compatible: [:x, :y]) do
                done.wait
              end
            end
          end
        end
      end
    ]

    assert_threads_stuck threads
    do_nesting.count_down

    assert_threads_not_stuck threads
  end

  def test_manual_recursive_yield_cannot_expand_outer_compatible
    ready = Concurrent::CyclicBarrier.new(2)
    do_compatible_nesting = Concurrent::CountDownLatch.new
    in_compatible_nesting = Concurrent::CountDownLatch.new

    incompatible_thread = Thread.new do
      @lock.sharing do
        ready.wait
        @lock.exclusive(purpose: :x) {}
      end
    end

    yield_shares_thread = Thread.new do
      @lock.sharing do
        ready.wait
        @lock.yield_shares(compatible: [:y]) do
          do_compatible_nesting.wait
          @lock.sharing do
            @lock.yield_shares(compatible: [:x, :y]) do
              in_compatible_nesting.wait
            end
          end
        end
      end
    end

    assert_threads_stuck incompatible_thread
    do_compatible_nesting.count_down
    assert_threads_stuck incompatible_thread
    in_compatible_nesting.count_down
    assert_threads_not_stuck [yield_shares_thread, incompatible_thread]
  end

  def test_manual_recursive_yield_restores_previous_compatible
    ready = Concurrent::CyclicBarrier.new(2)
    do_nesting = Concurrent::CountDownLatch.new
    after_nesting = Concurrent::CountDownLatch.new

    incompatible_thread = Thread.new do
      ready.wait
      @lock.exclusive(purpose: :z) {}
    end

    recursive_yield_shares_thread = Thread.new do
      @lock.sharing do
        ready.wait
        @lock.yield_shares(compatible: [:y]) do
          do_nesting.wait
          @lock.sharing do
            @lock.yield_shares(compatible: [:x, :y]) {}
          end
          after_nesting.wait
        end
      end
    end

    assert_threads_stuck incompatible_thread
    do_nesting.count_down
    assert_threads_stuck incompatible_thread

    compatible_thread = Thread.new do
      @lock.exclusive(purpose: :y) {}
    end
    assert_threads_not_stuck compatible_thread

    post_nesting_incompatible_thread = Thread.new do
      @lock.exclusive(purpose: :x) {}
    end
    assert_threads_stuck post_nesting_incompatible_thread

    after_nesting.count_down
    assert_threads_not_stuck recursive_yield_shares_thread
    # post_nesting_incompatible_thread can now proceed
    assert_threads_not_stuck post_nesting_incompatible_thread
    # assert_threads_not_stuck can now proceed
    assert_threads_not_stuck incompatible_thread
  end

  def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads
    scratch_pad       = []
    scratch_pad_mutex = Mutex.new

    upgrading_load_params       = [:load,   [:load],          true]
    non_upgrading_unload_params = [:unload, [:load, :unload], false]

    [upgrading_load_params, non_upgrading_unload_params].permutation do |thread_params|
      with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
        threads = thread_params.map do |purpose, compatible, use_upgrading|
          Thread.new do
            @lock.send(use_upgrading ? :sharing : :tap) do
              @lock.exclusive(purpose: purpose, compatible: compatible) do
                scratch_pad_mutex.synchronize { scratch_pad << purpose }
              end
            end
          end
        end

        assert_threads_stuck threads
        scratch_pad_mutex.synchronize { assert_empty scratch_pad }

        sharing_thread_release_latch.count_down

        assert_threads_not_stuck threads
        scratch_pad_mutex.synchronize do
          assert_equal [:load, :unload], scratch_pad
          scratch_pad.clear
        end
      end
    end
  end

  private

    module CustomAssertions
      SUFFICIENT_TIMEOUT = 0.2

      private

        def assert_threads_stuck_but_releasable_by_latch(threads, latch)
          assert_threads_stuck threads
          latch.count_down
          assert_threads_not_stuck threads
        end

        def assert_threads_stuck(threads)
          sleep(SUFFICIENT_TIMEOUT) # give threads time to do their business
          assert(Array(threads).all? { |t| t.join(0.001).nil? })
        end

        def assert_threads_not_stuck(threads)
          assert(Array(threads).all? { |t| t.join(SUFFICIENT_TIMEOUT) })
        end
    end

    class CustomAssertionsTest < ActiveSupport::TestCase
      include CustomAssertions

      def setup
        @latch = Concurrent::CountDownLatch.new
        @thread = Thread.new { @latch.wait }
      end

      def teardown
        @latch.count_down
        @thread.join
      end

      def test_happy_path
        assert_threads_stuck_but_releasable_by_latch @thread, @latch
      end

      def test_detects_stuck_thread
        assert_raises(Minitest::Assertion) do
          assert_threads_not_stuck @thread
        end
      end

      def test_detects_free_thread
        @latch.count_down
        assert_raises(Minitest::Assertion) do
          assert_threads_stuck @thread
        end
      end

      def test_detects_already_released
        @latch.count_down
        assert_raises(Minitest::Assertion) do
          assert_threads_stuck_but_releasable_by_latch @thread, @latch
        end
      end

      def test_detects_remains_latched
        another_latch = Concurrent::CountDownLatch.new
        assert_raises(Minitest::Assertion) do
          assert_threads_stuck_but_releasable_by_latch @thread, another_latch
        end
      end
    end

    include CustomAssertions

    def with_thread_waiting_in_lock_section(lock_section)
      in_section      = Concurrent::CountDownLatch.new
      section_release = Concurrent::CountDownLatch.new

      stuck_thread = Thread.new do
        @lock.send(lock_section) do
          in_section.count_down
          section_release.wait
        end
      end

      in_section.wait

      yield section_release
    ensure
      section_release.count_down
      stuck_thread.join # clean up
    end
end