From 92203edbe6546f84921b6ccb6e79c3a01857a8b3 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 1 Feb 2016 19:42:14 +1030 Subject: Always obtain the lock and do the unload We mostly care about `reload_classes_only_on_change=true`, because that's the default... and there, we definitely need to wait for the lock when necessary. --- railties/lib/rails/application/finisher.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/railties/lib/rails/application/finisher.rb b/railties/lib/rails/application/finisher.rb index 404e3c3e23..411cdbad19 100644 --- a/railties/lib/rails/application/finisher.rb +++ b/railties/lib/rails/application/finisher.rb @@ -86,7 +86,7 @@ module Rails # added in the hook are taken into account. initializer :set_clear_dependencies_hook, group: :all do callback = lambda do - ActiveSupport::Dependencies.interlock.attempt_unloading do + ActiveSupport::Dependencies.interlock.unloading do ActiveSupport::DescendantsTracker.clear ActiveSupport::Dependencies.clear end -- cgit v1.2.3 From aeb58ab70470b7f395a1e77b10c9b7a73955dad8 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 1 Feb 2016 23:41:07 +1030 Subject: Block new share attempts if there's an exclusive waiter --- .../lib/active_support/concurrency/share_lock.rb | 20 ++++++++------ activesupport/test/share_lock_test.rb | 32 ++++++++++++++++++++++ 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/activesupport/lib/active_support/concurrency/share_lock.rb b/activesupport/lib/active_support/concurrency/share_lock.rb index ca48164c54..1537f2898f 100644 --- a/activesupport/lib/active_support/concurrency/share_lock.rb +++ b/activesupport/lib/active_support/concurrency/share_lock.rb @@ -48,14 +48,14 @@ module ActiveSupport def start_exclusive(purpose: nil, compatible: [], no_wait: false) synchronize do unless @exclusive_thread == Thread.current - if busy?(purpose) + if busy_for_exclusive?(purpose) return false if no_wait loose_shares = @sharing.delete(Thread.current) @waiting[Thread.current] = compatible if loose_shares begin - @cv.wait_while { busy?(purpose) } + @cv.wait_while { busy_for_exclusive?(purpose) } ensure @waiting.delete Thread.current @sharing[Thread.current] = loose_shares if loose_shares @@ -83,10 +83,10 @@ module ActiveSupport end end - def start_sharing + def start_sharing(purpose: :share) synchronize do - if @exclusive_thread && @exclusive_thread != Thread.current - @cv.wait_while { @exclusive_thread } + if busy_for_sharing?(purpose) + @cv.wait_while { busy_for_sharing?(purpose) } end @sharing[Thread.current] += 1 end @@ -132,11 +132,15 @@ module ActiveSupport private # Must be called within synchronize - def busy?(purpose) - (@exclusive_thread && @exclusive_thread != Thread.current) || - @waiting.any? { |k, v| k != Thread.current && !v.include?(purpose) } || + def busy_for_exclusive?(purpose) + busy_for_sharing?(purpose) || @sharing.size > (@sharing[Thread.current] > 0 ? 1 : 0) end + + def busy_for_sharing?(purpose) + (@exclusive_thread && @exclusive_thread != Thread.current) || + @waiting.any? { |k, v| k != Thread.current && !v.include?(purpose) } + end end end end diff --git a/activesupport/test/share_lock_test.rb b/activesupport/test/share_lock_test.rb index 465a657308..0a5b074bee 100644 --- a/activesupport/test/share_lock_test.rb +++ b/activesupport/test/share_lock_test.rb @@ -114,14 +114,17 @@ class ShareLockTest < ActiveSupport::TestCase [true, false].each do |use_upgrading| with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch| begin + together = Concurrent::CyclicBarrier.new(2) conflicting_exclusive_threads = [ Thread.new do @lock.send(use_upgrading ? :sharing : :tap) do + together.wait @lock.exclusive(purpose: :red, compatible: [:green, :purple]) {} end end, Thread.new do @lock.send(use_upgrading ? :sharing : :tap) do + together.wait @lock.exclusive(purpose: :blue, compatible: [:green]) {} end end @@ -183,11 +186,14 @@ class ShareLockTest < ActiveSupport::TestCase load_params = [:load, [:load]] unload_params = [:unload, [:unload, :load]] + all_sharing = Concurrent::CyclicBarrier.new(4) + [load_params, load_params, unload_params, unload_params].permutation do |thread_params| with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch| threads = thread_params.map do |purpose, compatible| Thread.new do @lock.sharing do + all_sharing.wait @lock.exclusive(purpose: purpose, compatible: compatible) do scratch_pad_mutex.synchronize { scratch_pad << purpose } end @@ -209,6 +215,32 @@ class ShareLockTest < ActiveSupport::TestCase end end + def test_new_share_attempts_block_on_waiting_exclusive + with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch| + release_exclusive = Concurrent::CountDownLatch.new + + waiting_exclusive = Thread.new do + @lock.sharing do + @lock.exclusive do + release_exclusive.wait + end + end + end + assert_threads_stuck waiting_exclusive + + late_share_attempt = Thread.new do + @lock.sharing {} + end + assert_threads_stuck late_share_attempt + + sharing_thread_release_latch.count_down + assert_threads_stuck late_share_attempt + + release_exclusive.count_down + assert_threads_not_stuck late_share_attempt + end + end + def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads scratch_pad = [] scratch_pad_mutex = Mutex.new -- cgit v1.2.3 From f02bd2a92c67f0d4190853521d3580766e829044 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 1 Feb 2016 23:58:36 +1030 Subject: While new sharers are blocked, an existing sharer remains re-entrant --- .../lib/active_support/concurrency/share_lock.rb | 2 +- activesupport/test/share_lock_test.rb | 29 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/activesupport/lib/active_support/concurrency/share_lock.rb b/activesupport/lib/active_support/concurrency/share_lock.rb index 1537f2898f..fa5d9bfdd7 100644 --- a/activesupport/lib/active_support/concurrency/share_lock.rb +++ b/activesupport/lib/active_support/concurrency/share_lock.rb @@ -85,7 +85,7 @@ module ActiveSupport def start_sharing(purpose: :share) synchronize do - if busy_for_sharing?(purpose) + if @sharing[Thread.current] == 0 && @exclusive_thread != Thread.current && busy_for_sharing?(purpose) @cv.wait_while { busy_for_sharing?(purpose) } end @sharing[Thread.current] += 1 diff --git a/activesupport/test/share_lock_test.rb b/activesupport/test/share_lock_test.rb index 0a5b074bee..3475ee94cd 100644 --- a/activesupport/test/share_lock_test.rb +++ b/activesupport/test/share_lock_test.rb @@ -241,6 +241,35 @@ class ShareLockTest < ActiveSupport::TestCase end end + def test_share_remains_reentrant_ignoring_a_waiting_exclusive + with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch| + ready = Concurrent::CyclicBarrier.new(2) + attempt_reentrancy = Concurrent::CountDownLatch.new + + sharer = Thread.new do + @lock.sharing do + ready.wait + attempt_reentrancy.wait + @lock.sharing {} + end + end + + exclusive = Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive {} + end + end + + assert_threads_stuck exclusive + + attempt_reentrancy.count_down + + assert_threads_not_stuck sharer + assert_threads_stuck exclusive + end + end + def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads scratch_pad = [] scratch_pad_mutex = Mutex.new -- cgit v1.2.3 From f836630f8cdf53a06259cc22ac842bbfa6376f65 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Tue, 2 Feb 2016 01:42:20 +1030 Subject: After completing a load, give other threads a chance too While we know no user code is running, we should do as much loading as we can. That way, all the threads will then be able to resume running user code together. Otherwise, only the last arriving thread would get to do its load, and would then return to userspace, leaving the others still blocked. --- .../lib/active_support/concurrency/share_lock.rb | 38 +++++++++++++++------- .../lib/active_support/dependencies/interlock.rb | 6 ++-- activesupport/test/share_lock_test.rb | 17 ++++++++++ 3 files changed, 46 insertions(+), 15 deletions(-) diff --git a/activesupport/lib/active_support/concurrency/share_lock.rb b/activesupport/lib/active_support/concurrency/share_lock.rb index fa5d9bfdd7..8e4ca272ba 100644 --- a/activesupport/lib/active_support/concurrency/share_lock.rb +++ b/activesupport/lib/active_support/concurrency/share_lock.rb @@ -51,14 +51,8 @@ module ActiveSupport if busy_for_exclusive?(purpose) return false if no_wait - loose_shares = @sharing.delete(Thread.current) - @waiting[Thread.current] = compatible if loose_shares - - begin + yield_shares(purpose, compatible) do @cv.wait_while { busy_for_exclusive?(purpose) } - ensure - @waiting.delete Thread.current - @sharing[Thread.current] = loose_shares if loose_shares end end @exclusive_thread = Thread.current @@ -71,14 +65,18 @@ module ActiveSupport # Relinquish the exclusive lock. Must only be called by the thread # that called start_exclusive (and currently holds the lock). - def stop_exclusive + def stop_exclusive(compatible: []) synchronize do raise "invalid unlock" if @exclusive_thread != Thread.current @exclusive_depth -= 1 if @exclusive_depth == 0 @exclusive_thread = nil - @cv.broadcast + + yield_shares(nil, compatible) do + @cv.broadcast + @cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) } + end end end end @@ -109,12 +107,12 @@ module ActiveSupport # the block. # # See +start_exclusive+ for other options. - def exclusive(purpose: nil, compatible: [], no_wait: false) + def exclusive(purpose: nil, compatible: [], after_compatible: [], no_wait: false) if start_exclusive(purpose: purpose, compatible: compatible, no_wait: no_wait) begin yield ensure - stop_exclusive + stop_exclusive(compatible: after_compatible) end end end @@ -139,7 +137,23 @@ module ActiveSupport def busy_for_sharing?(purpose) (@exclusive_thread && @exclusive_thread != Thread.current) || - @waiting.any? { |k, v| k != Thread.current && !v.include?(purpose) } + @waiting.any? { |t, (_, c)| t != Thread.current && !c.include?(purpose) } + end + + def eligible_waiters?(compatible) + @waiting.any? { |t, (p, _)| compatible.include?(p) && @waiting.all? { |t2, (_, c2)| t == t2 || c2.include?(p) } } + end + + def yield_shares(purpose, compatible) + loose_shares = @sharing.delete(Thread.current) + @waiting[Thread.current] = [purpose, compatible] if loose_shares + + begin + yield + ensure + @waiting.delete Thread.current + @sharing[Thread.current] = loose_shares if loose_shares + end end end end diff --git a/activesupport/lib/active_support/dependencies/interlock.rb b/activesupport/lib/active_support/dependencies/interlock.rb index fbeb904684..b6a1b25eee 100644 --- a/activesupport/lib/active_support/dependencies/interlock.rb +++ b/activesupport/lib/active_support/dependencies/interlock.rb @@ -8,13 +8,13 @@ module ActiveSupport #:nodoc: end def loading - @lock.exclusive(purpose: :load, compatible: [:load]) do + @lock.exclusive(purpose: :load, compatible: [:load], after_compatible: [:load]) do yield end end def unloading - @lock.exclusive(purpose: :unload, compatible: [:load, :unload]) do + @lock.exclusive(purpose: :unload, compatible: [:load, :unload], after_compatible: [:load, :unload]) do yield end end @@ -24,7 +24,7 @@ module ActiveSupport #:nodoc: # concurrent activity, return immediately (without executing the # block) instead of waiting. def attempt_unloading - @lock.exclusive(purpose: :unload, compatible: [:load, :unload], no_wait: true) do + @lock.exclusive(purpose: :unload, compatible: [:load, :unload], after_compatible: [:load, :unload], no_wait: true) do yield end end diff --git a/activesupport/test/share_lock_test.rb b/activesupport/test/share_lock_test.rb index 3475ee94cd..12953d99a6 100644 --- a/activesupport/test/share_lock_test.rb +++ b/activesupport/test/share_lock_test.rb @@ -270,6 +270,23 @@ class ShareLockTest < ActiveSupport::TestCase end end + def test_compatible_exclusives_cooperate_to_both_proceed + ready = Concurrent::CyclicBarrier.new(2) + done = Concurrent::CyclicBarrier.new(2) + + threads = 2.times.map do + Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive(purpose: :x, compatible: [:x], after_compatible: [:x]) {} + done.wait + end + end + end + + assert_threads_not_stuck threads + end + def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads scratch_pad = [] scratch_pad_mutex = Mutex.new -- cgit v1.2.3