aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support/notifications/fanout.rb
diff options
context:
space:
mode:
Diffstat (limited to 'activesupport/lib/active_support/notifications/fanout.rb')
-rw-r--r--activesupport/lib/active_support/notifications/fanout.rb118
1 files changed, 101 insertions, 17 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
index a9aa5464e9..2e5bcf4639 100644
--- a/activesupport/lib/active_support/notifications/fanout.rb
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -1,24 +1,42 @@
+require 'mutex_m'
+
module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
+ #
+ # This class is thread safe. All methods are reentrant.
class Fanout
+ include Mutex_m
+
def initialize
@subscribers = []
@listeners_for = {}
+ super
end
def subscribe(pattern = nil, block = Proc.new)
- subscriber = Subscriber.new(pattern, block).tap do |s|
- @subscribers << s
+ subscriber = Subscribers.new pattern, block
+ synchronize do
+ @subscribers << subscriber
+ @listeners_for.clear
end
- @listeners_for.clear
subscriber
end
def unsubscribe(subscriber)
- @subscribers.reject! {|s| s.matches?(subscriber)}
- @listeners_for.clear
+ synchronize do
+ @subscribers.reject! { |s| s.matches?(subscriber) }
+ @listeners_for.clear
+ end
+ end
+
+ def start(name, id, payload)
+ listeners_for(name).each { |s| s.start(name, id, payload) }
+ end
+
+ def finish(name, id, payload)
+ listeners_for(name).each { |s| s.finish(name, id, payload) }
end
def publish(name, *args)
@@ -26,7 +44,9 @@ module ActiveSupport
end
def listeners_for(name)
- @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
+ synchronize do
+ @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
+ end
end
def listening?(name)
@@ -37,23 +57,87 @@ module ActiveSupport
def wait
end
- class Subscriber #:nodoc:
- def initialize(pattern, delegate)
- @pattern = pattern
- @delegate = delegate
+ module Subscribers # :nodoc:
+ def self.new(pattern, listener)
+ if listener.respond_to?(:start) and listener.respond_to?(:finish)
+ subscriber = Evented.new pattern, listener
+ else
+ subscriber = Timed.new pattern, listener
+ end
+
+ unless pattern
+ AllMessages.new(subscriber)
+ else
+ subscriber
+ end
end
- def publish(message, *args)
- @delegate.call(message, *args)
+ class Evented #:nodoc:
+ def initialize(pattern, delegate)
+ @pattern = pattern
+ @delegate = delegate
+ end
+
+ def start(name, id, payload)
+ @delegate.start name, id, payload
+ end
+
+ def finish(name, id, payload)
+ @delegate.finish name, id, payload
+ end
+
+ def subscribed_to?(name)
+ @pattern === name.to_s
+ end
+
+ def matches?(subscriber_or_name)
+ self === subscriber_or_name ||
+ @pattern && @pattern === subscriber_or_name
+ end
end
- def subscribed_to?(name)
- !@pattern || @pattern === name.to_s
+ class Timed < Evented
+ def initialize(pattern, delegate)
+ @timestack = []
+ super
+ end
+
+ def publish(name, *args)
+ @delegate.call name, *args
+ end
+
+ def start(name, id, payload)
+ @timestack.push Time.now
+ end
+
+ def finish(name, id, payload)
+ started = @timestack.pop
+ @delegate.call(name, started, Time.now, id, payload)
+ end
end
- def matches?(subscriber_or_name)
- self === subscriber_or_name ||
- @pattern && @pattern === subscriber_or_name
+ class AllMessages # :nodoc:
+ def initialize(delegate)
+ @delegate = delegate
+ end
+
+ def start(name, id, payload)
+ @delegate.start name, id, payload
+ end
+
+ def finish(name, id, payload)
+ @delegate.finish name, id, payload
+ end
+
+ def publish(name, *args)
+ @delegate.publish name, *args
+ end
+
+ def subscribed_to?(name)
+ true
+ end
+
+ alias :matches? :===
end
end
end