diff options
author | Matthew Draper <matthew@trebex.net> | 2016-02-08 06:42:49 +1030 |
---|---|---|
committer | Matthew Draper <matthew@trebex.net> | 2016-02-08 06:42:49 +1030 |
commit | 15f6ad3adca89d07c294fe78e2d383055634f6ca (patch) | |
tree | e7245fb5ab53ef16a23ae21ce84c79da983375a3 /activesupport | |
parent | 7e35cb2987e146de29d50f97a43caef61e7588af (diff) | |
parent | 11579b8306dd6303b306ee10c8bc9f7a4a6adea3 (diff) | |
download | rails-15f6ad3adca89d07c294fe78e2d383055634f6ca.tar.gz rails-15f6ad3adca89d07c294fe78e2d383055634f6ca.tar.bz2 rails-15f6ad3adca89d07c294fe78e2d383055634f6ca.zip |
Merge pull request #23532 from matthewd/live-interlock
Hand off the interlock to the new thread in AC::Live
Diffstat (limited to 'activesupport')
-rw-r--r-- | activesupport/lib/active_support/concurrency/share_lock.rb | 76 | ||||
-rw-r--r-- | activesupport/lib/active_support/dependencies/interlock.rb | 6 | ||||
-rw-r--r-- | activesupport/test/share_lock_test.rb | 167 |
3 files changed, 224 insertions, 25 deletions
diff --git a/activesupport/lib/active_support/concurrency/share_lock.rb b/activesupport/lib/active_support/concurrency/share_lock.rb index 8e4ca272ba..54244317e4 100644 --- a/activesupport/lib/active_support/concurrency/share_lock.rb +++ b/activesupport/lib/active_support/concurrency/share_lock.rb @@ -6,12 +6,6 @@ module ActiveSupport # A share/exclusive lock, otherwise known as a read/write lock. # # https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock - #-- - # Note that a pending Exclusive lock attempt does not block incoming - # Share requests (i.e., we are "read-preferring"). That seems - # consistent with the behavior of "loose" upgrades, but may be the - # wrong choice otherwise: it nominally reduces the possibility of - # deadlock by risking starvation instead. class ShareLock include MonitorMixin @@ -51,7 +45,7 @@ module ActiveSupport if busy_for_exclusive?(purpose) return false if no_wait - yield_shares(purpose, compatible) do + yield_shares(purpose: purpose, compatible: compatible, block_share: true) do @cv.wait_while { busy_for_exclusive?(purpose) } end end @@ -73,18 +67,28 @@ module ActiveSupport if @exclusive_depth == 0 @exclusive_thread = nil - yield_shares(nil, compatible) do - @cv.broadcast - @cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) } + if eligible_waiters?(compatible) + yield_shares(compatible: compatible, block_share: true) do + @cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) } + end end + @cv.broadcast end end end - def start_sharing(purpose: :share) + def start_sharing synchronize do - if @sharing[Thread.current] == 0 && @exclusive_thread != Thread.current && busy_for_sharing?(purpose) - @cv.wait_while { busy_for_sharing?(purpose) } + if @sharing[Thread.current] > 0 || @exclusive_thread == Thread.current + # We already hold a lock; nothing to wait for + elsif @waiting[Thread.current] + # We're nested inside a +yield_shares+ call: we'll resume as + # soon as there isn't an exclusive lock in our way + @cv.wait_while { @exclusive_thread } + else + # This is an initial / outermost share call: any outstanding + # requests for an exclusive lock get to go first + @cv.wait_while { busy_for_sharing?(false) } end @sharing[Thread.current] += 1 end @@ -127,6 +131,40 @@ module ActiveSupport end end + # Temporarily give up all held Share locks while executing the + # supplied block, allowing any +compatible+ exclusive lock request + # to proceed. + def yield_shares(purpose: nil, compatible: [], block_share: false) + loose_shares = previous_wait = nil + synchronize do + if loose_shares = @sharing.delete(Thread.current) + if previous_wait = @waiting[Thread.current] + purpose = nil unless purpose == previous_wait[0] + compatible &= previous_wait[1] + end + compatible |= [false] unless block_share + @waiting[Thread.current] = [purpose, compatible] + + @cv.broadcast + end + end + + begin + yield + ensure + synchronize do + @cv.wait_while { @exclusive_thread && @exclusive_thread != Thread.current } + + if previous_wait + @waiting[Thread.current] = previous_wait + else + @waiting.delete Thread.current + end + @sharing[Thread.current] = loose_shares if loose_shares + end + end + end + private # Must be called within synchronize @@ -143,18 +181,6 @@ module ActiveSupport def eligible_waiters?(compatible) @waiting.any? { |t, (p, _)| compatible.include?(p) && @waiting.all? { |t2, (_, c2)| t == t2 || c2.include?(p) } } end - - def yield_shares(purpose, compatible) - loose_shares = @sharing.delete(Thread.current) - @waiting[Thread.current] = [purpose, compatible] if loose_shares - - begin - yield - ensure - @waiting.delete Thread.current - @sharing[Thread.current] = loose_shares if loose_shares - end - end end end end diff --git a/activesupport/lib/active_support/dependencies/interlock.rb b/activesupport/lib/active_support/dependencies/interlock.rb index b6a1b25eee..47bcecbb35 100644 --- a/activesupport/lib/active_support/dependencies/interlock.rb +++ b/activesupport/lib/active_support/dependencies/interlock.rb @@ -42,6 +42,12 @@ module ActiveSupport #:nodoc: yield end end + + def permit_concurrent_loads + @lock.yield_shares(compatible: [:load]) do + yield + end + end end end end diff --git a/activesupport/test/share_lock_test.rb b/activesupport/test/share_lock_test.rb index 12953d99a6..acefa185a8 100644 --- a/activesupport/test/share_lock_test.rb +++ b/activesupport/test/share_lock_test.rb @@ -287,6 +287,173 @@ class ShareLockTest < ActiveSupport::TestCase assert_threads_not_stuck threads end + def test_manual_yield + ready = Concurrent::CyclicBarrier.new(2) + done = Concurrent::CyclicBarrier.new(2) + + threads = [ + Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive(purpose: :x) {} + done.wait + end + end, + + Thread.new do + @lock.sharing do + ready.wait + @lock.yield_shares(compatible: [:x]) do + done.wait + end + end + end, + ] + + assert_threads_not_stuck threads + end + + def test_manual_incompatible_yield + ready = Concurrent::CyclicBarrier.new(2) + done = Concurrent::CyclicBarrier.new(2) + + threads = [ + Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive(purpose: :x) {} + done.wait + end + end, + + Thread.new do + @lock.sharing do + ready.wait + @lock.yield_shares(compatible: [:y]) do + done.wait + end + end + end, + ] + + assert_threads_stuck threads + ensure + threads.each(&:kill) if threads + end + + def test_manual_recursive_yield + ready = Concurrent::CyclicBarrier.new(2) + done = Concurrent::CyclicBarrier.new(2) + do_nesting = Concurrent::CountDownLatch.new + + threads = [ + Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive(purpose: :x) {} + done.wait + end + end, + + Thread.new do + @lock.sharing do + @lock.yield_shares(compatible: [:x]) do + @lock.sharing do + ready.wait + do_nesting.wait + @lock.yield_shares(compatible: [:x, :y]) do + done.wait + end + end + end + end + end + ] + + assert_threads_stuck threads + do_nesting.count_down + + assert_threads_not_stuck threads + end + + def test_manual_recursive_yield_cannot_expand_outer_compatible + ready = Concurrent::CyclicBarrier.new(2) + do_compatible_nesting = Concurrent::CountDownLatch.new + in_compatible_nesting = Concurrent::CountDownLatch.new + + incompatible_thread = Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive(purpose: :x) {} + end + end + + yield_shares_thread = Thread.new do + @lock.sharing do + ready.wait + @lock.yield_shares(compatible: [:y]) do + do_compatible_nesting.wait + @lock.sharing do + @lock.yield_shares(compatible: [:x, :y]) do + in_compatible_nesting.wait + end + end + end + end + end + + assert_threads_stuck incompatible_thread + do_compatible_nesting.count_down + assert_threads_stuck incompatible_thread + in_compatible_nesting.count_down + assert_threads_not_stuck [yield_shares_thread, incompatible_thread] + end + + def test_manual_recursive_yield_restores_previous_compatible + ready = Concurrent::CyclicBarrier.new(2) + do_nesting = Concurrent::CountDownLatch.new + after_nesting = Concurrent::CountDownLatch.new + + incompatible_thread = Thread.new do + ready.wait + @lock.exclusive(purpose: :z) {} + end + + recursive_yield_shares_thread = Thread.new do + @lock.sharing do + ready.wait + @lock.yield_shares(compatible: [:y]) do + do_nesting.wait + @lock.sharing do + @lock.yield_shares(compatible: [:x, :y]) {} + end + after_nesting.wait + end + end + end + + assert_threads_stuck incompatible_thread + do_nesting.count_down + assert_threads_stuck incompatible_thread + + compatible_thread = Thread.new do + @lock.exclusive(purpose: :y) {} + end + assert_threads_not_stuck compatible_thread + + post_nesting_incompatible_thread = Thread.new do + @lock.exclusive(purpose: :x) {} + end + assert_threads_stuck post_nesting_incompatible_thread + + after_nesting.count_down + assert_threads_not_stuck recursive_yield_shares_thread + # post_nesting_incompatible_thread can now proceed + assert_threads_not_stuck post_nesting_incompatible_thread + # assert_threads_not_stuck can now proceed + assert_threads_not_stuck incompatible_thread + end + def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads scratch_pad = [] scratch_pad_mutex = Mutex.new |