aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support/notifications/fanout.rb
diff options
context:
space:
mode:
Diffstat (limited to 'activesupport/lib/active_support/notifications/fanout.rb')
-rw-r--r--activesupport/lib/active_support/notifications/fanout.rb24
1 files changed, 17 insertions, 7 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
index 3a2ccf1489..6ffc091233 100644
--- a/activesupport/lib/active_support/notifications/fanout.rb
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -1,26 +1,34 @@
+require 'mutex_m'
+
module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
#
- # Only one of these objects should instantiated per thread. Concurrent
- # access to this class is not allowed.
+ # This class is thread safe. All methods are reentrant.
class Fanout
+ include Mutex_m
+
def initialize
@subscribers = []
@listeners_for = {}
+ super
end
def subscribe(pattern = nil, block = Proc.new)
subscriber = Subscribers.new pattern, block
- @subscribers << subscriber
- @listeners_for.clear
+ synchronize do
+ @subscribers << subscriber
+ @listeners_for.clear
+ end
subscriber
end
def unsubscribe(subscriber)
- @subscribers.reject! { |s| s.matches?(subscriber) }
- @listeners_for.clear
+ synchronize do
+ @subscribers.reject! { |s| s.matches?(subscriber) }
+ @listeners_for.clear
+ end
end
def start(name, id, payload)
@@ -36,7 +44,9 @@ module ActiveSupport
end
def listeners_for(name)
- @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
+ synchronize do
+ @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
+ end
end
def listening?(name)