diff options
Diffstat (limited to 'activesupport')
4 files changed, 226 insertions, 27 deletions
diff --git a/activesupport/lib/active_support/concurrency/share_lock.rb b/activesupport/lib/active_support/concurrency/share_lock.rb index 8e4ca272ba..54244317e4 100644 --- a/activesupport/lib/active_support/concurrency/share_lock.rb +++ b/activesupport/lib/active_support/concurrency/share_lock.rb @@ -6,12 +6,6 @@ module ActiveSupport # A share/exclusive lock, otherwise known as a read/write lock. # # https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock - #-- - # Note that a pending Exclusive lock attempt does not block incoming - # Share requests (i.e., we are "read-preferring"). That seems - # consistent with the behavior of "loose" upgrades, but may be the - # wrong choice otherwise: it nominally reduces the possibility of - # deadlock by risking starvation instead. class ShareLock include MonitorMixin @@ -51,7 +45,7 @@ module ActiveSupport if busy_for_exclusive?(purpose) return false if no_wait - yield_shares(purpose, compatible) do + yield_shares(purpose: purpose, compatible: compatible, block_share: true) do @cv.wait_while { busy_for_exclusive?(purpose) } end end @@ -73,18 +67,28 @@ module ActiveSupport if @exclusive_depth == 0 @exclusive_thread = nil - yield_shares(nil, compatible) do - @cv.broadcast - @cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) } + if eligible_waiters?(compatible) + yield_shares(compatible: compatible, block_share: true) do + @cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) } + end end + @cv.broadcast end end end - def start_sharing(purpose: :share) + def start_sharing synchronize do - if @sharing[Thread.current] == 0 && @exclusive_thread != Thread.current && busy_for_sharing?(purpose) - @cv.wait_while { busy_for_sharing?(purpose) } + if @sharing[Thread.current] > 0 || @exclusive_thread == Thread.current + # We already hold a lock; nothing to wait for + elsif @waiting[Thread.current] + # We're nested inside a +yield_shares+ call: we'll resume as + # soon as there isn't an exclusive lock in our way + @cv.wait_while { @exclusive_thread } + else + # This is an initial / outermost share call: any outstanding + # requests for an exclusive lock get to go first + @cv.wait_while { busy_for_sharing?(false) } end @sharing[Thread.current] += 1 end @@ -127,6 +131,40 @@ module ActiveSupport end end + # Temporarily give up all held Share locks while executing the + # supplied block, allowing any +compatible+ exclusive lock request + # to proceed. + def yield_shares(purpose: nil, compatible: [], block_share: false) + loose_shares = previous_wait = nil + synchronize do + if loose_shares = @sharing.delete(Thread.current) + if previous_wait = @waiting[Thread.current] + purpose = nil unless purpose == previous_wait[0] + compatible &= previous_wait[1] + end + compatible |= [false] unless block_share + @waiting[Thread.current] = [purpose, compatible] + + @cv.broadcast + end + end + + begin + yield + ensure + synchronize do + @cv.wait_while { @exclusive_thread && @exclusive_thread != Thread.current } + + if previous_wait + @waiting[Thread.current] = previous_wait + else + @waiting.delete Thread.current + end + @sharing[Thread.current] = loose_shares if loose_shares + end + end + end + private # Must be called within synchronize @@ -143,18 +181,6 @@ module ActiveSupport def eligible_waiters?(compatible) @waiting.any? { |t, (p, _)| compatible.include?(p) && @waiting.all? { |t2, (_, c2)| t == t2 || c2.include?(p) } } end - - def yield_shares(purpose, compatible) - loose_shares = @sharing.delete(Thread.current) - @waiting[Thread.current] = [purpose, compatible] if loose_shares - - begin - yield - ensure - @waiting.delete Thread.current - @sharing[Thread.current] = loose_shares if loose_shares - end - end end end end diff --git a/activesupport/lib/active_support/dependencies/interlock.rb b/activesupport/lib/active_support/dependencies/interlock.rb index b6a1b25eee..47bcecbb35 100644 --- a/activesupport/lib/active_support/dependencies/interlock.rb +++ b/activesupport/lib/active_support/dependencies/interlock.rb @@ -42,6 +42,12 @@ module ActiveSupport #:nodoc: yield end end + + def permit_concurrent_loads + @lock.yield_shares(compatible: [:load]) do + yield + end + end end end end diff --git a/activesupport/lib/active_support/deprecation/proxy_wrappers.rb b/activesupport/lib/active_support/deprecation/proxy_wrappers.rb index 6f0ad445fc..0cb2d4d22e 100644 --- a/activesupport/lib/active_support/deprecation/proxy_wrappers.rb +++ b/activesupport/lib/active_support/deprecation/proxy_wrappers.rb @@ -80,7 +80,7 @@ module ActiveSupport # example.old_request.to_s # # => DEPRECATION WARNING: @request is deprecated! Call request.to_s instead of # @request.to_s - # (Bactrace information…) + # (Backtrace information…) # "special_request" # # example.request.to_s @@ -118,7 +118,7 @@ module ActiveSupport # # PLANETS.map { |planet| planet.capitalize } # # => DEPRECATION WARNING: PLANETS is deprecated! Use PLANETS_POST_2006 instead. - # (Bactrace information…) + # (Backtrace information…) # ["Mercury", "Venus", "Earth", "Mars", "Jupiter", "Saturn", "Uranus", "Neptune"] class DeprecatedConstantProxy < DeprecationProxy def initialize(old_const, new_const, deprecator = ActiveSupport::Deprecation.instance) diff --git a/activesupport/test/share_lock_test.rb b/activesupport/test/share_lock_test.rb index 12953d99a6..acefa185a8 100644 --- a/activesupport/test/share_lock_test.rb +++ b/activesupport/test/share_lock_test.rb @@ -287,6 +287,173 @@ class ShareLockTest < ActiveSupport::TestCase assert_threads_not_stuck threads end + def test_manual_yield + ready = Concurrent::CyclicBarrier.new(2) + done = Concurrent::CyclicBarrier.new(2) + + threads = [ + Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive(purpose: :x) {} + done.wait + end + end, + + Thread.new do + @lock.sharing do + ready.wait + @lock.yield_shares(compatible: [:x]) do + done.wait + end + end + end, + ] + + assert_threads_not_stuck threads + end + + def test_manual_incompatible_yield + ready = Concurrent::CyclicBarrier.new(2) + done = Concurrent::CyclicBarrier.new(2) + + threads = [ + Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive(purpose: :x) {} + done.wait + end + end, + + Thread.new do + @lock.sharing do + ready.wait + @lock.yield_shares(compatible: [:y]) do + done.wait + end + end + end, + ] + + assert_threads_stuck threads + ensure + threads.each(&:kill) if threads + end + + def test_manual_recursive_yield + ready = Concurrent::CyclicBarrier.new(2) + done = Concurrent::CyclicBarrier.new(2) + do_nesting = Concurrent::CountDownLatch.new + + threads = [ + Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive(purpose: :x) {} + done.wait + end + end, + + Thread.new do + @lock.sharing do + @lock.yield_shares(compatible: [:x]) do + @lock.sharing do + ready.wait + do_nesting.wait + @lock.yield_shares(compatible: [:x, :y]) do + done.wait + end + end + end + end + end + ] + + assert_threads_stuck threads + do_nesting.count_down + + assert_threads_not_stuck threads + end + + def test_manual_recursive_yield_cannot_expand_outer_compatible + ready = Concurrent::CyclicBarrier.new(2) + do_compatible_nesting = Concurrent::CountDownLatch.new + in_compatible_nesting = Concurrent::CountDownLatch.new + + incompatible_thread = Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive(purpose: :x) {} + end + end + + yield_shares_thread = Thread.new do + @lock.sharing do + ready.wait + @lock.yield_shares(compatible: [:y]) do + do_compatible_nesting.wait + @lock.sharing do + @lock.yield_shares(compatible: [:x, :y]) do + in_compatible_nesting.wait + end + end + end + end + end + + assert_threads_stuck incompatible_thread + do_compatible_nesting.count_down + assert_threads_stuck incompatible_thread + in_compatible_nesting.count_down + assert_threads_not_stuck [yield_shares_thread, incompatible_thread] + end + + def test_manual_recursive_yield_restores_previous_compatible + ready = Concurrent::CyclicBarrier.new(2) + do_nesting = Concurrent::CountDownLatch.new + after_nesting = Concurrent::CountDownLatch.new + + incompatible_thread = Thread.new do + ready.wait + @lock.exclusive(purpose: :z) {} + end + + recursive_yield_shares_thread = Thread.new do + @lock.sharing do + ready.wait + @lock.yield_shares(compatible: [:y]) do + do_nesting.wait + @lock.sharing do + @lock.yield_shares(compatible: [:x, :y]) {} + end + after_nesting.wait + end + end + end + + assert_threads_stuck incompatible_thread + do_nesting.count_down + assert_threads_stuck incompatible_thread + + compatible_thread = Thread.new do + @lock.exclusive(purpose: :y) {} + end + assert_threads_not_stuck compatible_thread + + post_nesting_incompatible_thread = Thread.new do + @lock.exclusive(purpose: :x) {} + end + assert_threads_stuck post_nesting_incompatible_thread + + after_nesting.count_down + assert_threads_not_stuck recursive_yield_shares_thread + # post_nesting_incompatible_thread can now proceed + assert_threads_not_stuck post_nesting_incompatible_thread + # assert_threads_not_stuck can now proceed + assert_threads_not_stuck incompatible_thread + end + def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads scratch_pad = [] scratch_pad_mutex = Mutex.new |