From 3e4a69e52d8c8f0335e0ddbb46fe21009b962334 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Sun, 7 Feb 2016 08:24:57 +1030 Subject: Hand off the interlock to the new thread in AC::Live Most importantly, the original request thread must yield its share lock while waiting for the live thread to commit -- otherwise a request's base and live threads can deadlock against each other. --- actionpack/lib/action_controller/metal/live.rb | 51 ++++++++++++---------- .../lib/active_support/concurrency/share_lock.rb | 48 +++++++++++++------- .../lib/active_support/dependencies/interlock.rb | 6 +++ activesupport/test/share_lock_test.rb | 26 +++++++++++ 4 files changed, 93 insertions(+), 38 deletions(-) diff --git a/actionpack/lib/action_controller/metal/live.rb b/actionpack/lib/action_controller/metal/live.rb index acc4507b2d..fc20e7a421 100644 --- a/actionpack/lib/action_controller/metal/live.rb +++ b/actionpack/lib/action_controller/metal/live.rb @@ -238,34 +238,39 @@ module ActionController # response code and headers back up the rack stack, and still process # the body in parallel with sending data to the client new_controller_thread { - t2 = Thread.current - - # Since we're processing the view in a different thread, copy the - # thread locals from the main thread to the child thread. :'( - locals.each { |k,v| t2[k] = v } - - begin - super(name) - rescue => e - if @_response.committed? - begin - @_response.stream.write(ActionView::Base.streaming_completion_on_exception) if request.format == :html - @_response.stream.call_on_error - rescue => exception - log_error(exception) - ensure - log_error(e) - @_response.stream.close + ActiveSupport::Dependencies.interlock.running do + t2 = Thread.current + + # Since we're processing the view in a different thread, copy the + # thread locals from the main thread to the child thread. :'( + locals.each { |k,v| t2[k] = v } + + begin + super(name) + rescue => e + if @_response.committed? + begin + @_response.stream.write(ActionView::Base.streaming_completion_on_exception) if request.format == :html + @_response.stream.call_on_error + rescue => exception + log_error(exception) + ensure + log_error(e) + @_response.stream.close + end + else + error = e end - else - error = e + ensure + @_response.commit! end - ensure - @_response.commit! end } - @_response.await_commit + ActiveSupport::Dependencies.interlock.permit_concurrent_loads do + @_response.await_commit + end + raise error if error end diff --git a/activesupport/lib/active_support/concurrency/share_lock.rb b/activesupport/lib/active_support/concurrency/share_lock.rb index 8e4ca272ba..2ac278a2f1 100644 --- a/activesupport/lib/active_support/concurrency/share_lock.rb +++ b/activesupport/lib/active_support/concurrency/share_lock.rb @@ -51,7 +51,7 @@ module ActiveSupport if busy_for_exclusive?(purpose) return false if no_wait - yield_shares(purpose, compatible) do + yield_shares(purpose: purpose, compatible: compatible) do @cv.wait_while { busy_for_exclusive?(purpose) } end end @@ -73,10 +73,10 @@ module ActiveSupport if @exclusive_depth == 0 @exclusive_thread = nil - yield_shares(nil, compatible) do - @cv.broadcast + yield_shares(compatible: compatible) do @cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) } end + @cv.broadcast end end end @@ -127,6 +127,36 @@ module ActiveSupport end end + def yield_shares(purpose: nil, compatible: []) + loose_shares = previous_wait = nil + synchronize do + if loose_shares = @sharing.delete(Thread.current) + if previous_wait = @waiting[Thread.current] + purpose = nil unless purpose == previous_wait[0] + compatible &= previous_wait[1] + end + @waiting[Thread.current] = [purpose, compatible] + end + + @cv.broadcast + end + + begin + yield + ensure + synchronize do + @cv.wait_while { @exclusive_thread && @exclusive_thread != Thread.current } + + if previous_wait + @waiting[Thread.current] = previous_wait + else + @waiting.delete Thread.current + end + @sharing[Thread.current] = loose_shares if loose_shares + end + end + end + private # Must be called within synchronize @@ -143,18 +173,6 @@ module ActiveSupport def eligible_waiters?(compatible) @waiting.any? { |t, (p, _)| compatible.include?(p) && @waiting.all? { |t2, (_, c2)| t == t2 || c2.include?(p) } } end - - def yield_shares(purpose, compatible) - loose_shares = @sharing.delete(Thread.current) - @waiting[Thread.current] = [purpose, compatible] if loose_shares - - begin - yield - ensure - @waiting.delete Thread.current - @sharing[Thread.current] = loose_shares if loose_shares - end - end end end end diff --git a/activesupport/lib/active_support/dependencies/interlock.rb b/activesupport/lib/active_support/dependencies/interlock.rb index b6a1b25eee..47bcecbb35 100644 --- a/activesupport/lib/active_support/dependencies/interlock.rb +++ b/activesupport/lib/active_support/dependencies/interlock.rb @@ -42,6 +42,12 @@ module ActiveSupport #:nodoc: yield end end + + def permit_concurrent_loads + @lock.yield_shares(compatible: [:load]) do + yield + end + end end end end diff --git a/activesupport/test/share_lock_test.rb b/activesupport/test/share_lock_test.rb index 12953d99a6..68fa5bb69e 100644 --- a/activesupport/test/share_lock_test.rb +++ b/activesupport/test/share_lock_test.rb @@ -287,6 +287,32 @@ class ShareLockTest < ActiveSupport::TestCase assert_threads_not_stuck threads end + def test_manual_yield + ready = Concurrent::CyclicBarrier.new(2) + done = Concurrent::CyclicBarrier.new(2) + + threads = [ + Thread.new do + @lock.sharing do + ready.wait + @lock.exclusive(purpose: :x) {} + done.wait + end + end, + + Thread.new do + @lock.sharing do + ready.wait + @lock.yield_shares(compatible: [:x]) do + done.wait + end + end + end, + ] + + assert_threads_not_stuck threads + end + def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads scratch_pad = [] scratch_pad_mutex = Mutex.new -- cgit v1.2.3