aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support/notifications/fanout.rb
diff options
context:
space:
mode:
Diffstat (limited to 'activesupport/lib/active_support/notifications/fanout.rb')
-rw-r--r--activesupport/lib/active_support/notifications/fanout.rb37
1 files changed, 24 insertions, 13 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
index 17c99089c1..2e5bcf4639 100644
--- a/activesupport/lib/active_support/notifications/fanout.rb
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -1,23 +1,34 @@
+require 'mutex_m'
+
module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
+ #
+ # This class is thread safe. All methods are reentrant.
class Fanout
+ include Mutex_m
+
def initialize
@subscribers = []
@listeners_for = {}
+ super
end
def subscribe(pattern = nil, block = Proc.new)
subscriber = Subscribers.new pattern, block
- @subscribers << subscriber
- @listeners_for.clear
+ synchronize do
+ @subscribers << subscriber
+ @listeners_for.clear
+ end
subscriber
end
def unsubscribe(subscriber)
- @subscribers.reject! { |s| s.matches?(subscriber) }
- @listeners_for.clear
+ synchronize do
+ @subscribers.reject! { |s| s.matches?(subscriber) }
+ @listeners_for.clear
+ end
end
def start(name, id, payload)
@@ -33,7 +44,9 @@ module ActiveSupport
end
def listeners_for(name)
- @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
+ synchronize do
+ @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
+ end
end
def listening?(name)
@@ -46,10 +59,10 @@ module ActiveSupport
module Subscribers # :nodoc:
def self.new(pattern, listener)
- if listener.respond_to?(:call)
- subscriber = Timed.new pattern, listener
- else
+ if listener.respond_to?(:start) and listener.respond_to?(:finish)
subscriber = Evented.new pattern, listener
+ else
+ subscriber = Timed.new pattern, listener
end
unless pattern
@@ -85,9 +98,7 @@ module ActiveSupport
class Timed < Evented
def initialize(pattern, delegate)
- @timestack = Hash.new { |h,id|
- h[id] = Hash.new { |ids,name| ids[name] = [] }
- }
+ @timestack = []
super
end
@@ -96,11 +107,11 @@ module ActiveSupport
end
def start(name, id, payload)
- @timestack[id][name].push Time.now
+ @timestack.push Time.now
end
def finish(name, id, payload)
- started = @timestack[id][name].pop
+ started = @timestack.pop
@delegate.call(name, started, Time.now, id, payload)
end
end