aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2016-02-08 06:42:49 +1030
committerMatthew Draper <matthew@trebex.net>2016-02-08 06:42:49 +1030
commit15f6ad3adca89d07c294fe78e2d383055634f6ca (patch)
treee7245fb5ab53ef16a23ae21ce84c79da983375a3 /activesupport
parent7e35cb2987e146de29d50f97a43caef61e7588af (diff)
parent11579b8306dd6303b306ee10c8bc9f7a4a6adea3 (diff)
downloadrails-15f6ad3adca89d07c294fe78e2d383055634f6ca.tar.gz
rails-15f6ad3adca89d07c294fe78e2d383055634f6ca.tar.bz2
rails-15f6ad3adca89d07c294fe78e2d383055634f6ca.zip
Merge pull request #23532 from matthewd/live-interlock
Hand off the interlock to the new thread in AC::Live
Diffstat (limited to 'activesupport')
-rw-r--r--activesupport/lib/active_support/concurrency/share_lock.rb76
-rw-r--r--activesupport/lib/active_support/dependencies/interlock.rb6
-rw-r--r--activesupport/test/share_lock_test.rb167
3 files changed, 224 insertions, 25 deletions
diff --git a/activesupport/lib/active_support/concurrency/share_lock.rb b/activesupport/lib/active_support/concurrency/share_lock.rb
index 8e4ca272ba..54244317e4 100644
--- a/activesupport/lib/active_support/concurrency/share_lock.rb
+++ b/activesupport/lib/active_support/concurrency/share_lock.rb
@@ -6,12 +6,6 @@ module ActiveSupport
# A share/exclusive lock, otherwise known as a read/write lock.
#
# https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock
- #--
- # Note that a pending Exclusive lock attempt does not block incoming
- # Share requests (i.e., we are "read-preferring"). That seems
- # consistent with the behavior of "loose" upgrades, but may be the
- # wrong choice otherwise: it nominally reduces the possibility of
- # deadlock by risking starvation instead.
class ShareLock
include MonitorMixin
@@ -51,7 +45,7 @@ module ActiveSupport
if busy_for_exclusive?(purpose)
return false if no_wait
- yield_shares(purpose, compatible) do
+ yield_shares(purpose: purpose, compatible: compatible, block_share: true) do
@cv.wait_while { busy_for_exclusive?(purpose) }
end
end
@@ -73,18 +67,28 @@ module ActiveSupport
if @exclusive_depth == 0
@exclusive_thread = nil
- yield_shares(nil, compatible) do
- @cv.broadcast
- @cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) }
+ if eligible_waiters?(compatible)
+ yield_shares(compatible: compatible, block_share: true) do
+ @cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) }
+ end
end
+ @cv.broadcast
end
end
end
- def start_sharing(purpose: :share)
+ def start_sharing
synchronize do
- if @sharing[Thread.current] == 0 && @exclusive_thread != Thread.current && busy_for_sharing?(purpose)
- @cv.wait_while { busy_for_sharing?(purpose) }
+ if @sharing[Thread.current] > 0 || @exclusive_thread == Thread.current
+ # We already hold a lock; nothing to wait for
+ elsif @waiting[Thread.current]
+ # We're nested inside a +yield_shares+ call: we'll resume as
+ # soon as there isn't an exclusive lock in our way
+ @cv.wait_while { @exclusive_thread }
+ else
+ # This is an initial / outermost share call: any outstanding
+ # requests for an exclusive lock get to go first
+ @cv.wait_while { busy_for_sharing?(false) }
end
@sharing[Thread.current] += 1
end
@@ -127,6 +131,40 @@ module ActiveSupport
end
end
+ # Temporarily give up all held Share locks while executing the
+ # supplied block, allowing any +compatible+ exclusive lock request
+ # to proceed.
+ def yield_shares(purpose: nil, compatible: [], block_share: false)
+ loose_shares = previous_wait = nil
+ synchronize do
+ if loose_shares = @sharing.delete(Thread.current)
+ if previous_wait = @waiting[Thread.current]
+ purpose = nil unless purpose == previous_wait[0]
+ compatible &= previous_wait[1]
+ end
+ compatible |= [false] unless block_share
+ @waiting[Thread.current] = [purpose, compatible]
+
+ @cv.broadcast
+ end
+ end
+
+ begin
+ yield
+ ensure
+ synchronize do
+ @cv.wait_while { @exclusive_thread && @exclusive_thread != Thread.current }
+
+ if previous_wait
+ @waiting[Thread.current] = previous_wait
+ else
+ @waiting.delete Thread.current
+ end
+ @sharing[Thread.current] = loose_shares if loose_shares
+ end
+ end
+ end
+
private
# Must be called within synchronize
@@ -143,18 +181,6 @@ module ActiveSupport
def eligible_waiters?(compatible)
@waiting.any? { |t, (p, _)| compatible.include?(p) && @waiting.all? { |t2, (_, c2)| t == t2 || c2.include?(p) } }
end
-
- def yield_shares(purpose, compatible)
- loose_shares = @sharing.delete(Thread.current)
- @waiting[Thread.current] = [purpose, compatible] if loose_shares
-
- begin
- yield
- ensure
- @waiting.delete Thread.current
- @sharing[Thread.current] = loose_shares if loose_shares
- end
- end
end
end
end
diff --git a/activesupport/lib/active_support/dependencies/interlock.rb b/activesupport/lib/active_support/dependencies/interlock.rb
index b6a1b25eee..47bcecbb35 100644
--- a/activesupport/lib/active_support/dependencies/interlock.rb
+++ b/activesupport/lib/active_support/dependencies/interlock.rb
@@ -42,6 +42,12 @@ module ActiveSupport #:nodoc:
yield
end
end
+
+ def permit_concurrent_loads
+ @lock.yield_shares(compatible: [:load]) do
+ yield
+ end
+ end
end
end
end
diff --git a/activesupport/test/share_lock_test.rb b/activesupport/test/share_lock_test.rb
index 12953d99a6..acefa185a8 100644
--- a/activesupport/test/share_lock_test.rb
+++ b/activesupport/test/share_lock_test.rb
@@ -287,6 +287,173 @@ class ShareLockTest < ActiveSupport::TestCase
assert_threads_not_stuck threads
end
+ def test_manual_yield
+ ready = Concurrent::CyclicBarrier.new(2)
+ done = Concurrent::CyclicBarrier.new(2)
+
+ threads = [
+ Thread.new do
+ @lock.sharing do
+ ready.wait
+ @lock.exclusive(purpose: :x) {}
+ done.wait
+ end
+ end,
+
+ Thread.new do
+ @lock.sharing do
+ ready.wait
+ @lock.yield_shares(compatible: [:x]) do
+ done.wait
+ end
+ end
+ end,
+ ]
+
+ assert_threads_not_stuck threads
+ end
+
+ def test_manual_incompatible_yield
+ ready = Concurrent::CyclicBarrier.new(2)
+ done = Concurrent::CyclicBarrier.new(2)
+
+ threads = [
+ Thread.new do
+ @lock.sharing do
+ ready.wait
+ @lock.exclusive(purpose: :x) {}
+ done.wait
+ end
+ end,
+
+ Thread.new do
+ @lock.sharing do
+ ready.wait
+ @lock.yield_shares(compatible: [:y]) do
+ done.wait
+ end
+ end
+ end,
+ ]
+
+ assert_threads_stuck threads
+ ensure
+ threads.each(&:kill) if threads
+ end
+
+ def test_manual_recursive_yield
+ ready = Concurrent::CyclicBarrier.new(2)
+ done = Concurrent::CyclicBarrier.new(2)
+ do_nesting = Concurrent::CountDownLatch.new
+
+ threads = [
+ Thread.new do
+ @lock.sharing do
+ ready.wait
+ @lock.exclusive(purpose: :x) {}
+ done.wait
+ end
+ end,
+
+ Thread.new do
+ @lock.sharing do
+ @lock.yield_shares(compatible: [:x]) do
+ @lock.sharing do
+ ready.wait
+ do_nesting.wait
+ @lock.yield_shares(compatible: [:x, :y]) do
+ done.wait
+ end
+ end
+ end
+ end
+ end
+ ]
+
+ assert_threads_stuck threads
+ do_nesting.count_down
+
+ assert_threads_not_stuck threads
+ end
+
+ def test_manual_recursive_yield_cannot_expand_outer_compatible
+ ready = Concurrent::CyclicBarrier.new(2)
+ do_compatible_nesting = Concurrent::CountDownLatch.new
+ in_compatible_nesting = Concurrent::CountDownLatch.new
+
+ incompatible_thread = Thread.new do
+ @lock.sharing do
+ ready.wait
+ @lock.exclusive(purpose: :x) {}
+ end
+ end
+
+ yield_shares_thread = Thread.new do
+ @lock.sharing do
+ ready.wait
+ @lock.yield_shares(compatible: [:y]) do
+ do_compatible_nesting.wait
+ @lock.sharing do
+ @lock.yield_shares(compatible: [:x, :y]) do
+ in_compatible_nesting.wait
+ end
+ end
+ end
+ end
+ end
+
+ assert_threads_stuck incompatible_thread
+ do_compatible_nesting.count_down
+ assert_threads_stuck incompatible_thread
+ in_compatible_nesting.count_down
+ assert_threads_not_stuck [yield_shares_thread, incompatible_thread]
+ end
+
+ def test_manual_recursive_yield_restores_previous_compatible
+ ready = Concurrent::CyclicBarrier.new(2)
+ do_nesting = Concurrent::CountDownLatch.new
+ after_nesting = Concurrent::CountDownLatch.new
+
+ incompatible_thread = Thread.new do
+ ready.wait
+ @lock.exclusive(purpose: :z) {}
+ end
+
+ recursive_yield_shares_thread = Thread.new do
+ @lock.sharing do
+ ready.wait
+ @lock.yield_shares(compatible: [:y]) do
+ do_nesting.wait
+ @lock.sharing do
+ @lock.yield_shares(compatible: [:x, :y]) {}
+ end
+ after_nesting.wait
+ end
+ end
+ end
+
+ assert_threads_stuck incompatible_thread
+ do_nesting.count_down
+ assert_threads_stuck incompatible_thread
+
+ compatible_thread = Thread.new do
+ @lock.exclusive(purpose: :y) {}
+ end
+ assert_threads_not_stuck compatible_thread
+
+ post_nesting_incompatible_thread = Thread.new do
+ @lock.exclusive(purpose: :x) {}
+ end
+ assert_threads_stuck post_nesting_incompatible_thread
+
+ after_nesting.count_down
+ assert_threads_not_stuck recursive_yield_shares_thread
+ # post_nesting_incompatible_thread can now proceed
+ assert_threads_not_stuck post_nesting_incompatible_thread
+ # assert_threads_not_stuck can now proceed
+ assert_threads_not_stuck incompatible_thread
+ end
+
def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads
scratch_pad = []
scratch_pad_mutex = Mutex.new