diff options
Diffstat (limited to 'activesupport/lib/active_support/notifications/fanout.rb')
-rw-r--r-- | activesupport/lib/active_support/notifications/fanout.rb | 37 |
1 files changed, 24 insertions, 13 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb index 17c99089c1..2e5bcf4639 100644 --- a/activesupport/lib/active_support/notifications/fanout.rb +++ b/activesupport/lib/active_support/notifications/fanout.rb @@ -1,23 +1,34 @@ +require 'mutex_m' + module ActiveSupport module Notifications # This is a default queue implementation that ships with Notifications. # It just pushes events to all registered log subscribers. + # + # This class is thread safe. All methods are reentrant. class Fanout + include Mutex_m + def initialize @subscribers = [] @listeners_for = {} + super end def subscribe(pattern = nil, block = Proc.new) subscriber = Subscribers.new pattern, block - @subscribers << subscriber - @listeners_for.clear + synchronize do + @subscribers << subscriber + @listeners_for.clear + end subscriber end def unsubscribe(subscriber) - @subscribers.reject! { |s| s.matches?(subscriber) } - @listeners_for.clear + synchronize do + @subscribers.reject! { |s| s.matches?(subscriber) } + @listeners_for.clear + end end def start(name, id, payload) @@ -33,7 +44,9 @@ module ActiveSupport end def listeners_for(name) - @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) } + synchronize do + @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) } + end end def listening?(name) @@ -46,10 +59,10 @@ module ActiveSupport module Subscribers # :nodoc: def self.new(pattern, listener) - if listener.respond_to?(:call) - subscriber = Timed.new pattern, listener - else + if listener.respond_to?(:start) and listener.respond_to?(:finish) subscriber = Evented.new pattern, listener + else + subscriber = Timed.new pattern, listener end unless pattern @@ -85,9 +98,7 @@ module ActiveSupport class Timed < Evented def initialize(pattern, delegate) - @timestack = Hash.new { |h,id| - h[id] = Hash.new { |ids,name| ids[name] = [] } - } + @timestack = [] super end @@ -96,11 +107,11 @@ module ActiveSupport end def start(name, id, payload) - @timestack[id][name].push Time.now + @timestack.push Time.now end def finish(name, id, payload) - started = @timestack[id][name].pop + started = @timestack.pop @delegate.call(name, started, Time.now, id, payload) end end |