diff options
Diffstat (limited to 'activesupport/lib/active_support/concurrency')
-rw-r--r-- | activesupport/lib/active_support/concurrency/latch.rb | 17 | ||||
-rw-r--r-- | activesupport/lib/active_support/concurrency/share_lock.rb | 54 |
2 files changed, 59 insertions, 12 deletions
diff --git a/activesupport/lib/active_support/concurrency/latch.rb b/activesupport/lib/active_support/concurrency/latch.rb index 4abe5ece6f..35074265b8 100644 --- a/activesupport/lib/active_support/concurrency/latch.rb +++ b/activesupport/lib/active_support/concurrency/latch.rb @@ -2,17 +2,24 @@ require 'concurrent/atomic/count_down_latch' module ActiveSupport module Concurrency - class Latch < Concurrent::CountDownLatch + class Latch def initialize(count = 1) - ActiveSupport::Deprecation.warn("ActiveSupport::Concurrency::Latch is deprecated. Please use Concurrent::CountDownLatch instead.") - super(count) + if count == 1 + ActiveSupport::Deprecation.warn("ActiveSupport::Concurrency::Latch is deprecated. Please use Concurrent::Event instead.") + else + ActiveSupport::Deprecation.warn("ActiveSupport::Concurrency::Latch is deprecated. Please use Concurrent::CountDownLatch instead.") + end + + @inner = Concurrent::CountDownLatch.new(count) end - alias_method :release, :count_down + def release + @inner.count_down + end def await - wait(nil) + @inner.wait(nil) end end end diff --git a/activesupport/lib/active_support/concurrency/share_lock.rb b/activesupport/lib/active_support/concurrency/share_lock.rb index 54244317e4..cf632ea7b0 100644 --- a/activesupport/lib/active_support/concurrency/share_lock.rb +++ b/activesupport/lib/active_support/concurrency/share_lock.rb @@ -14,6 +14,38 @@ module ActiveSupport # to upgrade share locks to exclusive. + def raw_state # :nodoc: + synchronize do + threads = @sleeping.keys | @sharing.keys | @waiting.keys + threads |= [@exclusive_thread] if @exclusive_thread + + data = {} + + threads.each do |thread| + purpose, compatible = @waiting[thread] + + data[thread] = { + thread: thread, + sharing: @sharing[thread], + exclusive: @exclusive_thread == thread, + purpose: purpose, + compatible: compatible, + waiting: !!@waiting[thread], + sleeper: @sleeping[thread], + } + end + + # NB: Yields while holding our *internal* synchronize lock, + # which is supposed to be used only for a few instructions at + # a time. This allows the caller to inspect additional state + # without things changing out from underneath, but would have + # disastrous effects upon normal operation. Fortunately, this + # method is only intended to be called when things have + # already gone wrong. + yield data + end + end + def initialize super() @@ -21,6 +53,7 @@ module ActiveSupport @sharing = Hash.new(0) @waiting = {} + @sleeping = {} @exclusive_thread = nil @exclusive_depth = 0 end @@ -46,7 +79,7 @@ module ActiveSupport return false if no_wait yield_shares(purpose: purpose, compatible: compatible, block_share: true) do - @cv.wait_while { busy_for_exclusive?(purpose) } + wait_for(:start_exclusive) { busy_for_exclusive?(purpose) } end end @exclusive_thread = Thread.current @@ -69,7 +102,7 @@ module ActiveSupport if eligible_waiters?(compatible) yield_shares(compatible: compatible, block_share: true) do - @cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) } + wait_for(:stop_exclusive) { @exclusive_thread || eligible_waiters?(compatible) } end end @cv.broadcast @@ -84,11 +117,11 @@ module ActiveSupport elsif @waiting[Thread.current] # We're nested inside a +yield_shares+ call: we'll resume as # soon as there isn't an exclusive lock in our way - @cv.wait_while { @exclusive_thread } + wait_for(:start_sharing) { @exclusive_thread } else # This is an initial / outermost share call: any outstanding # requests for an exclusive lock get to go first - @cv.wait_while { busy_for_sharing?(false) } + wait_for(:start_sharing) { busy_for_sharing?(false) } end @sharing[Thread.current] += 1 end @@ -144,16 +177,16 @@ module ActiveSupport end compatible |= [false] unless block_share @waiting[Thread.current] = [purpose, compatible] - - @cv.broadcast end + + @cv.broadcast end begin yield ensure synchronize do - @cv.wait_while { @exclusive_thread && @exclusive_thread != Thread.current } + wait_for(:yield_shares) { @exclusive_thread && @exclusive_thread != Thread.current } if previous_wait @waiting[Thread.current] = previous_wait @@ -181,6 +214,13 @@ module ActiveSupport def eligible_waiters?(compatible) @waiting.any? { |t, (p, _)| compatible.include?(p) && @waiting.all? { |t2, (_, c2)| t == t2 || c2.include?(p) } } end + + def wait_for(method) + @sleeping[Thread.current] = method + @cv.wait_while { yield } + ensure + @sleeping.delete Thread.current + end end end end |