diff options
Diffstat (limited to 'activesupport/lib/active_support/notifications/fanout.rb')
-rw-r--r-- | activesupport/lib/active_support/notifications/fanout.rb | 24 |
1 files changed, 17 insertions, 7 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb index 3a2ccf1489..6ffc091233 100644 --- a/activesupport/lib/active_support/notifications/fanout.rb +++ b/activesupport/lib/active_support/notifications/fanout.rb @@ -1,26 +1,34 @@ +require 'mutex_m' + module ActiveSupport module Notifications # This is a default queue implementation that ships with Notifications. # It just pushes events to all registered log subscribers. # - # Only one of these objects should instantiated per thread. Concurrent - # access to this class is not allowed. + # This class is thread safe. All methods are reentrant. class Fanout + include Mutex_m + def initialize @subscribers = [] @listeners_for = {} + super end def subscribe(pattern = nil, block = Proc.new) subscriber = Subscribers.new pattern, block - @subscribers << subscriber - @listeners_for.clear + synchronize do + @subscribers << subscriber + @listeners_for.clear + end subscriber end def unsubscribe(subscriber) - @subscribers.reject! { |s| s.matches?(subscriber) } - @listeners_for.clear + synchronize do + @subscribers.reject! { |s| s.matches?(subscriber) } + @listeners_for.clear + end end def start(name, id, payload) @@ -36,7 +44,9 @@ module ActiveSupport end def listeners_for(name) - @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) } + synchronize do + @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) } + end end def listening?(name) |