aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--actionpack/lib/action_controller/metal/live.rb51
-rw-r--r--activesupport/lib/active_support/concurrency/share_lock.rb48
-rw-r--r--activesupport/lib/active_support/dependencies/interlock.rb6
-rw-r--r--activesupport/test/share_lock_test.rb26
4 files changed, 93 insertions, 38 deletions
diff --git a/actionpack/lib/action_controller/metal/live.rb b/actionpack/lib/action_controller/metal/live.rb
index acc4507b2d..fc20e7a421 100644
--- a/actionpack/lib/action_controller/metal/live.rb
+++ b/actionpack/lib/action_controller/metal/live.rb
@@ -238,34 +238,39 @@ module ActionController
# response code and headers back up the rack stack, and still process
# the body in parallel with sending data to the client
new_controller_thread {
- t2 = Thread.current
-
- # Since we're processing the view in a different thread, copy the
- # thread locals from the main thread to the child thread. :'(
- locals.each { |k,v| t2[k] = v }
-
- begin
- super(name)
- rescue => e
- if @_response.committed?
- begin
- @_response.stream.write(ActionView::Base.streaming_completion_on_exception) if request.format == :html
- @_response.stream.call_on_error
- rescue => exception
- log_error(exception)
- ensure
- log_error(e)
- @_response.stream.close
+ ActiveSupport::Dependencies.interlock.running do
+ t2 = Thread.current
+
+ # Since we're processing the view in a different thread, copy the
+ # thread locals from the main thread to the child thread. :'(
+ locals.each { |k,v| t2[k] = v }
+
+ begin
+ super(name)
+ rescue => e
+ if @_response.committed?
+ begin
+ @_response.stream.write(ActionView::Base.streaming_completion_on_exception) if request.format == :html
+ @_response.stream.call_on_error
+ rescue => exception
+ log_error(exception)
+ ensure
+ log_error(e)
+ @_response.stream.close
+ end
+ else
+ error = e
end
- else
- error = e
+ ensure
+ @_response.commit!
end
- ensure
- @_response.commit!
end
}
- @_response.await_commit
+ ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
+ @_response.await_commit
+ end
+
raise error if error
end
diff --git a/activesupport/lib/active_support/concurrency/share_lock.rb b/activesupport/lib/active_support/concurrency/share_lock.rb
index 8e4ca272ba..2ac278a2f1 100644
--- a/activesupport/lib/active_support/concurrency/share_lock.rb
+++ b/activesupport/lib/active_support/concurrency/share_lock.rb
@@ -51,7 +51,7 @@ module ActiveSupport
if busy_for_exclusive?(purpose)
return false if no_wait
- yield_shares(purpose, compatible) do
+ yield_shares(purpose: purpose, compatible: compatible) do
@cv.wait_while { busy_for_exclusive?(purpose) }
end
end
@@ -73,10 +73,10 @@ module ActiveSupport
if @exclusive_depth == 0
@exclusive_thread = nil
- yield_shares(nil, compatible) do
- @cv.broadcast
+ yield_shares(compatible: compatible) do
@cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) }
end
+ @cv.broadcast
end
end
end
@@ -127,6 +127,36 @@ module ActiveSupport
end
end
+ def yield_shares(purpose: nil, compatible: [])
+ loose_shares = previous_wait = nil
+ synchronize do
+ if loose_shares = @sharing.delete(Thread.current)
+ if previous_wait = @waiting[Thread.current]
+ purpose = nil unless purpose == previous_wait[0]
+ compatible &= previous_wait[1]
+ end
+ @waiting[Thread.current] = [purpose, compatible]
+ end
+
+ @cv.broadcast
+ end
+
+ begin
+ yield
+ ensure
+ synchronize do
+ @cv.wait_while { @exclusive_thread && @exclusive_thread != Thread.current }
+
+ if previous_wait
+ @waiting[Thread.current] = previous_wait
+ else
+ @waiting.delete Thread.current
+ end
+ @sharing[Thread.current] = loose_shares if loose_shares
+ end
+ end
+ end
+
private
# Must be called within synchronize
@@ -143,18 +173,6 @@ module ActiveSupport
def eligible_waiters?(compatible)
@waiting.any? { |t, (p, _)| compatible.include?(p) && @waiting.all? { |t2, (_, c2)| t == t2 || c2.include?(p) } }
end
-
- def yield_shares(purpose, compatible)
- loose_shares = @sharing.delete(Thread.current)
- @waiting[Thread.current] = [purpose, compatible] if loose_shares
-
- begin
- yield
- ensure
- @waiting.delete Thread.current
- @sharing[Thread.current] = loose_shares if loose_shares
- end
- end
end
end
end
diff --git a/activesupport/lib/active_support/dependencies/interlock.rb b/activesupport/lib/active_support/dependencies/interlock.rb
index b6a1b25eee..47bcecbb35 100644
--- a/activesupport/lib/active_support/dependencies/interlock.rb
+++ b/activesupport/lib/active_support/dependencies/interlock.rb
@@ -42,6 +42,12 @@ module ActiveSupport #:nodoc:
yield
end
end
+
+ def permit_concurrent_loads
+ @lock.yield_shares(compatible: [:load]) do
+ yield
+ end
+ end
end
end
end
diff --git a/activesupport/test/share_lock_test.rb b/activesupport/test/share_lock_test.rb
index 12953d99a6..68fa5bb69e 100644
--- a/activesupport/test/share_lock_test.rb
+++ b/activesupport/test/share_lock_test.rb
@@ -287,6 +287,32 @@ class ShareLockTest < ActiveSupport::TestCase
assert_threads_not_stuck threads
end
+ def test_manual_yield
+ ready = Concurrent::CyclicBarrier.new(2)
+ done = Concurrent::CyclicBarrier.new(2)
+
+ threads = [
+ Thread.new do
+ @lock.sharing do
+ ready.wait
+ @lock.exclusive(purpose: :x) {}
+ done.wait
+ end
+ end,
+
+ Thread.new do
+ @lock.sharing do
+ ready.wait
+ @lock.yield_shares(compatible: [:x]) do
+ done.wait
+ end
+ end
+ end,
+ ]
+
+ assert_threads_not_stuck threads
+ end
+
def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads
scratch_pad = []
scratch_pad_mutex = Mutex.new