diff options
author | Matthew Draper <matthew@trebex.net> | 2016-02-02 06:31:50 +1030 |
---|---|---|
committer | Matthew Draper <matthew@trebex.net> | 2016-02-02 06:31:50 +1030 |
commit | f8167acc41d514847e07a305262481db6a149dd6 (patch) | |
tree | b5e3c9235c7404b18c3f1b5aa7b1802140a65d50 /activesupport | |
parent | f3e6e80e13103cbf210c95b5f601bc448a0aecaa (diff) | |
parent | f836630f8cdf53a06259cc22ac842bbfa6376f65 (diff) | |
download | rails-f8167acc41d514847e07a305262481db6a149dd6.tar.gz rails-f8167acc41d514847e07a305262481db6a149dd6.tar.bz2 rails-f8167acc41d514847e07a305262481db6a149dd6.zip |
Merge pull request #23398 from matthewd/interlock
Address remaining known issues in Interlock
Diffstat (limited to 'activesupport')
-rw-r--r-- | activesupport/lib/active_support/concurrency/share_lock.rb | 56 | ||||
-rw-r--r-- | activesupport/lib/active_support/dependencies/interlock.rb | 6 | ||||
-rw-r--r-- | activesupport/test/share_lock_test.rb | 78 |
3 files changed, 118 insertions, 22 deletions
diff --git a/activesupport/lib/active_support/concurrency/share_lock.rb b/activesupport/lib/active_support/concurrency/share_lock.rb index ca48164c54..8e4ca272ba 100644 --- a/activesupport/lib/active_support/concurrency/share_lock.rb +++ b/activesupport/lib/active_support/concurrency/share_lock.rb @@ -48,17 +48,11 @@ module ActiveSupport def start_exclusive(purpose: nil, compatible: [], no_wait: false) synchronize do unless @exclusive_thread == Thread.current - if busy?(purpose) + if busy_for_exclusive?(purpose) return false if no_wait - loose_shares = @sharing.delete(Thread.current) - @waiting[Thread.current] = compatible if loose_shares - - begin - @cv.wait_while { busy?(purpose) } - ensure - @waiting.delete Thread.current - @sharing[Thread.current] = loose_shares if loose_shares + yield_shares(purpose, compatible) do + @cv.wait_while { busy_for_exclusive?(purpose) } end end @exclusive_thread = Thread.current @@ -71,22 +65,26 @@ module ActiveSupport # Relinquish the exclusive lock. Must only be called by the thread # that called start_exclusive (and currently holds the lock). - def stop_exclusive + def stop_exclusive(compatible: []) synchronize do raise "invalid unlock" if @exclusive_thread != Thread.current @exclusive_depth -= 1 if @exclusive_depth == 0 @exclusive_thread = nil - @cv.broadcast + + yield_shares(nil, compatible) do + @cv.broadcast + @cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) } + end end end end - def start_sharing + def start_sharing(purpose: :share) synchronize do - if @exclusive_thread && @exclusive_thread != Thread.current - @cv.wait_while { @exclusive_thread } + if @sharing[Thread.current] == 0 && @exclusive_thread != Thread.current && busy_for_sharing?(purpose) + @cv.wait_while { busy_for_sharing?(purpose) } end @sharing[Thread.current] += 1 end @@ -109,12 +107,12 @@ module ActiveSupport # the block. # # See +start_exclusive+ for other options. - def exclusive(purpose: nil, compatible: [], no_wait: false) + def exclusive(purpose: nil, compatible: [], after_compatible: [], no_wait: false) if start_exclusive(purpose: purpose, compatible: compatible, no_wait: no_wait) begin yield ensure - stop_exclusive + stop_exclusive(compatible: after_compatible) end end end @@ -132,11 +130,31 @@ module ActiveSupport private # Must be called within synchronize - def busy?(purpose) - (@exclusive_thread && @exclusive_thread != Thread.current) || - @waiting.any? { |k, v| k != Thread.current && !v.include?(purpose) } || + def busy_for_exclusive?(purpose) + busy_for_sharing?(purpose) || @sharing.size > (@sharing[Thread.current] > 0 ? 1 : 0) end + + def busy_for_sharing?(purpose) + (@exclusive_thread && @exclusive_thread != Thread.current) || + @waiting.any? { |t, (_, c)| t != Thread.current && !c.include?(purpose) } + end + + def eligible_waiters?(compatible) + @waiting.any? { |t, (p, _)| compatible.include?(p) && @waiting.all? { |t2, (_, c2)| t == t2 || c2.include?(p) } } + end + + def yield_shares(purpose, compatible) + loose_shares = @sharing.delete(Thread.current) + @waiting[Thread.current] = [purpose, compatible] if loose_shares + + begin + yield + ensure + @waiting.delete Thread.current + @sharing[Thread.current] = loose_shares if loose_shares + end + end end end end diff --git a/activesupport/lib/active_support/dependencies/interlock.rb b/activesupport/lib/active_support/dependencies/interlock.rb index fbeb904684..b6a1b25eee 100644 --- a/activesupport/lib/active_support/dependencies/interlock.rb +++ b/activesupport/lib/active_support/dependencies/interlock.rb @@ -8,13 +8,13 @@ module ActiveSupport #:nodoc: end def loading - @lock.exclusive(purpose: :load, compatible: [:load]) do + @lock.exclusive(purpose: :load, compatible: [:load], after_compatible: [:load]) do yield end end def unloading - @lock.exclusive(purpose: :unload, compatible: [:load, :unload]) do + @lock.exclusive(purpose: :unload, compatible: [:load, :unload], after_compatible: [:load, :unload]) do yield end end @@ -24,7 +24,7 @@ module ActiveSupport #:nodoc: # concurrent activity, return immediately (without executing the # block) instead of waiting. def attempt_unloading - @lock.exclusive(purpose: :unload, compatible: [:load, :unload], no_wait: true) do + @lock.exclusive(purpose: :unload, compatible: [:load, :unload], after_compatible: [:load, :unload], no_wait: true) do yield end end diff --git a/activesupport/test/share_lock_test.rb b/activesupport/test/share_lock_test.rb index 465a657308..12953d99a6 100644 --- a/activesupport/test/share_lock_test.rb +++ b/activesupport/test/share_lock_test.rb @@ -114,14 +114,17 @@ class ShareLockTest < ActiveSupport::TestCase [true, false].each do |use_upgrading| with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch| begin + together = Concurrent::CyclicBarrier.new(2) conflicting_exclusive_threads = [ Thread.new do @lock.send(use_upgrading ? :sharing : :tap) do + together.wait @lock.exclusive(purpose: :red, compatible: [:green, :purple]) {} end end, Thread.new do @lock.send(use_upgrading ? :sharing : :tap) do + together.wait @lock.exclusive(purpose: :blue, compatible: [:green]) {} end end @@ -183,11 +186,14 @@ class ShareLockTest < ActiveSupport::TestCase load_params = [:load, [:load]] unload_params = [:unload, [:unload, :load]] + all_sharing = Concurrent::CyclicBarrier.new(4) + [load_params, load_params, unload_params, unload_params].permutation do |thread_params| with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch| threads = thread_params.map do |purpose, compatible| Thread.new do @lock.sharing do + all_sharing.wait @lock.exclusive(purpose: purpose, compatible: compatible) do scratch_pad_mutex.synchronize { scratch_pad << purpose } end @@ -209,6 +215,78 @@ class ShareLockTest < ActiveSupport::TestCase end end + def test_new_share_attempts_block_on_waiting_exclusive + with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch| + release_exclusive = Concurrent::CountDownLatch.new + + waiting_exclusive = Thread.new do + @lock.sharing do + @lock.exclusive do + release_exclusive.wait + end + end + end + assert_threads_stuck waiting_exclusive + + late_share_attempt = Thread.new do + @lock.sharing {} + end + assert_threads_stuck late_share_attempt + + sharing_thread_release_latch.count_down + assert_threads_stuck late_share_attempt + + release_exclusive.count_down + assert_threads_not_stuck late_share_attempt + end + end + + def test_share_remains_reentrant_ignoring_a_waiting_exclusive + with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch| + ready = Concurrent::CyclicBarrier.new(2) + attempt_reentrancy = Concurrent::CountDownLatch.new + + sharer = Thread.new do + @lock.sharing do + ready.wait + attempt_reentrancy.wait + @lock.sharing {} + end + end + + exclusive = Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive {} + end + end + + assert_threads_stuck exclusive + + attempt_reentrancy.count_down + + assert_threads_not_stuck sharer + assert_threads_stuck exclusive + end + end + + def test_compatible_exclusives_cooperate_to_both_proceed + ready = Concurrent::CyclicBarrier.new(2) + done = Concurrent::CyclicBarrier.new(2) + + threads = 2.times.map do + Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive(purpose: :x, compatible: [:x], after_compatible: [:x]) {} + done.wait + end + end + end + + assert_threads_not_stuck threads + end + def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads scratch_pad = [] scratch_pad_mutex = Mutex.new |